xylz,imxylz

          關(guān)注后端架構(gòu)、中間件、分布式和并發(fā)編程

             :: 首頁(yè) :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理 ::
            111 隨筆 :: 10 文章 :: 2680 評(píng)論 :: 0 Trackbacks

           

          如果說(shuō)CountDownLatch是一次性的,那么CyclicBarrier正好可以循環(huán)使用。它允許一組線程互相等待,直到到達(dá)某個(gè)公共屏障點(diǎn) (common barrier point)。所謂屏障點(diǎn)就是一組任務(wù)執(zhí)行完畢的時(shí)刻。

           

          清單1 一個(gè)使用CyclicBarrier的例子

          package xylz.study.concurrency.lock;

          import java.util.concurrent.BrokenBarrierException;
          import java.util.concurrent.CyclicBarrier;

          public class CyclicBarrierDemo {

              final CyclicBarrier barrier;

              final int MAX_TASK;

              public CyclicBarrierDemo(int cnt) {
                  barrier = new CyclicBarrier(cnt + 1);
                  MAX_TASK = cnt;
              }

              public void doWork(final Runnable work) {
                  new Thread() {

                      public void run() {
                          work.run();
                          try {
                              int index = barrier.await();
                              doWithIndex(index);
                          } catch (InterruptedException e) {
                              return;
                          } catch (BrokenBarrierException e) {
                              return;
                          }
                      }
                  }.start();
              }

              private void doWithIndex(int index) {
                  if (index == MAX_TASK / 3) {
                      System.out.println("Left 30%.");
                  } else if (index == MAX_TASK / 2) {
                      System.out.println("Left 50%");
                  } else if (index == 0) {
                      System.out.println("run over");
                  }
              }

              public void waitForNext() {
                  try {
                      doWithIndex(barrier.await());
                  } catch (InterruptedException e) {
                      return;
                  } catch (BrokenBarrierException e) {
                      return;
                  }
              }

              public static void main(String[] args) {
                  final int count = 10;
                  CyclicBarrierDemo demo = new CyclicBarrierDemo(count);
                  for (int i = 0; i < 100; i++) {
                      demo.doWork(new Runnable() {

                          public void run() {
                              //do something
                              try {
                                  Thread.sleep(1000L);
                              } catch (Exception e) {
                                  return;
                              }
                          }
                      });
                      if ((i + 1) % count == 0) {
                          demo.waitForNext();
                      }
                  }
              }

          }

          清單1描述的是一個(gè)周期性處理任務(wù)的例子,在這個(gè)例子中有一對(duì)的任務(wù)(100個(gè)),希望每10個(gè)為一組進(jìn)行處理,當(dāng)前僅當(dāng)上一組任務(wù)處理完成后才能進(jìn)行下一組,另外在每一組任務(wù)中,當(dāng)任務(wù)剩下50%,30%以及所有任務(wù)執(zhí)行完成時(shí)向觀察者發(fā)出通知。

          在這個(gè)例子中,CyclicBarrierDemo 構(gòu)建了一個(gè)count+1的任務(wù)組(其中一個(gè)任務(wù)時(shí)為了外界方便掛起主線程)。每一個(gè)子任務(wù)里,人物本身執(zhí)行完畢后都需要等待同組內(nèi)其它任務(wù)執(zhí)行完成后才能繼續(xù)。同時(shí)在剩下任務(wù)50%、30%已經(jīng)0時(shí)執(zhí)行特殊的其他任務(wù)(發(fā)通知)。

          很顯然CyclicBarrier有以下幾個(gè)特點(diǎn):

          • await()方法將掛起線程,直到同組的其它線程執(zhí)行完畢才能繼續(xù)
          • await()方法返回線程執(zhí)行完畢的索引,注意,索引時(shí)從任務(wù)數(shù)-1開(kāi)始的,也就是第一個(gè)執(zhí)行完成的任務(wù)索引為parties-1,最后一個(gè)為0,這個(gè)parties為總?cè)蝿?wù)數(shù),清單中是cnt+1
          • CyclicBarrier 是可循環(huán)的,顯然名稱說(shuō)明了這點(diǎn)。在清單1中,每一組任務(wù)執(zhí)行完畢就能夠執(zhí)行下一組任務(wù)。

          另外除了CyclicBarrier除了以上特點(diǎn)外,還有以下幾個(gè)特點(diǎn):

          • 如果屏障操作不依賴于掛起的線程,那么任何線程都可以執(zhí)行屏障操作。在清單1中可以看到并沒(méi)有指定那個(gè)線程執(zhí)行50%、30%、0%的操作,而是一組線程(cnt+1)個(gè)中任何一個(gè)線程只要到達(dá)了屏障點(diǎn)都可以執(zhí)行相應(yīng)的操作
          • CyclicBarrier 的構(gòu)造函數(shù)允許攜帶一個(gè)任務(wù),這個(gè)任務(wù)將在0%屏障點(diǎn)執(zhí)行,它將在await()==0后執(zhí)行。
          • CyclicBarrier 如果在await時(shí)因?yàn)橹袛唷⑹ ⒊瑫r(shí)等原因提前離開(kāi)了屏障點(diǎn),那么任務(wù)組中的其他任務(wù)將立即被中斷,以InterruptedException異常離開(kāi)線程。
          • 所有await()之前的操作都將在屏障點(diǎn)之前運(yùn)行,也就是CyclicBarrier 的內(nèi)存一致性效果

           

          CyclicBarrier 的所有API如下:

          • public CyclicBarrier(int parties) 創(chuàng)建一個(gè)新的 CyclicBarrier,它將在給定數(shù)量的參與者(線程)處于等待狀態(tài)時(shí)啟動(dòng),但它不會(huì)在啟動(dòng) barrier 時(shí)執(zhí)行預(yù)定義的操作。
          • public CyclicBarrier(int parties, Runnable barrierAction) 創(chuàng)建一個(gè)新的 CyclicBarrier,它將在給定數(shù)量的參與者(線程)處于等待狀態(tài)時(shí)啟動(dòng),并在啟動(dòng) barrier 時(shí)執(zhí)行給定的屏障操作,該操作由最后一個(gè)進(jìn)入 barrier 的線程執(zhí)行。
          • public int await() throws InterruptedException, BrokenBarrierException 在所有參與者都已經(jīng)在此 barrier 上調(diào)用 await 方法之前,將一直等待。
          • public int await(long timeout,TimeUnit unit) throws InterruptedException, BrokenBarrierException,TimeoutException 在所有參與者都已經(jīng)在此屏障上調(diào)用 await 方法之前將一直等待,或者超出了指定的等待時(shí)間。
          • public int getNumberWaiting() 返回當(dāng)前在屏障處等待的參與者數(shù)目。此方法主要用于調(diào)試和斷言。
          • public int getParties() 返回要求啟動(dòng)此 barrier 的參與者數(shù)目。
          • public boolean isBroken() 查詢此屏障是否處于損壞狀態(tài)。
          • public void reset() 將屏障重置為其初始狀態(tài)。

          針對(duì)以上API,下面來(lái)探討下CyclicBarrier 的實(shí)現(xiàn)原理,以及為什么有這樣的API。

          清單2 CyclicBarrier.await*()的實(shí)現(xiàn)片段

              private int dowait(boolean timed, long nanos)
              throws InterruptedException, BrokenBarrierException,
                     TimeoutException {
              final ReentrantLock lock = this.lock;
              lock.lock();
              try {
                  final Generation g = generation;
                  if (g.broken)
                      throw new BrokenBarrierException();

                  if (Thread.interrupted()) {
                      breakBarrier();
                      throw new InterruptedException();
                  }

                 int index = --count;
                 if (index == 0) {  // tripped
                     boolean ranAction = false;
                     try {
                         final Runnable command = barrierCommand;
                         if (command != null)
                             command.run();
                         ranAction = true;
                         nextGeneration();
                         return 0;
                     } finally {
                         if (!ranAction)
                             breakBarrier();
                     }
                 }

                  // loop until tripped, broken, interrupted, or timed out
                  for (;;) {
                      try {
                          if (!timed)
                              trip.await();
                          else if (nanos > 0L)
                              nanos = trip.awaitNanos(nanos);
                      } catch (InterruptedException ie) {
                          if (g == generation && ! g.broken) {
                              breakBarrier();
                              throw ie;
                          } else {
                              Thread.currentThread().interrupt();
                          }
                      }

                      if (g.broken)
                          throw new BrokenBarrierException();

                      if (g != generation)
                          return index;

                      if (timed && nanos <= 0L) {
                          breakBarrier();
                          throw new TimeoutException();
                      }
                  }
              } finally {
                  lock.unlock();
              }
          }

          清單2有點(diǎn)復(fù)雜,這里一點(diǎn)一點(diǎn)的剖析,并且還原到最原始的狀態(tài)。

          利用前面學(xué)到的知識(shí),我們知道要想讓線程等待其他線程執(zhí)行完畢,那么已經(jīng)執(zhí)行完畢的線程(進(jìn)入await*()方法)就需要park(),直到超時(shí)或者被中斷,或者被其它線程喚醒。

          前面說(shuō)過(guò)CyclicBarrier 的特點(diǎn)是要么大家都正常執(zhí)行完畢,要么大家都異常被中斷,不會(huì)其中有一個(gè)被中斷而其它正常執(zhí)行完畢的現(xiàn)象存在。這種特點(diǎn)叫all-or-none。類似的概念是原子操作中的要么大家都執(zhí)行完,要么一個(gè)操作都不執(zhí)行完。當(dāng)前這其實(shí)是兩個(gè)概念了。要完成這樣的特點(diǎn)就必須有一個(gè)狀態(tài)來(lái)描述曾經(jīng)是否有過(guò)線程被中斷(broken)了,這樣后面執(zhí)行完的線程就該知道是否需要繼續(xù)等待了。而在CyclicBarrier 中Generation 就是為了完成這件事情的。Generation的定義非常簡(jiǎn)單,整個(gè)結(jié)構(gòu)就只有一個(gè)變量boolean broken = false;,定義是否發(fā)生了broken操作。

          由于有競(jìng)爭(zhēng)資源的存在(broken/index),所以毫無(wú)疑問(wèn)需要一把鎖lock。拿到鎖后整個(gè)過(guò)程是這樣的:

          1. 檢查是否存在中斷位(broken),如果存在就立即以BrokenBarrierException異常返回。此異常描述的是線程進(jìn)入屏障被破壞的等待狀態(tài)。否則進(jìn)行2。
          2. 檢查當(dāng)前線程是否被中斷,如果是那么就設(shè)置中斷位(使其它將要進(jìn)入等待的線程知道),另外喚醒已經(jīng)等待的線程,同時(shí)以InterruptedException異常返回,表示線程要處理中斷。否則進(jìn)行3。
          3. 將剩余任務(wù)數(shù)減1,如果此時(shí)剩下的任務(wù)數(shù)為0,也就是達(dá)到了公共屏障點(diǎn),那么就執(zhí)行屏障點(diǎn)任務(wù)(如果有的話),同時(shí)創(chuàng)建新的Generation(在這個(gè)過(guò)程中會(huì)喚醒其它所有線程,因此當(dāng)前線程是屏障點(diǎn)線程,那么其它線程就都應(yīng)該在等待狀態(tài))。否則進(jìn)行4。
          4. 到這里說(shuō)明還沒(méi)有到達(dá)屏障點(diǎn),那么此時(shí)線程就應(yīng)該park()。很顯然在下面的for循環(huán)中就是要park線程。這里park線程采用的是Condition.await()方法。也就是trip.await*()。為什么需要Condition?因?yàn)樗械腶wait*()其實(shí)等待的都是一個(gè)條件,一旦條件滿足就應(yīng)該都被喚醒,所以Condition整好滿足這個(gè)特點(diǎn)。所以到這里就會(huì)明白為什么在步驟3中到達(dá)屏障點(diǎn)時(shí)創(chuàng)建新的Generation的時(shí)候是一定要喚醒其它線程的原因了。

          上面4個(gè)步驟其實(shí)只是描述主體結(jié)構(gòu),事實(shí)上整個(gè)過(guò)程中有非常多的邏輯來(lái)處理異常引發(fā)的問(wèn)題,比如執(zhí)行屏障點(diǎn)任務(wù)引發(fā)的異常,park線程超時(shí)引發(fā)的中斷異常和超時(shí)異常等等。所以對(duì)于await()而言,異常的處理比業(yè)務(wù)邏輯的處理更復(fù)雜,這就解釋了為什么await()的時(shí)候可能引發(fā)InterruptedException,BrokenBarrierException,TimeoutException 三種異常。

          清單3 生成下一個(gè)循環(huán)周期并喚醒其它線程

          private void nextGeneration() {
               trip.signalAll();
               count = parties;
               generation = new Generation();
          }

          清單3 描述了如何生成下一個(gè)循環(huán)周期的過(guò)程,在這個(gè)過(guò)程中當(dāng)然需要使用Condition.signalAll()喚醒所有已經(jīng)執(zhí)行完成并且正在等待的線程。另外這里count描述的是還有多少線程需要執(zhí)行,是為了線程執(zhí)行完畢索引計(jì)數(shù)。

          isBroken() 方法描述的就是generation.broken,也即線程組是否發(fā)生了異常。這里再一次解釋下為什么要有這個(gè)狀態(tài)的存在。

          如果一個(gè)將要位于屏障點(diǎn)或者已經(jīng)位于屏障點(diǎn)的而執(zhí)行屏障點(diǎn)任務(wù)的線程發(fā)生了異常,那么即使喚醒了其它等待的線程,其它等待的線程也會(huì)因?yàn)檠h(huán)等待而“死去”,因?yàn)樵僖矝](méi)有一個(gè)線程來(lái)喚醒這些第二次進(jìn)行park的線程了。還有一個(gè)意圖是,如果屏障點(diǎn)都已經(jīng)損壞了,那么其它將要等待屏障點(diǎn)的再線程掛起就沒(méi)有意義了。

          寫到這里的時(shí)候非常不幸,用了4年多了臺(tái)燈終于“壽終正寢了”。

          其實(shí)CyclicBarrier 還有一個(gè)reset方法,描述的是手動(dòng)立即將所有線程中斷,恢復(fù)屏障點(diǎn),進(jìn)行下一組任務(wù)的執(zhí)行。也就是與重新創(chuàng)建一個(gè)新的屏障點(diǎn)相比,可能維護(hù)的代價(jià)要小一些(減少同步,減少上一個(gè)CyclicBarrier 的管理等等)。

          本來(lái)是想和Semaphore 一起將的,最后發(fā)現(xiàn)鋪開(kāi)后就有點(diǎn)長(zhǎng)了,而且也不利于理解和吸收,所以放到下一篇吧。

           

          參考資料:

          1. 使用 CyclicBarrier 做線程間同步

          2. CyclicBarrier And CountDownLatch Tutorial

          3. 線程—CyclicBarrier

          4. Java線程學(xué)習(xí)筆記(十)CountDownLatch 和CyclicBarrier

          5. 關(guān)于多線程同步的初步教程--Barrier的設(shè)計(jì)及使用

          6. Thread coordination with CountDownLatch and CyclicBarrier

          7. 如何充分利用多核CPU,計(jì)算很大的List中所有整數(shù)的和



          ©2009-2014 IMXYLZ |求賢若渴
          posted on 2010-07-12 23:33 imxylz 閱讀(22397) 評(píng)論(2)  編輯  收藏 所屬分類: J2EE

          評(píng)論

          # re: 深入淺出 Java Concurrency (11): 鎖機(jī)制 part 6 CyclicBarrier 2010-07-13 16:12 Mercy
          和CountDownLatch類似,不過(guò)由于它的reset方法讓他可以重用。  回復(fù)  更多評(píng)論
            

          # re: 深入淺出 Java Concurrency (11): 鎖機(jī)制 part 6 CyclicBarrier 2011-03-04 17:02 cxb
          上面的CyclicBarrier例子,效果并不理想:
          當(dāng)一組任務(wù)執(zhí)行完后才一次性按下面的順序打印出:
          run over Left 50% Left 30%
          而并不是(非一次性)
          Left 50% Left 30% run over
          (run over先打印應(yīng)該是:
          最后一個(gè)執(zhí)行barrier.await()最先返回0,其他的還在喚醒中)  回復(fù)  更多評(píng)論
            


          ©2009-2014 IMXYLZ
          主站蜘蛛池模板: 乐清市| 梨树县| 云霄县| 新泰市| 晋江市| 嵊州市| 洛浦县| 奇台县| 黄龙县| 比如县| 金阳县| 乌鲁木齐县| 化州市| 深圳市| 汤阴县| 乐业县| 红河县| 普安县| 丹阳市| 黎川县| 杨浦区| 赞皇县| 奉节县| 阿城市| 乌拉特中旗| 金坛市| 丰台区| 黔江区| 原平市| 靖边县| 虞城县| 哈密市| 新兴县| 杭锦旗| 鹿邑县| 桐梓县| 柞水县| 西吉县| 正镶白旗| 中牟县| 西华县|