Fork/Join模式(JSR166y)手記之TransferQueue/LinkedTransferQueue
TransferQueue是一個繼承了 BlockingQueue的接口,并且增加若干新的方法。LinkedTransferQueue是實現類,其定義為一個無界的隊列,一樣具有先進先出(FIFO : first-in-first-out)的特性。
Doug Lea 這樣評價它:
TransferQueue是一個聰明的隊列,它是ConcurrentLinkedQueue, SynchronousQueue (在公平模式下), 無界的LinkedBlockingQueues等的超集。顯然易見,混合了若干高級特性,并且具有高性能的一個組合體,一個多面手。
這里有一個有關LinkedTransferQueue的Doug Lea等人所撰寫論文,討論了其算法、性能等,地址:http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf
單純從隊列來看,TransferQueue接口增加了一些很實用的新特性,其transfer方法提供了線程之間直接交換對象的捷徑,下面一一說來。
- transfer(E e)
若當前存在一個正在等待獲取的消費者線程,即立刻移交之;否則,會插入當前元素e到隊列尾部,并且等待進入阻塞狀態,到有消費者線程取走該元素。 - tryTransfer(E e)
若當前存在一個正在等待獲取的消費者線程(使用take()或者poll()函數),使用該方法會即刻轉移/傳輸對象元素e;
若不存在,則返回false,并且不進入隊列。
這是一個不阻塞的操作。 - tryTransfer(E e, long timeout, TimeUnit unit)
若當前存在一個正在等待獲取的消費者線程,會立即傳輸給它;
否則將插入元素e到隊列尾部,并且等待被消費者線程獲取消費掉,
若在指定的時間內元素e無法被消費者線程獲取,則返回false,同時該元素被移除。 - hasWaitingConsumer()
很明顯,判斷是否終端消費者線程 - getWaitingConsumerCount()
字面意思很明白,獲取終端所有等待獲取元素的消費線程數量 - size()
因為隊列的異步特性,檢測當前隊列的元素個數需要逐一迭代,可能會得到一個不太準確的結果,尤其是在遍歷時有可能隊列發生更改。 - 批量操作
類似于 addAll,removeAll, retainAll, containsAll, equals, toArray 等方法,API不能保證一定會立刻執行。
因此,我們在使用過程中,不能有所期待,這是一個具有異步特性的隊列。
注意事項:
- 無論是transfer還是tryTransfer方法,在>=1個消費者線程等待獲取元素時(此時隊列為空),都會立刻轉交,這屬于線程之間的元素交換。注意,這時,元素并沒有進入隊列。
- 在隊列中已有數據情況下,transfer將需要等待前面數據被消費掉,直到傳遞的元素e被消費線程取走為止。
- 使用transfer方法,工作者線程可能會被阻塞到生產的元素被消費掉為止
- 消費者線程等待為零的情況下,各自的處理元素入隊與否情況有所不同。
- size()方法,需要迭代,可能不太準確,盡量不要調用。
或許,下次我們在構造一個線程池時,可以考慮使用TransferQueue:
public static ExecutorService newTransferCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new LinkedTransferQueue());
}
簡單測試代碼:
posted on 2012-02-04 11:28 nieyong 閱讀(2656) 評論(0) 編輯 收藏 所屬分類: Java