CompletionService
接口的實例可以充當生產者和消費者的中間處理引擎,從而達到將提交任務和處理結果的代碼進行解耦的目的。生產者調用 submit
方法提交任務,而消費者調用 poll
(非阻塞)或 take
(阻塞)方法獲取下一個結果:這一特征看起來和阻塞隊列(BlockingQueue
)類似,兩者的區別在于 CompletionService
要負責任務的處理,而阻塞隊列則不會。
在 JDK 中,該接口只有一個實現類 ExecutorCompletionService
,該類使用創建時提供的 Executor
對象(通常是線程池)來執行任務,然后將結果放入一個阻塞隊列中:果然本就是一家親啊!ExecutorCompletionService
將線程池和阻塞隊列糅合在一起,僅僅通過三個方法,就實現了任務的異步處理,可謂并發編程初學者的神兵利器!
接下來看一個例子。樓主有一大堆 *.java 文件,需要計算它們的代碼總行數。利用 ExecutorCompletionService
可以寫出很簡單的多線程處理代碼:
public int countLines(List<Path> javaFiles) throws Exception { // 根據處理器數量創建線程池。雖然多線程并不保證能夠提升性能,但適量地 // 開線程一般可以從系統騙取更多資源。 ExecutorService es = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() * 2); // 使用 ExecutorCompletionService 內建的阻塞隊列。 CompletionService cs = new ExecutorCompletionService(es); // 按文件向 CompletionService 提交任務。 for (final Path javaFile : javaFiles) { cs.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { // 略去計算單個文件行數的代碼。 return countLines(javaFile); } }); } try { int loc = 0; int size = javaFiles.size(); for (int i = 0; i < size; i++) { // take 方法等待下一個結果并返回 Future 對象。不直接返回計算結果是為了 // 捕獲計算時可能拋出的異常。 // poll 不等待,有結果就返回一個 Future 對象,否則返回 null。 loc += cs.take().get(); } return loc; } finally { // 關閉線程池。也可以將線程池提升為字段以便重用。 // 如果任務線程(Callable#call)能響應中斷,用 shutdownNow 更好。 es.shutdown(); } }
最后,CompletionService
也不是到處都能用,它不適合處理任務數量有限但個數不可知的場景。例如,要統計某個文件夾中的文件個數,在遍歷子文件夾的時候也會“遞歸地”提交新的任務,但最后到底提交了多少,以及在什么時候提交完了所有任務,都是未知數,無論 CompletionService
還是線程池都無法進行判斷。這種情況只能直接用線程池來處理。