java thread 之AQS

          JDK1.5引入了Doug Lea大神的concurrent框架,其中AbstractQueuedSynchronizer是concurrent框架的基本,從大神的paper中可以看到
          1.傳統的synchronized不能進行中段,這個不合適
          2.如果將concurrent重心放在少數競爭下優化鎖,而在其他情況下放任緩慢執行的策略是不正確的
          3.需要可預測的維護效率,即使在同步競爭激烈的情況下,理想中無論多少線程試圖通過一個同步點的開銷應該是恒定的
          4.設計的目標是總時間的減少,因為有可能在此之間一個線程可以通過同步點,然后他沒有立即執行
          5.在高吞吐量的基本上,更重要的是線程的公平調度

          AQS設計思路:
          原子的管理同步狀態;阻塞和解除阻塞線程;保持線程的阻塞隊列。

          實現
          1.在CLH queue的基礎上進行改造
          2.單個int state 計數synchronization狀態

          隊列的每個節點是內部類Node:
            static final class Node {
                 
          static final int CANCELLED =  1;
              
                  
          static final int SIGNAL    = -1;
                
                  
          static final int CONDITION = -2;
                
                  
          static final Node SHARED = new Node();
                 
                  
          static final Node EXCLUSIVE = null;

                  
          volatile int waitStatus;

                  
          volatile Node prev;
            
                  
          volatile Node next;
              
                  
          volatile Thread thread;

                  Node nextWaiter;
           
                  
          final boolean isShared() {
                      
          return nextWaiter == SHARED;
                  }
          }

          對于waitStatus>0的node在等待的遍歷當中是會被拋棄掉的,而nextWaiter在共享鎖和Lock的Condition中會用到

          void acquire(int arg)方法
          該方法是其他基于AQS實現的具體類獲得鎖或者信號等的基礎實現
              public final void acquire(int arg) {
                  
          if (!tryAcquire(arg) &&
                      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                      selfInterrupt();
              }
          tryAcquire(int arg) 留給具體子類實現,如果返回false,說明沒有得到鎖,則調用acquireQueued(final Node node, int arg)方法,在調用此方法前需要將當前線程的node加入隊列:

              private Node addWaiter(Node mode) {
                 //關鍵點1.
                  Node node 
          = new Node(Thread.currentThread(), mode);
                  
          // Try the fast path of enq; backup to full enq on failure
                  Node pred = tail;
                  
          if (pred != null) {
                      node.prev 
          = pred;
                      
          if (compareAndSetTail(pred, node)) {
                          pred.next 
          = node;
                          
          return node;
                      }
                  }
                  enq(node);
                  
          return node;
              }
          首先嘗試快速加入到隊尾,如果加入隊尾失敗,說明在此期間,其余的線程入隊了,那么真正的執行入隊操作:
           
           
          private Node enq(final Node node) {
                  
          for (;;) {
                      Node t 
          = tail;
                      
          if (t == null) { // Must initialize
                          Node h = new Node(); // Dummy header
                          h.next = node;
                          node.prev 
          = h;
                          
          if (compareAndSetHead(h)) {
                              tail 
          = node;
                              
          return h;
                          }
                      }
                      
          else {
                          node.prev 
          = t;
                          
          if (compareAndSetTail(t, node)) {
                              t.next 
          = node;
                              
          return t;
                          }
                      }
                  }
              }
          這是個循環,只到node加入到隊列中才返回,當是對隊列的第一次插入node時,必須初始化隊列,創建一個傀儡header,完成入隊操作后執行acquireQueued()方法:

              final boolean acquireQueued(final Node node, int arg) {
                  
          try {
                      
          boolean interrupted = false;
                      
          for (;;) {
                          
          final Node p = node.predecessor();
                          //關鍵點2.
                          if (p == head && tryAcquire(arg)) {
                         //關鍵點4.
                              setHead(node);
                              p.next 
          = null// help GC
                              return interrupted;
                          }
                          //關鍵點3
                          if (shouldParkAfterFailedAcquire(p, node) &&
                              parkAndCheckInterrupt())
                              interrupted 
          = true;
                      }
                  } 
          catch (RuntimeException ex) {
                      cancelAcquire(node);
                      
          throw ex;
                  }
              }
          前面的方法可以看出當前thread的對應的node就是參數傳入的node,獲得node的前驅,如果當前前驅是傀儡header而且子類的tryAcquire()方法返回為真,那么當前的node變成header
          ,而且返回node對應的thread的中斷狀態,如果前面流程沒有執行,執行shouldParkAfterFailedAcquire():
           private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

                  
          int s = pred.waitStatus;
                  
          if (s < 0)
                      
          /*
                       * This node has already set status asking a release
                       * to signal it, so it can safely park
                       
          */
                      
          return true;
                  
          if (s > 0) {
                      
          /*
                       * Predecessor was cancelled. Skip over predecessors and
                       * indicate retry.
                       
          */
                  
          do {
                  node.prev 
          = pred = pred.prev;
                  } 
          while (pred.waitStatus > 0);
                  pred.next 
          = node;
              }
                  
          else
                      
          /*
                       * Indicate that we need a signal, but don't park yet. Caller
                       * will need to retry to make sure it cannot acquire before
                       * parking.
                       
          */
                      compareAndSetWaitStatus(pred, 
          0, Node.SIGNAL);
                  
          return false;
              }
          得到node的前驅prev的waitStatus,當waitStatus>0,那么prev處于退出狀態,從隊列中去除prev,如果waitStatus<0,說明線程需要park住,如果沒有為0,就是新建的,需要設置成
          -1,在下一次循環中,如果還不能獲得鎖那么就需要park住線程,parkAndCheckInterrupt()就是執行park()線程,然后返回線程中斷狀態.
          acquireQueued()方法是個死循環,只到header之后的一個線程獲得鎖之后,才會重新設置header,被設置為新header的是header的next node,就這樣返回到acquire()中,如果返回的中斷狀態為true,當前線程中斷.
          總結下:
          關鍵點1.將線程入隊列,不管是否是第一個線程。在和其他線程競爭前,嘗試入隊一次
          關鍵點2.header只是一個傀儡node,他代表上次獲得的線程,在這里只有header的后面一個thread嘗試成功了,才會跳出循環,然后其他的線程只能在循環中
          關鍵點3.檢查加入的node對應thread是否需要park(),當node的waitStatus>0就是取消的node,不需要park(),其他的需要park().park住的線程就一直阻塞到unpark
          關鍵點4.這里setHead()方法把當前取得鎖的node設置為header,使得隊列往前走了一步。

          boolean release(int arg) 方法
          這個方法是unlock()等方法的基礎方法

              public final boolean release(int arg) {
                  
          if (tryRelease(arg)) {
                      Node h 
          = head;
                      
          if (h != null && h.waitStatus != 0)
                          unparkSuccessor(h);
                      
          return true;
                  }
                  
          return false;
              }
          tryRelease()留給子類實現,在隊列header處的node不為空而且狀態不為初始狀態的話(初始為0),需要為這個header所持有的thread unpark

              private void unparkSuccessor(Node node) {
                  
          /*
                   * Try to clear status in anticipation of signalling.  It is
                   * OK if this fails or if status is changed by waiting thread.
                   
          */
                  compareAndSetWaitStatus(node, Node.SIGNAL, 
          0);

                  
          /*
                   * Thread to unpark is held in successor, which is normally
                   * just the next node.  But if cancelled or apparently null,
                   * traverse backwards from tail to find the actual
                   * non-cancelled successor. 這里貌似是cancelAcquire()方法引起從后向前遍歷
                   
          */
                  Node s 
          = node.next;
                  
          if (s == null || s.waitStatus > 0) {
                      s 
          = null;
                      
          for (Node t = tail; t != null && t != node; t = t.prev)
                          
          if (t.waitStatus <= 0)
                              s 
          = t;
                  }
                  
          if (s != null)
                      LockSupport.unpark(s.thread);
              }
          在acquire()方法中知道header是個傀儡header,所以這個方法中得到header的next,如果next不為空或者next為取消的node,則從tail開始向前遍歷,找到最前面waitStatus<=0的node,然后unpark node的thread。

          void cancelAcquire(Node node)
          該方法是一切取消獲得的基礎實現,代碼有些地方需要注意:
              private void cancelAcquire(Node node) {
              
          // Ignore if node doesn't exist
                  if (node == null)
                  
          return;

              node.thread 
          = null;

              
          // Skip cancelled predecessors
              Node pred = node.prev;
              
          while (pred.waitStatus > 0)
                  node.prev 
          = pred = pred.prev;

              
          // Getting this before setting waitStatus ensures staleness
              
          //前面的操作使得前置改變了 但是prex的后置還是沒有變
              Node predNext = pred.next;

              
          // Can use unconditional write instead of CAS here
              node.waitStatus = Node.CANCELLED;

              
          // If we are the tail, remove ourselves
              if (node == tail && compareAndSetTail(node, pred)) {
                  
          //tail就set null 了
                  compareAndSetNext(pred, predNext, null);
              } 
          else {
                  
          // If "active" predecessor found
                  
          //

                  
          if (pred != head
                  
          && (pred.waitStatus == Node.SIGNAL
                      
          || compareAndSetWaitStatus(pred, 0, Node.SIGNAL))
                  
          && pred.thread != null) {

                  
          // If successor is active, set predecessor's next link
                  Node next = node.next;
                  
          if (next != null && next.waitStatus <= 0)
                      compareAndSetNext(pred, predNext, next);
                  } 
          else {
                  
          //如果前面沒有阻塞的node,到本node下面的話,需要unpark了
                  unparkSuccessor(node);
                  }

                  node.next 
          = node; // help GC  這個會影響next節點的秩序  還好每次pred.waitStatus>0的檢測使得受影響的時間窗口比較小
              }
              }



          共享鎖
          從acquireShared(int arg)開始:
             public final void acquireShared(int arg) {
                  
          if (tryAcquireShared(arg) < 0)
                      doAcquireShared(arg);
              }
              
          private void doAcquireShared(int arg) {
                  
          final Node node = addWaiter(Node.SHARED);
                  
          try {
                      
          boolean interrupted = false;
                      
          for (;;) {
                          
          final Node p = node.predecessor();
                          
          if (p == head) {
                              
          int r = tryAcquireShared(arg);
                              
          if (r >= 0) {
                                  setHeadAndPropagate(node, r);
                                  p.next 
          = null// help GC
                                  if (interrupted)
                                      selfInterrupt();
                                  
          return;
                              }
                          }
                          
          if (shouldParkAfterFailedAcquire(p, node) &&
                              parkAndCheckInterrupt())
                              interrupted 
          = true;
                      }
                  } 
          catch (RuntimeException ex) {
                      cancelAcquire(node);
                      
          throw ex;
                  }
              }
           
                private void setHeadAndPropagate(Node node, int propagate) {
                   //隊列向后移動一位
                  setHead(node);
                  //propagate>0說明,共享數值是大于前面要求的數值的
                  if (propagate > 0 && node.waitStatus != 0) {
                      /*
                       * Don't bother fully figuring out successor.  If it
                       * looks null, call unparkSuccessor anyway to be safe.
                       */
                      Node s = node.next;
                      //如果只剩下一個node節點 那么直接unpark是可以的,因為就這個node,propagate數也大于0
                      //如果是共享的也可以unpark,unpark后還在doAcquireShared循環的,如果發現acquires數值過大,這個線程還是會park住的
                      if (s == null || s.isShared())
                          unparkSuccessor(node);
                  }
              }

           共享的和獨占的在實現上面是類似得,共享實現上,對于獲得能成功,只要是子類實現上面能獲得成功,如信號量的實現(state的可用量是大于1的),就不用進入隊列阻塞。


          Condition
          condition是服務于單個Lock的,condition.await()等等待方法在Lock上面形成了一個condition的等待隊列,condition.singal()方法在Lock上面處理condition的等待隊列,
          一個一個化解,然后將隊列的node加入到AQS的阻塞隊列中等待對應的線程被unpark

              public final void await() throws InterruptedException {
                      
          if (Thread.interrupted())
                          
          throw new InterruptedException();
                      Node node 
          = addConditionWaiter();//關鍵點1
                      
          int savedState = fullyRelease(node);//關鍵點2
                      
          int interruptMode = 0;
                      
          while (!isOnSyncQueue(node)) {//關鍵點3
                          LockSupport.park(
          this);
                          
          if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                              
          break;
                      }
                      
          if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//關鍵點4
                          interruptMode 
          = REINTERRUPT;
                      
          if (node.nextWaiter != null)
                          unlinkCancelledWaiters();
                      
          if (interruptMode != 0)
                          reportInterruptAfterWait(interruptMode);
                  }
          其中的關鍵點:
          關鍵點1.加入到condition的對應的lock私有的隊列中,和AQS的阻塞隊列形式相似
          關鍵點2.釋放這個condition對應的lock的鎖,因為在使用的過程當中,考慮到如果這個wait()方法阻塞住,而lock如果沒有釋放鎖,那么對于其他的線程的node來說肯定是阻塞住的,
          因為condition對應的lock獲得了鎖,肯定在AQS的header處,其他線程肯定是得不到鎖阻塞在那里,這樣兩邊都阻塞的話就死鎖了,所以這里需要釋放對應lock的鎖的
          關鍵點3.判斷condition是否已經轉化成為AQS阻塞隊列的一個節點,如果沒有park線程阻塞在這里
          關鍵點4.到這一步的話就需要signal()或者signalAll()的方法的執行,說明這個線程已經被unpark,然后運行直到acquireQueued嘗試再次獲得鎖,因為condition對應的lock的鎖在關鍵
          點2是被釋放了的
              public final void signal() {
                      
          if (!isHeldExclusively())
                          
          throw new IllegalMonitorStateException();
                      Node first 
          = firstWaiter;
                      
          if (first != null)
                          doSignal(first);
                  }

              
          private void doSignal(Node first) {
                      
          do {
                          
          if ( (firstWaiter = first.nextWaiter) == null)
                              lastWaiter 
          = null;
                          first.nextWaiter 
          = null;
                      } 
          while (!transferForSignal(first) &&
                               (first 
          = firstWaiter) != null);
                  }
                  
          final boolean transferForSignal(Node node) {
                  
          /*
                   * If cannot change waitStatus, the node has been cancelled.
                   
          */
                  
          if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                      
          return false;

                  
          /*
                   * Splice onto queue and try to set waitStatus of predecessor to
                   * indicate that thread is (probably) waiting. If cancelled or
                   * attempt to set waitStatus fails, wake up to resync (in which
                   * case the waitStatus can be transiently and harmlessly wrong).
                   
          */
                  Node p 
          = enq(node);//進入到AQS的阻塞隊列中
                  int c = p.waitStatus;
                  
          if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
                      LockSupport.unpark(node.thread);
                  
          return true;
              }

              在整個AQS存在兩種鏈表。 一個鏈表就是整個Sync Node鏈表,橫向鏈表。另一種鏈表就是Condition的wait Node鏈表,相對于Sync node,它屬于node節點的一個縱向鏈表。當縱向列表被single通知后,會進入對應的Sync Node進行排隊處理。

          最后面上個圖理解下condition:


          圖片轉載自:
          http://www.goldendoc.org/2011/06/juc_condition/









          posted on 2011-09-15 22:57 nod0620 閱讀(667) 評論(0)  編輯  收藏 所屬分類: 多線程java

          <2025年6月>
          25262728293031
          1234567
          891011121314
          15161718192021
          22232425262728
          293012345

          導航

          統計

          常用鏈接

          留言簿

          隨筆分類

          隨筆檔案

          文章分類

          文章檔案

          搜索

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 信宜市| 新兴县| 城市| 嵊州市| 平潭县| 玉门市| 林口县| 叶城县| 武川县| 海伦市| 徐州市| 惠东县| 深圳市| 沾益县| 天气| 阿拉善左旗| 濮阳市| 望谟县| 彭州市| 定陶县| 鄄城县| 高邑县| 松桃| 渭南市| 五大连池市| 依兰县| 自治县| 含山县| 兰州市| 连云港市| 喜德县| 白城市| 商城县| 昂仁县| 忻州市| 通榆县| 长子县| 嫩江县| 博野县| 乾安县| 运城市|