今天就聊聊這兩種Queue,本文分為以下兩個(gè)部分,用分割線分開:
首先來看看BlockingQueue:
Queue是什么就不需要多說了吧,一句話:隊(duì)列是先進(jìn)先出。相對的,棧是后進(jìn)先出。如果不熟悉的話先找本基礎(chǔ)的數(shù)據(jù)結(jié)構(gòu)的書看看吧。
BlockingQueue,顧名思義,“阻塞隊(duì)列”:可以提供阻塞功能的隊(duì)列。
首先,看看BlockingQueue提供的常用方法:
從上表可以很明顯看出每個(gè)方法的作用,這個(gè)不用多說。我想說的是:
好,上點(diǎn)源碼你就更明白了。以ArrayBlockingQueue類為例:
對于第一類方法,很明顯如果操作不成功就拋異常。而且可以看到其實(shí)調(diào)用的是第二類的方法,為什么?因?yàn)榈诙惙椒ǚ祷豣oolean啊。
- BlockingQueue
- ConcurrentLinkedQueue,非阻塞算法
首先來看看BlockingQueue:
Queue是什么就不需要多說了吧,一句話:隊(duì)列是先進(jìn)先出。相對的,棧是后進(jìn)先出。如果不熟悉的話先找本基礎(chǔ)的數(shù)據(jù)結(jié)構(gòu)的書看看吧。
BlockingQueue,顧名思義,“阻塞隊(duì)列”:可以提供阻塞功能的隊(duì)列。
首先,看看BlockingQueue提供的常用方法:
Throws exception | Special value | Blocks | Times out | |
Insert | add(e) | offer(e) | put(e) | offer(e, timeout, unit) |
Remove | remove() | poll() | take() | poll(timeout, unit) |
Examine | element() | peek() | not applicable | not applicable |
從上表可以很明顯看出每個(gè)方法的作用,這個(gè)不用多說。我想說的是:
- add(e) remove() element() 方法不會(huì)阻塞線程。當(dāng)不滿足約束條件時(shí),會(huì)拋出IllegalStateException 異常。例如:當(dāng)隊(duì)列被元素填滿后,再調(diào)用add(e),則會(huì)拋出異常。
- offer(e) poll() peek() 方法即不會(huì)阻塞線程,也不會(huì)拋出異常。例如:當(dāng)隊(duì)列被元素填滿后,再調(diào)用offer(e),則不會(huì)插入元素,函數(shù)返回false。
- 要想要實(shí)現(xiàn)阻塞功能,需要調(diào)用put(e) take() 方法。當(dāng)不滿足約束條件時(shí),會(huì)阻塞線程。
好,上點(diǎn)源碼你就更明白了。以ArrayBlockingQueue類為例:
對于第一類方法,很明顯如果操作不成功就拋異常。而且可以看到其實(shí)調(diào)用的是第二類的方法,為什么?因?yàn)榈诙惙椒ǚ祷豣oolean啊。
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");//隊(duì)列已滿,拋異常
}
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();//隊(duì)列為空,拋異常
}
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");//隊(duì)列已滿,拋異常
}
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();//隊(duì)列為空,拋異常
}
注:先不看阻塞與否,這ReentrantLock的使用方式就能說明這個(gè)類是線程安全類。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)//隊(duì)列已滿,返回false
return false;
else {
insert(e);//insert方法中發(fā)出了notEmpty.signal();
return true;
}
} finally {
lock.unlock();
}
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == 0)//隊(duì)列為空,返回false
return null;
E x = extract();//extract方法中發(fā)出了notFull.signal();
return x;
} finally {
lock.unlock();
}
}
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)//隊(duì)列已滿,返回false
return false;
else {
insert(e);//insert方法中發(fā)出了notEmpty.signal();
return true;
}
} finally {
lock.unlock();
}
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == 0)//隊(duì)列為空,返回false
return null;
E x = extract();//extract方法中發(fā)出了notFull.signal();
return x;
} finally {
lock.unlock();
}
}
對于第三類方法,這里面涉及到Condition類,簡要提一下,
await方法指:造成當(dāng)前線程在接到信號(hào)或被中斷之前一直處于等待狀態(tài)。
signal方法指:喚醒一個(gè)等待線程。
await方法指:造成當(dāng)前線程在接到信號(hào)或被中斷之前一直處于等待狀態(tài)。
signal方法指:喚醒一個(gè)等待線程。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final E[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {flex對應(yīng)java數(shù)據(jù)類型
try {
while (count == items.length)//如果隊(duì)列已滿,等待notFull這個(gè)條件,這時(shí)當(dāng)前線程被阻塞
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); //喚醒受notFull阻塞的當(dāng)前線程
throw ie;
}
insert(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (count == 0)//如果隊(duì)列為空,等待notEmpty這個(gè)條件,這時(shí)當(dāng)前線程被阻塞
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal();//喚醒受notEmpty阻塞的當(dāng)前線程
throw ie;
}
E x = extract();
return x;
} finally {
lock.unlock();
}
}
if (e == null) throw new NullPointerException();
final E[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {flex對應(yīng)java數(shù)據(jù)類型
try {
while (count == items.length)//如果隊(duì)列已滿,等待notFull這個(gè)條件,這時(shí)當(dāng)前線程被阻塞
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); //喚醒受notFull阻塞的當(dāng)前線程
throw ie;
}
insert(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (count == 0)//如果隊(duì)列為空,等待notEmpty這個(gè)條件,這時(shí)當(dāng)前線程被阻塞
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal();//喚醒受notEmpty阻塞的當(dāng)前線程
throw ie;
}
E x = extract();
return x;
} finally {
lock.unlock();
}
}
第四類方法就是指在有必要時(shí)等待指定時(shí)間,就不詳細(xì)說了。
再來看看BlockingQueue接口的具體實(shí)現(xiàn)類吧:
首先,既然是鏈表,就應(yīng)該有Node節(jié)點(diǎn),它是一個(gè)內(nèi)部靜態(tài)類:
再來看看BlockingQueue接口的具體實(shí)現(xiàn)類吧:
- ArrayBlockingQueue,其構(gòu)造函數(shù)必須帶一個(gè)int參數(shù)來指明其大小
- LinkedBlockingQueue,若其構(gòu)造函數(shù)帶一個(gè)規(guī)定大小的參數(shù),生成的BlockingQueue有大小限制,若不帶大小參數(shù),所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定
- PriorityBlockingQueue,其所含對象的排序不是FIFO,而是依據(jù)對象的自然排序順序或者是構(gòu)造函數(shù)的Comparator決定的順序
首先,既然是鏈表,就應(yīng)該有Node節(jié)點(diǎn),它是一個(gè)內(nèi)部靜態(tài)類:
static class Node<E> {
/** The item, volatile to ensure barrier separating write and read */
volatile E item;
Node<E> next;
Node(E x) { item = x; }
}
/** The item, volatile to ensure barrier separating write and read */
volatile E item;
Node<E> next;
Node(E x) { item = x; }
}
然后,對于鏈表來說,肯定需要兩個(gè)變量來標(biāo)示頭和尾:
/** 頭指針 */
private transient Node<E> head; //head.next是隊(duì)列的頭元素
/** 尾指針 */
private transient Node<E> last; //last.next是null
private transient Node<E> head; //head.next是隊(duì)列的頭元素
/** 尾指針 */
private transient Node<E> last; //last.next是null
么,對于入隊(duì)和出隊(duì)就很自然能理解了:
private void enqueue(E x) {
last = last.next = new Node<E>(x); //入隊(duì)是為last再找個(gè)下家
}
private E dequeue() {
Node<E> first = head.next; //出隊(duì)是把head.next取出來,然后將head向后移一位
head = first;
E x = first.item;
first.item = null;
return x;
}
last = last.next = new Node<E>(x); //入隊(duì)是為last再找個(gè)下家
}
private E dequeue() {
Node<E> first = head.next; //出隊(duì)是把head.next取出來,然后將head向后移一位
head = first;
E x = first.item;
first.item = null;
return x;
}
另外,LinkedBlockingQueue相對于ArrayBlockingQueue還有不同是,有兩個(gè)ReentrantLock,且隊(duì)列現(xiàn)有元素的大小由一個(gè)AtomicInteger對象標(biāo)示。
注:AtomicInteger類是以原子的方式操作整型變量。
注:AtomicInteger類是以原子的方式操作整型變量。
private final AtomicInteger count = new AtomicInteger(0);
/** 用于讀取的獨(dú)占鎖*/
private final ReentrantLock takeLock = new ReentrantLock();
/** 隊(duì)列是否為空的條件 */
private final Condition notEmpty = takeLock.newCondition();
/** 用于寫入的獨(dú)占鎖 */
private final ReentrantLock putLock = new ReentrantLock();
/** 隊(duì)列是否已滿的條件 */
private final Condition notFull = putLock.newCondition();
/** 用于讀取的獨(dú)占鎖*/
private final ReentrantLock takeLock = new ReentrantLock();
/** 隊(duì)列是否為空的條件 */
private final Condition notEmpty = takeLock.newCondition();
/** 用于寫入的獨(dú)占鎖 */
private final ReentrantLock putLock = new ReentrantLock();
/** 隊(duì)列是否已滿的條件 */
private final Condition notFull = putLock.newCondition();
有兩個(gè)Condition很好理解,在ArrayBlockingQueue也是這樣做的。但是為什么需要兩個(gè)ReentrantLock呢?下面會(huì)慢慢道來。
讓我們來看看offer和poll方法的代碼:
讓我們來看看offer和poll方法的代碼:
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
final ReentrantLock putLock = this.putLock;//入隊(duì)當(dāng)然用putLock
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(e); //入隊(duì)
c = count.getAndIncrement(); //隊(duì)長度+1
if (c + 1 < capacity)
notFull.signal(); //隊(duì)列沒滿,當(dāng)然可以解鎖了
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();//這個(gè)方法里發(fā)出了notEmpty.signal();
return c >= 0;
}
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;出隊(duì)當(dāng)然用takeLock
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();//出隊(duì)
c = count.getAndDecrement();//隊(duì)長度-1
if (c > 1)
notEmpty.signal();//隊(duì)列沒空,解鎖
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();//這個(gè)方法里發(fā)出了notFull.signal();
return x;
}
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
final ReentrantLock putLock = this.putLock;//入隊(duì)當(dāng)然用putLock
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(e); //入隊(duì)
c = count.getAndIncrement(); //隊(duì)長度+1
if (c + 1 < capacity)
notFull.signal(); //隊(duì)列沒滿,當(dāng)然可以解鎖了
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();//這個(gè)方法里發(fā)出了notEmpty.signal();
return c >= 0;
}
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;出隊(duì)當(dāng)然用takeLock
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();//出隊(duì)
c = count.getAndDecrement();//隊(duì)長度-1
if (c > 1)
notEmpty.signal();//隊(duì)列沒空,解鎖
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();//這個(gè)方法里發(fā)出了notFull.signal();
return x;
}
看看源代碼發(fā)現(xiàn)和上面ArrayBlockingQueue的很類似,關(guān)鍵的問題在于:為什么要用兩個(gè)ReentrantLockputLock和takeLock?
我們仔細(xì)想一下,入隊(duì)操作其實(shí)操作的只有隊(duì)尾引用last,并且沒有牽涉到head。而出隊(duì)操作其實(shí)只針對head,和last沒有關(guān)系。那么就 是說入隊(duì)和出隊(duì)的操作完全不需要公用一把鎖,所以就設(shè)計(jì)了兩個(gè)鎖,這樣就實(shí)現(xiàn)了多個(gè)不同任務(wù)的線程入隊(duì)的同時(shí)可以進(jìn)行出隊(duì)的操作,另一方面由于兩個(gè)操作所 共同使用的count是AtomicInteger類型的,所以完全不用考慮計(jì)數(shù)器遞增遞減的問題。
另外,還有一點(diǎn)需要說明一下:await()和singal()這兩個(gè)方法執(zhí)行時(shí)都會(huì)檢查當(dāng)前線程是否是獨(dú)占鎖的當(dāng)前線程,如果不是則拋出 java.lang.IllegalMonitorStateException異常。所以可以看到在源碼中這兩個(gè)方法都出現(xiàn)在Lock的保護(hù)塊中。
我們仔細(xì)想一下,入隊(duì)操作其實(shí)操作的只有隊(duì)尾引用last,并且沒有牽涉到head。而出隊(duì)操作其實(shí)只針對head,和last沒有關(guān)系。那么就 是說入隊(duì)和出隊(duì)的操作完全不需要公用一把鎖,所以就設(shè)計(jì)了兩個(gè)鎖,這樣就實(shí)現(xiàn)了多個(gè)不同任務(wù)的線程入隊(duì)的同時(shí)可以進(jìn)行出隊(duì)的操作,另一方面由于兩個(gè)操作所 共同使用的count是AtomicInteger類型的,所以完全不用考慮計(jì)數(shù)器遞增遞減的問題。
另外,還有一點(diǎn)需要說明一下:await()和singal()這兩個(gè)方法執(zhí)行時(shí)都會(huì)檢查當(dāng)前線程是否是獨(dú)占鎖的當(dāng)前線程,如果不是則拋出 java.lang.IllegalMonitorStateException異常。所以可以看到在源碼中這兩個(gè)方法都出現(xiàn)在Lock的保護(hù)塊中。