神奇好望角 The Magical Cape of Good Hope

          庸人不必自擾,智者何需千慮?
          posts - 26, comments - 50, trackbacks - 0, articles - 11
            BlogJava :: 首頁 ::  :: 聯(lián)系 :: 聚合  :: 管理

          非主流并發(fā)工具之 CompletionService

          Posted on 2011-09-29 13:37 蜀山兆孨龘 閱讀(2080) 評論(0)  編輯  收藏 所屬分類: Java SE

          CompletionService 接口的實例可以充當(dāng)生產(chǎn)者和消費者的中間處理引擎,從而達到將提交任務(wù)和處理結(jié)果的代碼進行解耦的目的。生產(chǎn)者調(diào)用 submit 方法提交任務(wù),而消費者調(diào)用 poll(非阻塞)或 take(阻塞)方法獲取下一個結(jié)果:這一特征看起來和阻塞隊列(BlockingQueue)類似,兩者的區(qū)別在于 CompletionService 要負(fù)責(zé)任務(wù)的處理,而阻塞隊列則不會。

          在 JDK 中,該接口只有一個實現(xiàn)類 ExecutorCompletionService,該類使用創(chuàng)建時提供的 Executor 對象(通常是線程池)來執(zhí)行任務(wù),然后將結(jié)果放入一個阻塞隊列中:果然本就是一家親啊!ExecutorCompletionService 將線程池和阻塞隊列糅合在一起,僅僅通過三個方法,就實現(xiàn)了任務(wù)的異步處理,可謂并發(fā)編程初學(xué)者的神兵利器!

          接下來看一個例子。樓主有一大堆 *.java 文件,需要計算它們的代碼總行數(shù)。利用 ExecutorCompletionService 可以寫出很簡單的多線程處理代碼:

                  public int countLines(List<Path> javaFiles) throws Exception {
                      // 根據(jù)處理器數(shù)量創(chuàng)建線程池。雖然多線程并不保證能夠提升性能,但適量地
                      // 開線程一般可以從系統(tǒng)騙取更多資源。
                      ExecutorService es = Executors.newFixedThreadPool(
                              Runtime.getRuntime().availableProcessors() * 2);
                      // 使用 ExecutorCompletionService 內(nèi)建的阻塞隊列。
                      CompletionService cs = new ExecutorCompletionService(es);
          
                      // 按文件向 CompletionService 提交任務(wù)。
                      for (final Path javaFile : javaFiles) {
                          cs.submit(new Callable<Integer>() {
                              @Override
                              public Integer call() throws Exception {
                                  // 略去計算單個文件行數(shù)的代碼。
                                  return countLines(javaFile);
                              }
                          });
                      }
          
                      try {
                          int loc = 0;
                          int size = javaFiles.size();
                          for (int i = 0; i < size; i++) {
                              // take 方法等待下一個結(jié)果并返回 Future 對象。不直接返回計算結(jié)果是為了
                              // 捕獲計算時可能拋出的異常。
                              // poll 不等待,有結(jié)果就返回一個 Future 對象,否則返回 null。
                              loc += cs.take().get();
                          }
                          return loc;
                      } finally {
                          // 關(guān)閉線程池。也可以將線程池提升為字段以便重用。
                          // 如果任務(wù)線程(Callable#call)能響應(yīng)中斷,用 shutdownNow 更好。
                          es.shutdown();
                      }
                  }
              

          最后,CompletionService 也不是到處都能用,它不適合處理任務(wù)數(shù)量有限但個數(shù)不可知的場景。例如,要統(tǒng)計某個文件夾中的文件個數(shù),在遍歷子文件夾的時候也會“遞歸地”提交新的任務(wù),但最后到底提交了多少,以及在什么時候提交完了所有任務(wù),都是未知數(shù),無論 CompletionService 還是線程池都無法進行判斷。這種情況只能直接用線程池來處理。

          主站蜘蛛池模板: 罗田县| 综艺| 曲沃县| 小金县| 饶平县| 凌源市| 无极县| 扎囊县| 友谊县| 新龙县| 客服| 林口县| 育儿| 昌宁县| 亚东县| 盐津县| 神木县| 阿图什市| 临泉县| 黔江区| 年辖:市辖区| 武宣县| 青海省| 潮州市| 武冈市| 肥东县| 年辖:市辖区| 普兰县| 珠海市| 金坛市| 湘阴县| 昌乐县| 威远县| 保定市| 新宁县| 南宁市| 增城市| 临武县| 昌宁县| 冷水江市| 普宁市|