posts - 156,  comments - 601,  trackbacks - 0

          CyclicBarrier一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。假如在涉及一組固定大小的線程的程序應用中,要求這些線程必須不時地互相等待(保證所有線程都執行完畢才返回),那么選擇 CyclicBarrier 就會讓這個實現變得非常容易。CyclicBarrier 在釋放等待線程后可以重用,所以又稱它為循環 barrier

          CyclicBarrier 支持一個可選的 Runnable 命令,在一組線程中的最后一個線程到達之后(但在釋放所有線程之前),該命令在每個屏障點完成前只運行一次。

           

          下面是一個JDK官方自帶示例(只演示使用方式,目前不能直接運行),可以讓大家更好的了解一下CyclicBarrier的使用。

          示例說明:有一個兩維數組,保存的N行的數據內容。現在需要有N個線程,每個線程處理一行結果,當所有N的結果都處理完成后,返回。示例代碼如下:

           

           

          public class Solver {

              final int N;

              final float[][] data;

              final CyclicBarrier barrier;

           

              class Worker implements Runnable {

                  int myRow;

           

                  Worker(int row) {

                      myRow = row;

                  }

           

                  public void run() {

                      while (!done()) {

                          processRow(myRow);

           

                          try {

                              barrier.await();

                          } catch (InterruptedException ex) {

                              return;

                          } catch (BrokenBarrierException ex) {

                              return;

                          }

                      }

                  }

              }

           

              public Solver(float[][] matrix) {

                data = matrix;

                        N = matrix.length;

                //創建N個大小的Barrier,調用 barrier.await方法來等待線程結束

                barrier = new CyclicBarrier(N,

                                            new Runnable() {

                                              public void run() {

                                                mergeRows(...);

                                              }

                                            });

                for (int i = 0; i < N; ++i)

                  new Thread(new Worker(i)).start();

           

                waitUntilDone();

              }

          }

          在這個例子中,每個 worker 線程處理一行數據,在處理完所有的行之前,該線程將一直在屏障處等待。

          處理完所有的行之后,將執行所提供的 Runnable 屏障操作,并合并這些行。

          如果合并者確定已經找到了一個解決方案,那么 done() 將返回 true,所有的 worker 線程都將終止。 

          如果屏障操作在執行時不依賴于正掛起的線程,則線程組中的任何線程在獲得釋放時都能執行該操作。

          為方便此操作,每次調用 await() 都將返回能到達屏障處的線程的索引。然后,您可以選擇哪個線程應該執行屏障操作,例如: 
            if (barrier.await() == 0) {
               // log the completion of this iteration
             }
          對于失敗的同步嘗試,CyclicBarrier 使用了一種要么全部要么全不 (all-or-none) 的破壞模式:如果因為中斷、失敗或者超時等原因,導致線程過早地離開了屏障點,那么在該屏障點等待的其他所有線程也將通過 BrokenBarrierException(如果它們幾乎同時被中斷,則用 InterruptedException)以反常的方式離開。 

          內存一致性效果:線程中調用 await() 之前的操作 happen-before 那些是屏障操作的一部份的操作,后者依次 happen-before 緊跟在從另一個線程中對應 await() 成功返回的操作。

           

          主要API介紹

          await

          public int await() throws InterruptedException, BrokenBarrierException

          將一直等待。 

          如果當前線程不是將到達的最后一個線程,出于調度目的,將禁用它,且在發生以下情況之一前,該線程將一直處于休眠狀態: 

          最后一個線程到達;

          或者 其他某個線程中斷當前線程;

          或者 其他某個線程中斷另一個等待線程;

          或者 其他某個線程在等待 barrier 時超時;

          或者 其他某個線程在此 barrier 上調用 reset() 

           

          如果當前線程: 
          在進入此方法時已經設置了該線程的中斷狀態;或者 
          在等待時被中斷 
          則拋出 InterruptedException,并且清除當前線程的已中斷狀態。 
          如果在線程處于等待狀態時 barrier reset(),或者在調用 await barrier 被損壞,抑或任意一個線程正處于等待狀態,則拋出 BrokenBarrierException 異常。 

          如果任何線程在等待時被 中斷,則其他所有等待線程都將拋出 BrokenBarrierException 異常,并將 barrier 置于損壞狀態。 

          如果當前線程是最后一個將要到達的線程,并且構造方法中提供了一個非空的屏障操作,則在允許其他線程繼續運行之前,當前線程將運行該操作。
          如果在執行屏障操作過程中發生異常,則該異常將傳播到當前線程中,并將 barrier 置于損壞狀態。 

          返回:
          到達的當前線程的索引,其中,索引 getParties() - 1 指示將到達的第一個線程,零指示最后一個到達的線程 
          拋出: 
          InterruptedException -
          如果當前線程在等待時被中斷 
          BrokenBarrierException -
          如果另一個 線程在當前線程等待時被中斷或超時,或者重置了 barrier,或者在調用 await barrier 被損壞,抑或由于異常而導致屏障操作(如果存在)失敗。

           

          getNumberWaiting

            public int getNumberWaiting()

          返回當前在屏障處等待的參與者數目。此方法主要用于調試和斷言。 
          返回 當前阻塞在 await() 中的參與者數目。

           

          完整示例

          示例說明:假如一個飛機乘客的登機過程,要求乘客的身份證驗證通過,乘客的登機牌已經更換以及乘客的行禮已經確認后才可以登錄。

          完整代碼如下:

              public static void main(String[] args) {

                 

                  final int TOTAL_STEPS = 3;//步驟個數

                 

                  final String[] steps = new String[] {"換登機牌確認", "身份證確認", "行禮確認"};

                 

                  final CyclicBarrier cb = new CyclicBarrier(TOTAL_STEPS);

                  ExecutorService es = Executors.newFixedThreadPool(TOTAL_STEPS); // 在線程池中放入三個線程

                  for (int i = 0; i < TOTAL_STEPS; i++) { // 開啟三個任務

                      final String name = steps[i];

                      es.execute(new Runnable() {

           

                          public void run() {

                              for (int i = 0; i < 5; i++) {

                                  try {

                                      String passenger = "乘客"+i;

                                      int wait = new Random().nextInt(5000);

                                      Thread.sleep(wait);

                                      System.out.print(passenger + " " + name + "【確認通過 + 耗時:" + wait);

                                      // 如果有2個線程已經在等待,那么最后一個線程到達后就可以一起開始后面操作

                                      if (cb.getNumberWaiting() + 1 == 3) {

                                          System.out.println(" 全部通過,確認下一個乘客");

                                      } else {

                                          System.out.println(" 還有"

                                                  + (TOTAL_STEPS - cb.getNumberWaiting() - 1) + "個任務等待");

                                      }

                                      cb.await();

                                  } catch (Exception e) {

                                      e.printStackTrace();

                                  }

                              }

                          }

                      });

                  }

                  es.shutdown();

              }

          }

          執行結果如下:

          乘客0 行禮確認【確認通過 + 耗時:1616 還有2個任務等待

          乘客0 身份證確認【確認通過 + 耗時:1792 還有1個任務等待

          乘客0 換登機牌確認【確認通過 + 耗時:3891 全部通過,確認下一個乘客

          乘客1 身份證確認【確認通過 + 耗時:282 還有2個任務等待

          乘客1 換登機牌確認【確認通過 + 耗時:4354 還有1個任務等待

          乘客1 行禮確認【確認通過 + 耗時:4996 全部通過,確認下一個乘客

          乘客2 身份證確認【確認通過 + 耗時:2977 還有2個任務等待

          乘客2 行禮確認【確認通過 + 耗時:3848 還有1個任務等待

          乘客2 換登機牌確認【確認通過 + 耗時:4069 全部通過,確認下一個乘客

          乘客3 換登機牌確認【確認通過 + 耗時:905 還有2個任務等待

          乘客3 身份證確認【確認通過 + 耗時:1916 還有1個任務等待

          乘客3 行禮確認【確認通過 + 耗時:4710 全部通過,確認下一個乘客

          乘客4 身份證確認【確認通過 + 耗時:1371 還有2個任務等待

          乘客4 行禮確認【確認通過 + 耗時:1768 還有1個任務等待

          乘客4 換登機牌確認【確認通過 + 耗時:2498 全部通過,確認下一個乘客

           

          Good Luck!

          Yours Matthew!


           

          posted on 2012-06-28 13:34 x.matthew 閱讀(2753) 評論(1)  編輯  收藏 所屬分類: Best Practise(JDK API)
          主站蜘蛛池模板: 鄂托克前旗| 莱芜市| 仙居县| 栾城县| 惠州市| 南安市| 宜都市| 成安县| 灌云县| 衡阳市| 娄烦县| 建德市| 大冶市| 万源市| 舒城县| 苍梧县| 玛沁县| 泊头市| 灵武市| 临沧市| 兴义市| 田林县| 呼伦贝尔市| 嵊州市| 保定市| 乌苏市| 凤山市| 元阳县| 乌兰察布市| 射阳县| 普安县| 丹寨县| 洪雅县| 柳江县| 佳木斯市| 福贡县| 鱼台县| 永靖县| 关岭| 峨山| 枣强县|