Fork/Join模式(JSR166y)手記之Fork/Join模式實現淺析1
對Fork/Join的個人理解要點:
- fork/join將大的任務分割小的任務,直到小的任務可以使用最簡單、直接或者同步的方式處理。
- 最小的任務將無法分解
- 每一個任務不是線程的實例
- 每一個工作線程將是一個隱式的線程實例
- 每一個工作線程都會維護自身的一個雙向隊列(支持FIFO/LIFO);在任務產生的子任務,會被push進當前工作線程所維護deque隊列中,進入隊列頭部。
- 當一個工作線程的雙向隊列中暫無任務時,它會從隨機的工作線程的雙向隊列的尾部獲取一個入隊最久的子任務(稱之為竊取),take()方式獲取,先進先出的規則(FIFO)。
- 當一個工作線程遇到一個join的操作,假如可能的話,它會處理其他的任務,直到目標任務被通知需要處理掉。
- 當一個工作者線程沒有任務可以處理,并且不能從其他工作者線程中竊取的時,它會后退(通過yields,sleeps,或者優先級的調整),稍后重試,直到所有工作線程都會處于空閑狀態,所有線程都會阻塞,等到另外的任務在頂層被調用。
Brian Goetz 認為"使用傳統的線程池來實現 fork-join 也具有挑戰性,因為 fork-join任務將線程生命周期的大部分時間花費在等待其他任務上。這種行為會造成線程饑餓死鎖(thread starvation deadlock),除非小心選擇參數以限制創建的任務數量,或者池本身非常大。傳統的線程池是為相互獨立的任務設計的,而且設計中也考慮了潛在的阻塞、粗粒度任務 — fork-join 解決方案不會產生這兩種情況。對于傳統線程池的細粒度任務,也存在所有工作線程共享的任務隊列發生爭用的情況。"
下面談一談工作竊取(work stealing)。在Fork/Join中,工作竊取采用了一個被當做棧(Stack)使用的雙端隊列WorkQueue。雙端隊列WorkQueue,支持在兩端插入和移除元素,和單獨的隊列(Queue)相比,多了一端。在Fork/Join中工作線程中被當做棧(Stack)來使用,在頭部push插入數據,pop獲取數。而尾部,可以供需要竊取的工作線程(take()方法)使用。與單向隊列相比,減少爭用,可以提高性能。

static final class WorkQueue {它既沒有繼承自Deque又沒有繼承Queue接口,而是自己獨立寫了一個雙端隊列,數組實現。很顯然,數組的讀取性能要強于鏈表。
......
}
看一下ForkJoinPool的默認構造函數:
public ForkJoinPool() {在ForkJoinPool代碼初始化時,默認情況下:
this(Runtime.getRuntime().availableProcessors(),
defaultForkJoinWorkerThreadFactory, null, false);
}
public static interface ForkJoinWorkerThreadFactory {
public ForkJoinWorkerThread newThread(ForkJoinPool pool);
}
static class DefaultForkJoinWorkerThreadFactory
implements ForkJoinWorkerThreadFactory {
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return new ForkJoinWorkerThread(pool);
}
}
defaultForkJoinWorkerThreadFactory =默認情況下,根據當前CPU的數量建立一個ForkJoinWorkerThreadFactory工廠,CPU數量個ForkJoinWorkerThread工作線程。
new DefaultForkJoinWorkerThreadFactory();
仔細看一下ForkJoinWorkerThread代碼,工作線程繼承自Thread:
很顯然,onStart 和 onTermination為鉤子函數,可以被重寫,但,需要構造一個新的ForkJoinPool.ForkJoinWorkerThreadFactory來配合使用。比如:
public static void main(String[] args) {
ForkJoinPool.ForkJoinWorkerThreadFactory factory = new ForkJoinPool.ForkJoinWorkerThreadFactory() {
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return new CustomedForkJoinWorkerThread(pool);
}
};
ForkJoinPool joinPool = new ForkJoinPool(Runtime.getRuntime()
.availableProcessors(), factory, null, false);
// some code here ...
}
private static final class CustomedForkJoinWorkerThread extends
ForkJoinWorkerThread {
protected CustomedForkJoinWorkerThread(ForkJoinPool pool) {
super(pool);
}
@Override
protected void onStart() {
super.onStart();
System.out.println("準備初始化資源...");
}
@Override
protected void onTermination(Throwable exception) {
System.out.println("開始清理資源...");
super.onTermination(exception);
}
}
接著看一下pool.runWorker(this)方法:
final void runWorker(ForkJoinWorkerThread wt) {
WorkQueue w = wt.workQueue;
w.growArray(false);
w.seed = hashId(Thread.currentThread().getId());
do {} while (w.runTask(scan(w)));
}
初始化隊列,設置其seed為當前線程ID的哈希值。然后循環執行,當沒有任務可獲取,自然就退出了。而scan()很復雜,大概功能,從當前隊列中獲取元素,當前隊列為空時,從其他工作線程所持有的隊列中竊取一個。都沒有時,只能返回null,進而阻止線程活動。
嗯,有時間會再深入WorkQueue隊列一些。
posted on 2012-02-08 21:35 nieyong 閱讀(1778) 評論(0) 編輯 收藏 所屬分類: Java