神奇好望角 The Magical Cape of Good Hope

          庸人不必自擾,智者何需千慮?
          posts - 26, comments - 50, trackbacks - 0, articles - 11
            BlogJava :: 首頁 ::  :: 聯系 :: 聚合  :: 管理

          非主流并發工具之 ForkJoinPool

          Posted on 2012-02-09 10:40 蜀山兆孨龘 閱讀(2682) 評論(2)  編輯  收藏 所屬分類: Java SE

          ForkJoinPool 是 Java SE 7 新功能“分叉/結合框架”的核心類,現在可能乏人問津,但我覺得它遲早會成為主流。分叉/結合框架是一個比較特殊的線程池框架,專用于需要將一個任務不斷分解成子任務(分叉),再不斷進行匯總得到最終結果(結合)的計算過程。比起傳統的線程池類 ThreadPoolExecutor,ForkJoinPool 實現了工作竊取算法,使得空閑線程能夠主動分擔從別的線程分解出來的子任務,從而讓所有的線程都盡可能處于飽滿的工作狀態,提高執行效率。

          ForkJoinPool 提供了三類方法來調度子任務:

          execute 系列
          異步執行指定的任務。
          invokeinvokeAll
          執行指定的任務,等待完成,返回結果。
          submit 系列
          異步執行指定的任務并立即返回一個 Future 對象。

          子任務由 ForkJoinTask 的實例來代表。它是一個抽象類,JDK 為我們提供了兩個實現:RecursiveTaskRecursiveAction,分別用于需要和不需要返回計算結果的子任務。ForkJoinTask 提供了三個靜態的 invokeAll 方法來調度子任務,注意只能在 ForkJoinPool 執行計算的過程中調用它們。

          ForkJoinPoolForkJoinTask 還提供了很多讓人眼花繚亂的公共方法,其實它們大多數都是其內部實現去調用的,對于應用開發人員來說意義不大。

          下面以統計 D 盤文件個數為例。這實際上是對一個文件樹的遍歷,我們需要遞歸地統計每個目錄下的文件數量,最后匯總,非常適合用分叉/結合框架來處理:

          // 處理單個目錄的任務
          public class CountingTask extends RecursiveTask<Integer> {
              private Path dir;
          
              public CountingTask(Path dir) {
                  this.dir = dir;
              }
          
              @Override
              protected Integer compute() {
                  int count = 0;
                  List<CountingTask> subTasks = new ArrayList<>();
          
                  // 讀取目錄 dir 的子路徑。
                  try (DirectoryStream<Path> ds = Files.newDirectoryStream(dir)) {
                      for (Path subPath : ds) {
                          if (Files.isDirectory(subPath, LinkOption.NOFOLLOW_LINKS)) {
                              // 對每個子目錄都新建一個子任務。
                              subTasks.add(new CountingTask(subPath));
                          } else {
                              // 遇到文件,則計數器增加 1。
                              count++;
                          }
                      }
          
                      if (!subTasks.isEmpty()) {
                          // 在當前的 ForkJoinPool 上調度所有的子任務。
                          for (CountingTask subTask : invokeAll(subTasks)) {
                              count += subTask.join();
                          }
                      }
                  } catch (IOException ex) {
                      return 0;
                  }
                  return count;
              }
          }
          
          // 用一個 ForkJoinPool 實例調度“總任務”,然后敬請期待結果……
          Integer count = new ForkJoinPool().invoke(new CountingTask(Paths.get("D:/")));
              

          在我的筆記本上,經多次運行這段代碼,耗費的時間穩定在 600 豪秒左右。普通線程池(Executors.newCachedThreadPool())耗時 1100 毫秒左右,足見工作竊取的優勢。

          結束本文前,我們來圍觀一個最神奇的結果:單線程算法(使用 Files.walkFileTree(...))比這兩個都快,平均耗時 550 毫秒!這警告我們并非引入多線程就能優化性能,并須要先經過多次測試才能下結論。


          評論

          # re: 非主流并發工具之 ForkJoinPool  回復  更多評論   

          2012-02-24 14:53 by 恩亠
          600毫秒?不是600秒嗎?那你的D盤也太干凈了,我的有上百萬文件,根本就是半天沒有結果

          # re: 非主流并發工具之 ForkJoinPool  回復  更多評論   

          2012-02-24 22:48 by 蜀山兆孨龘
          確實是 600 毫秒,看來你下的片片太多了哈哈。上百萬的文件數量即使在 Windows 里面右鍵看屬性都很慢……
          主站蜘蛛池模板: 开平市| 北川| 龙胜| 绩溪县| 诸城市| 巴彦淖尔市| 九龙坡区| 绥芬河市| 和平县| 江川县| 深水埗区| 确山县| 西峡县| 昌宁县| 日土县| 迁西县| 长子县| 桓仁| 综艺| 呼和浩特市| 临颍县| 长汀县| 平阳县| 双江| 织金县| 射阳县| 保定市| 措勤县| 垣曲县| 墨竹工卡县| 平阴县| 威远县| 滦南县| 宿松县| 盈江县| 海门市| 常山县| 资源县| 中方县| 那坡县| 江门市|