在Set中有一個(gè)排序的集合SortedSet,用來(lái)保存按照自然順序排列的對(duì)象。Queue中同樣引入了一個(gè)支持排序的FIFO模型。
并發(fā)隊(duì)列與Queue簡(jiǎn)介 中介紹了,PriorityQueue和PriorityBlockingQueue就是支持排序的Queue。顯然一個(gè)支持阻塞的排序Queue要比一 個(gè)非線程安全的Queue實(shí)現(xiàn)起來(lái)要復(fù)雜的多,因此下面只介紹PriorityBlockingQueue,至于PriorityQueue只需要去掉 Blocking功能就基本相同了。
轉(zhuǎn)自: http://www.aygfsteel.com/xylz/archive/2010/07/30/327582.htm
排序的BlockingQueue — PriorityBlockingQueue
先簡(jiǎn)單介紹下PriorityQueue,因?yàn)镻riorityBlockingQueue內(nèi)部就是通過(guò)PriorityQueue適配實(shí)現(xiàn)的,只不過(guò)通過(guò)鎖進(jìn)行同步和阻塞而已。
PriorityQueue是一個(gè)數(shù)組實(shí)現(xiàn)的,是一個(gè)二叉樹的實(shí)現(xiàn),這個(gè)二叉樹的任意一個(gè)節(jié)點(diǎn)都比其子節(jié)點(diǎn)要小,這樣頂點(diǎn)就是最小的節(jié)點(diǎn)。每一個(gè)元 素或者節(jié)點(diǎn)要么本身是可比較的(Comparable),或者隊(duì)列本身帶有一個(gè)比較器(Comparator<? super E>),所有元素就是靠比較自身的大小來(lái)確定順序的。而數(shù)組中頂點(diǎn)就是數(shù)組的第0個(gè)元素,因此出隊(duì)列的話總是取第0個(gè)元素。對(duì)于第0個(gè)元素,其子節(jié) 點(diǎn)是第1個(gè)元素和第2個(gè)元素,對(duì)于第1個(gè)元素,其子元素又是第3/4個(gè)元素,以此類推,第i個(gè)元素的父節(jié)點(diǎn)就是(i-1)/2。這樣任意一個(gè)元素加入隊(duì)列 就從其父節(jié)點(diǎn)(i-1)/2開始比較,一旦新節(jié)點(diǎn)比父節(jié)點(diǎn)小就交換兩個(gè)節(jié)點(diǎn),然后繼續(xù)比較新節(jié)點(diǎn)與其新的父節(jié)點(diǎn)。知道所有節(jié)點(diǎn)都是按照父節(jié)點(diǎn)一定比子節(jié)點(diǎn) 小的順序排列。這是一個(gè)有點(diǎn)復(fù)雜的算法,此處不再討論更多的細(xì)節(jié)。不管是刪除還是查找,我們只需要了解的頂點(diǎn)(索引為0的元素)總是最小的。
特別需要說(shuō)明的是PriorityQueue是一個(gè)無(wú)界的隊(duì)列,也就是說(shuō)一旦元素的個(gè)數(shù)達(dá)到了數(shù)組的大小,那么就將數(shù)組擴(kuò)大50%,這樣這個(gè)數(shù)組就是無(wú)窮大的。當(dāng)然了如果達(dá)到了整數(shù)的最大值就會(huì)得到一個(gè)OutOfMemoryError,這個(gè)是由邏輯保證的。
對(duì)于PriorityBlockingQueue而言,由于是無(wú)界的,因此就只有非空的信號(hào),也就是說(shuō)只有take()才能阻塞,put是永遠(yuǎn)不會(huì)阻塞(除非達(dá)到Integer.MAX_VALUE直到拋出一個(gè)OutOfMemoryError異常)。
只有take()操作的時(shí)候才可能因?yàn)殛?duì)列為空而掛起。同時(shí)其它需要操作隊(duì)列變化和大小的只需要使用獨(dú)占鎖ReentrantLock就可以了,非常方便。需要說(shuō)明的是PriorityBlockingQueue采用了一個(gè)公平的鎖。
總的來(lái)說(shuō)PriorityBlockingQueue 不是一個(gè)FIFO的隊(duì)列,而是一個(gè)有序的隊(duì)列,這個(gè)隊(duì)列總是取“自然順序”最小的對(duì)象,同時(shí)又是一個(gè)只能出隊(duì)列阻塞的BlockingQueue,對(duì)于入隊(duì)列卻不是阻塞的。所有操作都是線程安全的。
直接交換的BlockingQueue — SynchronousQueue
這是一個(gè)很有意思的阻塞隊(duì)列,其中每個(gè)插入操作必須等待另一個(gè)線程的移除操作,同樣任何一個(gè)移除操作都等待另一個(gè)線程的插入操作。因此此隊(duì)列內(nèi)部其 實(shí)沒有任何一個(gè)元素,或者說(shuō)容量是0,嚴(yán)格說(shuō)并不是一種容器。由于隊(duì)列沒有容量,因此不能調(diào)用peek操作,因?yàn)橹挥幸瞥貢r(shí)才有元素。
一個(gè)沒有容量的并發(fā)隊(duì)列有什么用了?或者說(shuō)存在的意義是什么?
SynchronousQueue 的實(shí)現(xiàn)非常復(fù)雜,當(dāng)然了如果真要去分析還是能夠得到一些經(jīng)驗(yàn)的,但是前面分析了過(guò)多的結(jié)構(gòu)后,發(fā)現(xiàn)越來(lái)越陷于數(shù)據(jù)結(jié)構(gòu)與算法里面了。我的初衷是通過(guò)研究并 發(fā)實(shí)現(xiàn)的原理來(lái)更好的利用并發(fā)來(lái)最大限度的利用可用資源。所以在后面的章節(jié)中盡可能的少研究數(shù)據(jù)結(jié)構(gòu)和算法,但是為了弄清楚里面的原理,必不可免的會(huì)涉及 到一些這方面的知識(shí),希望后面能夠適可而止。
再回到話題。SynchronousQueue 內(nèi)部沒有容量,但是由于一個(gè)插入操作總是對(duì)應(yīng)一個(gè)移除操作,反過(guò)來(lái)同樣需要滿足。那么一個(gè)元素就不會(huì)再SynchronousQueue 里面長(zhǎng)時(shí)間停留,一旦有了插入線程和移除線程,元素很快就從插入線程移交給移除線程。也就是說(shuō)這更像是一種信道(管道),資源從一個(gè)方向快速傳遞到另一方 向。
需要特別說(shuō)明的是,盡管元素在SynchronousQueue 內(nèi)部不會(huì)“停留”,但是并不意味之SynchronousQueue 內(nèi)部沒有隊(duì)列。實(shí)際上SynchronousQueue 維護(hù)者線程隊(duì)列,也就是插入線程或者移除線程在不同時(shí)存在的時(shí)候就會(huì)有線程隊(duì)列。既然有隊(duì)列,同樣就有公平性和非公平性特性,公平性保證正在等待的插入線 程或者移除線程以FIFO的順序傳遞資源。
顯然這是一種快速傳遞元素的方式,也就是說(shuō)在這種情況下元素總是以最快的方式從插入著(生產(chǎn)者)傳遞給移除著(消費(fèi)者),這在多任務(wù)隊(duì)列中是最快處理任務(wù)的方式。在線程池的相關(guān)章節(jié)中還會(huì)更多的提到此特性。
事實(shí)上在《并發(fā)隊(duì)列與Queue簡(jiǎn)介》 中介紹了還有一種BlockingQueue的實(shí)現(xiàn)DelayQueue,它描述的是一種延時(shí)隊(duì)列。這個(gè)隊(duì)列的特性是,隊(duì)列中的元素都要延遲時(shí)間(超時(shí)時(shí) 間),只有一個(gè)元素達(dá)到了延時(shí)時(shí)間才能出隊(duì)列,也就是說(shuō)每次從隊(duì)列中獲取的元素總是最先到達(dá)延時(shí)的元素。這種隊(duì)列的場(chǎng)景就是計(jì)劃任務(wù)。比如以前要完成計(jì)劃 任務(wù),很有可能是使用Timer/TimerTask,這是一種循環(huán)檢測(cè)的方式,也就是在循環(huán)里面遍歷所有元素總是檢測(cè)元素是否滿足條件,一旦滿足條件就 執(zhí)行相關(guān)任務(wù)。顯然這中方式浪費(fèi)了很多的檢測(cè)工作,因?yàn)榇蠖鄶?shù)時(shí)間總是在進(jìn)行無(wú)謂的檢測(cè)。而DelayQueue 卻能避免這種無(wú)謂的檢測(cè)。在線程池的計(jì)劃任務(wù)部分還有更加詳細(xì)的討論此隊(duì)列實(shí)現(xiàn)。
下面就對(duì)常見的BlockingQueue進(jìn)行小節(jié)下,這里不包括雙向的隊(duì)列,盡管ConcurrentLinkedQueue不是可阻塞的Queue,但是這里還是將其放在一起進(jìn)行對(duì)比。
如果不需要阻塞隊(duì)列,優(yōu)先選擇ConcurrentLinkedQueue;如果需要阻塞隊(duì)列,隊(duì)列大小固定優(yōu)先選擇 ArrayBlockingQueue,隊(duì)列大小不固定優(yōu)先選擇LinkedBlockingQueue;如果需要對(duì)隊(duì)列進(jìn)行排序,選擇 PriorityBlockingQueue;如果需要一個(gè)快速交換的隊(duì)列,選擇SynchronousQueue;如果需要對(duì)隊(duì)列中的元素進(jìn)行延時(shí)操 作,則選擇DelayQueue。