xylz,imxylz

          關注后端架構、中間件、分布式和并發編程

             :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理 ::
            111 隨筆 :: 10 文章 :: 2680 評論 :: 0 Trackbacks

          此小節介紹幾個與鎖有關的有用工具。

          閉鎖(Latch)

          閉鎖(Latch):一種同步方法,可以延遲線程的進度直到線程到達某個終點狀態。通俗的講就是,一個閉鎖相當于一扇大門,在大門打開之前所有線程都被阻斷,一旦大門打開所有線程都將通過,但是一旦大門打開,所有線程都通過了,那么這個閉鎖的狀態就失效了,門的狀態也就不能變了,只能是打開狀態。也就是說閉鎖的狀態是一次性的,它確保在閉鎖打開之前所有特定的活動都需要在閉鎖打開之后才能完成。

          CountDownLatch是JDK 5+里面閉鎖的一個實現,允許一個或者多個線程等待某個事件的發生。CountDownLatch有一個正數計數器,countDown方法對計數器做減操作,await方法等待計數器達到0。所有await的線程都會阻塞直到計數器為0或者等待線程中斷或者超時。

          CountDownLatch的API如下。

          • public void await() throws InterruptedException
          • public boolean await(long timeout, TimeUnit unit) throws InterruptedException
          • public void countDown()
          • public long getCount()

          其中getCount()描述的是當前計數,通常用于調試目的。

          下面的例子中描述了閉鎖的兩種常見的用法。

          package xylz.study.concurrency.lock;

          import java.util.concurrent.CountDownLatch;

          public class PerformanceTestTool {

              public long timecost(final int times, final Runnable task) throws InterruptedException {
                  if (times <= 0) throw new IllegalArgumentException();
                  final CountDownLatch startLatch = new CountDownLatch(1);
                  final CountDownLatch overLatch = new CountDownLatch(times);
                  for (int i = 0; i < times; i++) {
                      new Thread(new Runnable() {
                          public void run() {
                              try {
                                  startLatch.await();
                                  //
                                  task.run();
                              } catch (InterruptedException ex) {
                                  Thread.currentThread().interrupt();
                              } finally {
                                  overLatch.countDown();
                              }
                          }
                      }).start();
                  }
                  //
                  long start = System.nanoTime();
                  startLatch.countDown();
                  overLatch.await();
                  return System.nanoTime() - start;
              }

          }

          在上面的例子中使用了兩個閉鎖,第一個閉鎖確保在所有線程開始執行任務前,所有準備工作都已經完成,一旦準備工作完成了就調用startLatch.countDown()打開閉鎖,所有線程開始執行。第二個閉鎖在于確保所有任務執行完成后主線程才能繼續進行,這樣保證了主線程等待所有任務線程執行完成后才能得到需要的結果。在第二個閉鎖當中,初始化了一個N次的計數器,每個任務執行完成后都會將計數器減一,所有任務完成后計數器就變為了0,這樣主線程閉鎖overLatch拿到此信號后就可以繼續往下執行了。

          根據前面的happend-before法則可以知道閉鎖有以下特性:

          內存一致性效果:線程中調用 countDown() 之前的操作 happen-before 緊跟在從另一個線程中對應 await() 成功返回的操作。

          在上面的例子中第二個閉鎖相當于把一個任務拆分成N份,每一份獨立完成任務,主線程等待所有任務完成后才能繼續執行。這個特性在后面的線程池框架中會用到,其實FutureTask就可以看成一個閉鎖。后面的章節還會具體分析FutureTask的。

           

          同樣基于探索精神,仍然需要“窺探”下CountDownLatch里面到底是如何實現await*countDown的。

          首先,研究下await()方法。內部直接調用了AQSacquireSharedInterruptibly(1)

          public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
              if (Thread.interrupted())
                  throw new InterruptedException();
              if (tryAcquireShared(arg) < 0)
                  doAcquireSharedInterruptibly(arg);
          }

          前面一直提到的都是獨占鎖(排它鎖、互斥鎖),現在就用到了另外一種鎖,共享鎖。

          所謂共享鎖是說所有共享鎖的線程共享同一個資源,一旦任意一個線程拿到共享資源,那么所有線程就都擁有的同一份資源。也就是通常情況下共享鎖只是一個標志,所有線程都等待這個標識是否滿足,一旦滿足所有線程都被激活(相當于所有線程都拿到鎖一樣)。這里的閉鎖CountDownLatch就是基于共享鎖的實現。

          閉鎖中關于AQStryAcquireShared的實現是如下代碼(java.util.concurrent.CountDownLatch.Sync.tryAcquireShared):

          public int tryAcquireShared(int acquires) {
              return getState() == 0? 1 : -1;
          }

          在這份邏輯中,對于閉鎖而言第一次await時tryAcquireShared應該總是-1,因為對于閉鎖CountDownLatch而言state的值就是初始化的count值。這也就解釋了為什么在countDown調用之前閉鎖的count總是>0。

          private void doAcquireSharedInterruptibly(int arg)
              throws InterruptedException {
              final Node node = addWaiter(Node.SHARED);
              try {
                  for (;;) {
                      final Node p = node.predecessor();
                      if (p == head) {
                          int r = tryAcquireShared(arg);
                          if (r >= 0) {
                              setHeadAndPropagate(node, r);
                              p.next = null; // help GC
                              return;
                          }
                      }
                      if (shouldParkAfterFailedAcquire(p, node) &&
                          parkAndCheckInterrupt())
                          break;
                  }
              } catch (RuntimeException ex) {
                  cancelAcquire(node);
                  throw ex;
              }
              // Arrive here only if interrupted
              cancelAcquire(node);
              throw new InterruptedException();
          }

          上面的邏輯展示了如何通過await將所有線程串聯并掛起,直到被喚醒或者條件滿足或者被中斷。整個過程是這樣的:

            1. 將當前線程節點以共享模式加入AQSCLH隊列中(相關概念參考這里這里)。進行2。
            2. 檢查當前節點的前任節點,如果是頭結點并且當前閉鎖計數為0就將當前節點設置為頭結點,喚醒繼任節點,返回(結束線程阻塞)。否則進行3。
            3. 檢查線程是否該阻塞,如果應該就阻塞(park),直到被喚醒(unpark)。重復2。
            4. 如果2、3有異常就拋出異常(結束線程阻塞)。

          這里有一點值得說明下,設置頭結點并喚醒繼任節點setHeadAndPropagate。由于前面tryAcquireShared總是返回1或者-1,而進入setHeadAndPropagate時總是propagate>=0,所以這里propagate==1。后面喚醒繼任節點操作就非常熟悉了。

          private void setHeadAndPropagate(Node node, int propagate) {
              setHead(node);
              if (propagate > 0 && node.waitStatus != 0) {
                  Node s = node.next;
                  if (s == null || s.isShared())
                      unparkSuccessor(node);
              }
          }

          從上面的所有邏輯可以看出countDown應該就是在條件滿足(計數為0)時喚醒頭結點(時間最長的一個節點),然后頭結點就會根據FIFO隊列喚醒整個節點列表(如果有的話)。

          CountDownLatchcountDown代碼中看到,直接調用的是AQSreleaseShared(1),參考前面的知識,這就印證了上面的說法。

          tryReleaseShared中正是采用CAS操作減少計數(每次減-1)。

          public boolean tryReleaseShared(int releases) {
              for (;;) {
                  int c = getState();
                  if (c == 0)
                      return false;
                  int nextc = c-1;
                  if (compareAndSetState(c, nextc))
                      return nextc == 0;
              }
          }

          整個CountDownLatch就是這個樣子的。其實有了前面原子操作和AQS的原理及實現,分析CountDownLatch還是比較容易的。

           



          ©2009-2014 IMXYLZ |求賢若渴
          posted on 2010-07-09 09:21 imxylz 閱讀(29334) 評論(6)  編輯  收藏 所屬分類: J2EE

          評論

          # re: 深入淺出 Java Concurrency (10): 鎖機制 part 5 閉鎖 (CountDownLatch)[未登錄] 2010-07-12 19:40 行云流水
          堅持。幾天沒有更新了,堅持一天3篇,支持支持。。  回復  更多評論
            

          # re: 深入淺出 Java Concurrency (10): 鎖機制 part 5 閉鎖 (CountDownLatch) 2010-07-12 23:40 xylz
          @行云流水

          多謝鼓勵!爭取不“太監”了。  回復  更多評論
            

          # re: 深入淺出 Java Concurrency (10): 鎖機制 part 5 閉鎖 (CountDownLatch) 2012-02-02 12:15 漆黑之牙
          @xylz
          文章太長,看完頭都大了!  回復  更多評論
            

          # re: 深入淺出 Java Concurrency (10): 鎖機制 part 5 閉鎖 (CountDownLatch) 2012-06-19 00:15 vinfai
          應該向你好好學習!都落伍了  回復  更多評論
            

          # re: 深入淺出 Java Concurrency (10): 鎖機制 part 5 閉鎖 (CountDownLatch) 2013-03-26 15:42 ffengtian
          所有的請求await的線程都阻塞了,而countdouwn操作只是state-1,哪個操作執行喚醒阻塞的  回復  更多評論
            

          # re: 深入淺出 Java Concurrency (10): 鎖機制 part 5 閉鎖 (CountDownLatch) 2013-10-02 22:27 txbhcml
          @ffengtian
          doReleaseShared  回復  更多評論
            


          ©2009-2014 IMXYLZ
          主站蜘蛛池模板: 泾川县| 南漳县| 宣威市| 噶尔县| 江都市| 治县。| 义马市| 清流县| 兰考县| 祁门县| 二连浩特市| 博湖县| 儋州市| 徐州市| 临沧市| 孟州市| 镇沅| 柳江县| 永定县| 托克逊县| 肇州县| 湘潭县| 四川省| 特克斯县| 衡山县| 思南县| 阿克| 盐城市| 济源市| 崇信县| 沙湾县| 广西| 阿克陶县| 鄂托克旗| 浦城县| 闸北区| 新平| 武隆县| 博爱县| 闵行区| 富平县|