posts - 28,  comments - 15,  trackbacks - 0

          一、前言
              ReentrantLock是JDK1.5引入的,它擁有與synchronized相同的并發性和內存語義,并提供了超出synchonized的其他高級功能(例如,中斷鎖等候、條件變量等),并且使用ReentrantLock比synchronized能獲得更好的可伸縮性。

              ReentrantLock的實現基于AQS(AbstractQueuedSynchronizer)和LockSupport。
              AQS主要利用硬件原語指令(CAS compare-and-swap),來實現輕量級多線程同步機制,并且不會引起CPU上文切換和調度,同時提供內存可見性和原子化更新保證(線程安全的三要素:原子性、可見性、順序性)。

              AQS的本質上是一個同步器/阻塞鎖的基礎框架,其作用主要是提供加鎖、釋放鎖,并在內部維護一個FIFO等待隊列,用于存儲由于鎖競爭而阻塞的線程。

          二、關鍵代碼分析

          1.關鍵字段

              AQS使用鏈表作為隊列,使用volatile變量state,作為鎖狀態標識位。

          /**
               * Head of the wait queue, lazily initialized.  Except for
               * initialization, it is modified only via method setHead.  Note:
               * If head exists, its waitStatus is guaranteed not to be
               * CANCELLED.
               
          */

              
          private transient volatile Node head;       //等待隊列的頭

              
          /**
               * Tail of the wait queue, lazily initialized.  Modified only via
               * method enq to add new wait node.
               
          */

              
          private transient volatile Node tail; //等待隊列的尾

              
          /**
               * The synchronization state.
               
          */

              
          private volatile int state;             //原子性的鎖狀態位,ReentrantLock對該字段的調用是通過原子操作compareAndSetState進行的


             
          protected final boolean compareAndSetState(int expect, int update) {
                  
          // See below for intrinsics setup to support this
                  return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
             }




          2.ReentrantLock的公平鎖與非公平鎖


             從ReentrantLock的構造子可以看到,ReentrantLock提供兩種鎖:公平鎖和非公平鎖,其內部實現了兩種同步器NonfairSync、FairSync派生自AQS,主要才采用了模板方法模式,主要重寫了AQS的tryAcquire、lock方法,如下圖。

            

             

          public ReentrantLock() {
                   sync 
          = new NonfairSync();
           }


               
          public ReentrantLock(boolean fair) {
                   sync 
          = (fair)? new FairSync() : new NonfairSync();
                }


              

          3.獲取鎖操作

           public void lock() {
                  sync.lock();
           }

          由于NonfairSync、FairSync分別實現了lock方法,我們將分別探討

          3.1NonfairSync.lock()分析

           (1)通過原子的比較并設置操作,如果成功設置,說明鎖是空閑的,當前線程獲得鎖,并把當前線程設置為鎖擁有者;
           (2)否則,調用acquire方法;
           

          package java.util.concurrent.locks.ReentrantLock;
          final void lock() {
                      
          if (compareAndSetState(01))//表示如果當前state=0,那么設置state=1,并返回true;否則返回false。由于未等待,所以線程不需加入到等待隊列
                          setExclusiveOwnerThread(Thread.currentThread());
                      
          else
                          acquire(
          1);
          }

           
           
          package java.util.concurrent.locks.AbstractOwnableSynchronizer  //AbstractOwnableSynchronizer是AQS的父類
           protected final void setExclusiveOwnerThread(Thread t) {
                      exclusiveOwnerThread 
          = t;
          }

           
          3.1.1acquire方法分析
           (1)如果嘗試以獨占的方式獲得鎖失敗,那么就把當前線程封裝為一個Node,加入到等待隊列中;如果加入隊列成功,接下來檢查當前線程的節點是否應該等待(掛起),如果當前線程所處節點的前一節點的等待狀態小于0,則通過LockSupport掛起當前線程;無論線程是否被掛起,或者掛起后被激活,都應該返回當前線程的中斷狀態,如果處于中斷狀態,需要中斷當前線程。
           

          package java.util.concurrent.locks.AbstractQueuedSynchronizer
          public final void acquire(int arg) {
                   
          if (!tryAcquire(arg) &&
                        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                       selfInterrupt();
          }


           
            
          protected final boolean tryAcquire(int acquires) {
                      
          return nonfairTryAcquire(acquires);
           }


           3.1.2nonfairTryAcquire分析

           (1)如果鎖狀態空閑(state=0),且通過原子的比較并設置操作,那么當前線程獲得鎖,并把當前線程設置為鎖擁有者;
           (2)如果鎖狀態空閑,且原子的比較并設置操作失敗,那么返回false,說明嘗試獲得鎖失敗;
           (3)否則,檢查當前線程與鎖擁有者線程是否相等(表示一個線程已經獲得該鎖,再次要求該鎖,這種情況叫可重入鎖),如果相等,維護鎖狀態,并返回true;
           (4)如果不是以上情況,說明鎖已經被其他的線程持有,直接返回false;
             

          final boolean nonfairTryAcquire(int acquires) {  
                      
          final Thread current = Thread.currentThread();
                      
          int c = getState();
                      
          if (c == 0{
                          
          if (compareAndSetState(0, acquires)) {
                              setExclusiveOwnerThread(current);
                              
          return true;
                          }

                      }

                      
          else if (current == getExclusiveOwnerThread()) {  //表示一個線程已經獲得該鎖,再次要求該鎖(重入鎖的由來),為狀態位加acquires
                          int nextc = c + acquires;
                          
          if (nextc < 0// overflow
                              throw new Error("Maximum lock count exceeded");
                          setState(nextc);
                          
          return true;
                      }

                      
          return false;
           }

          3.1.3addWaiter分析
           (1)如果tail節點不為null,說明隊列不為空,則把新節點加入到tail的后面,返回當前節點,否則進入enq進行處理(2);
           (2)如果tail節點為null,說明隊列為空,需要建立一個虛擬的頭節點,并把封裝了當前線程的節點設置為尾節點;另外一種情況的發生,是由于在(1)中的compareAndSetTail可能會出現失敗,這里采用for的無限循環,是要保證當前線程能夠正確進入等待隊列;
           

          package java.util.concurrent.locks.AbstractQueuedSynchronizer
             
          private Node addWaiter(Node mode) {
                   Node node 
          = new Node(Thread.currentThread(), mode);
                   
          // Try the fast path of enq; backup to full enq on failure
                   Node pred = tail;
                   
          if (pred != null{  //如果當前隊列不是空隊列,則把新節點加入到tail的后面,返回當前節點,否則進入enq進行處理。
                        node.prev = pred;
                        
          if (compareAndSetTail(pred, node)) {
                            pred.next 
          = node;
                            
          return node;
                        }

                   }

                   enq(node);
                   
          return node;
               }


           
          package java.util.concurrent.locks.AbstractQueuedSynchronizer
           
          private Node enq(final Node node) {
                  
          for (;;) {
                      Node t 
          = tail;
                      
          if (t == null// tail節點為空,說明是空隊列,初始化頭節點,如果成功,返回頭節點
                          Node h = new Node(); // Dummy header
                          h.next = node;
                          node.prev 
          = h;
                          
          if (compareAndSetHead(h)) {
                              tail 
          = node;
                              
          return h;
                          }

                      }

                      
          else {   //
                          node.prev = t;
                          
          if (compareAndSetTail(t, node)) {
                              t.next 
          = node;
                              
          return t;
                          }

                      }

                  }


          3.1.4acquire分析
          (1)如果當前節點是隊列的頭結點(如果第一個節點是虛擬節點,那么第二個節點實際上就是頭結點了),就嘗試在此獲取鎖tryAcquire(arg)。如果成功就將頭結點設置為當前節點(不管第一個結點是否是虛擬節點),返回中斷狀態。否則進行(2)。 
          (2)檢測當前節點是否應該park()-"掛起的意思",如果應該park()就掛起當前線程并且返回當前線程中斷狀態。進行操作(1)。
            

            final boolean acquireQueued(final Node node, int arg) {
                  
          try {
                      
          boolean interrupted = false;
                      
          for (;;) {
                          
          final Node p = node.predecessor();
                          
          if (p == head && tryAcquire(arg)) {
                              setHead(node);
                              p.next 
          = null// help GC
                              return interrupted;
                          }

                          
          if (shouldParkAfterFailedAcquire(p, node) &&
                              parkAndCheckInterrupt())
                              interrupted 
          = true;
                      }

                  }
           catch (RuntimeException ex) {
                      cancelAcquire(node);
                      
          throw ex;
                  }

               }


          3.1.5 shouldParkAfterFailedAcquire分析

           (1)如果前一個節點的等待狀態waitStatus<0,也就是前面的節點還沒有獲得到鎖,那么返回true,表示當前節點(線程)就應該park()了。否則進行(2)。 
           (2)如果前一個節點的等待狀態waitStatus>0,也就是前一個節點被CANCELLED了,那么就將前一個節點去掉,遞歸此操作直到所有前一個節點的waitStatus<=0,進行(4)。否則進行(3)。 
           (3)前一個節點等待狀態waitStatus=0,修改前一個節點狀態位為SINGAL,表示后面有節點等待你處理,需要根據它的等待狀態來決定是否該park()。進行(4)。 
           (4)返回false,表示線程不應該park()。

           注意:一個Node節點可包含以下狀態以及模式:
                /** waitStatus value to indicate thread has cancelled */    取消
                  static final int CANCELLED =  1;
                  /** waitStatus value to indicate successor's thread needs unparking */   信號等待(在AQS中,是通過LockSupport進行線程間信號交互的)
                  static final int SIGNAL    = -1;
                  /** waitStatus value to indicate thread is waiting on condition */       條件等待   
                  static final int CONDITION = -2;
                  /** Marker to indicate a node is waiting in shared mode */  共享模式
                  static final Node SHARED = new Node();
                  /** Marker to indicate a node is waiting in exclusive mode */           獨占模式
                  static final Node EXCLUSIVE = null;

           

            private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
                  
          int s = pred.waitStatus;
                  
          if (s < 0)
                      
          /*
                       * This node has already set status asking a release
                       * to signal it, so it can safely park
                       
          */

                      
          return true;
                  
          if (s > 0{
                      
          /*
                       * Predecessor was cancelled. Skip over predecessors and
                       * indicate retry.
                       
          */

               
          do {
            node.prev 
          = pred = pred.prev;
               }
           while (pred.waitStatus > 0);
               pred.next 
          = node;
           }

                  
          else
                      
          /*
                       * Indicate that we need a signal, but don't park yet. Caller
                       * will need to retry to make sure it cannot acquire before
                       * parking.
                       
          */

                      compareAndSetWaitStatus(pred, 
          0, Node.SIGNAL);
                  
          return false;
              }


           
          private final boolean parkAndCheckInterrupt() {
                  LockSupport.park(
          this);  //阻塞,即掛起;在沒有unpark之前,下面的代碼將不會執行;
                  return Thread.interrupted();//個人感覺,如果沒有外部的interrupt或者超時等,這里將始終返回false;
              }


           
          private static void selfInterrupt() {
                  Thread.currentThread().interrupt();
              }


          3.2FairSync.lock()分析

           公平鎖相對與非公平鎖,在鎖的獲取實現上,差別只在FairSync提供自己的tryAcquire()的方法實現,代碼如下:

           (1)如果鎖狀態為0,等待隊列為空,或者給定的線程在隊列的頭部,那么該線程獲得鎖;
           (2)如果當前線程與鎖持有者線程相等,這種情況屬于鎖重入,鎖狀態加上請求數;
           (3)以上兩種情況都不是,返回false,說明嘗試獲得鎖失敗;

          protected final boolean tryAcquire(int acquires) {
                      
          final Thread current = Thread.currentThread();
                      
          int c = getState();
                      
          if (c == 0{
                          
          if (isFirst(current) &&
                              compareAndSetState(
          0, acquires)) {
                              setExclusiveOwnerThread(current);
                              
          return true;
                          }

                      }

                      
          else if (current == getExclusiveOwnerThread()) {
                          
          int nextc = c + acquires;
                          
          if (nextc < 0)
                              
          throw new Error("Maximum lock count exceeded");
                          setState(nextc);
                          
          return true;
                      }

                      
          return false;
                  }



              
          final boolean isFirst(Thread current) {
                  Node h, s;
                  
          return ((h = head) == null ||
                          ((s 
          = h.next) != null && s.thread == current) ||
                          fullIsFirst(current)); 
          //頭為null,頭的下個節點不是空且該節點的線程與當前線程是相等的,
              }


              
          final boolean fullIsFirst(Thread current) {
                  
          // same idea as fullGetFirstQueuedThread
                  Node h, s;
                  Thread firstThread 
          = null;//如果頭不為空,且頭的下個節點也不為空,且該節點的上一個節點是頭節點,且該節點的線程不為null
                  if (((h = head) != null && (s = h.next) != null &&
                       s.prev 
          == head && (firstThread = s.thread) != null))
                      
          return firstThread == current;
                  Node t 
          = tail;
                  
          while (t != null && t != head) {
                      Thread tt 
          = t.thread;
                      
          if (tt != null)
                          firstThread 
          = tt;
                      t 
          = t.prev;
                  }

                  
          return firstThread == current || firstThread == null;
              }


           


          4.總結

               ReentrantLock在采用非公平鎖構造時,首先檢查鎖狀態,如果鎖可用,直接通過CAS設置成持有狀態,且把當前線程設置為鎖的擁有者。
          如果當前鎖已經被持有,那么接下來進行可重入檢查,如果可重入,需要為鎖狀態加上請求數。如果不屬于上面兩種情況,那么說明鎖是被其他線程持有,
          當前線程應該放入等待隊列。
               在放入等待隊列的過程中,首先要檢查隊列是否為空隊列,如果為空隊列,需要創建虛擬的頭節點,然后把對當前線程封裝的節點加入到隊列尾部。由于設置尾部節點采用了CAS,為了保證尾節點能夠設置成功,這里采用了無限循環的方式,直到設置成功為止。
               在完成放入等待隊列任務后,則需要維護節點的狀態,以及及時清除處于Cancel狀態的節點,以幫助垃圾收集器及時回收。如果當前節點之前的節點的等待狀態小于1,說明當前節點之前的線程處于等待狀態(掛起),那么當前節點的線程也應處于等待狀態(掛起)。掛起的工作是由LockSupport類支持的,LockSupport通過JNI調用本地操作系統來完成掛起的任務(java中除了廢棄的suspend等方法,沒有其他的掛起操作)。
              在當前等待的線程,被喚起后,檢查中斷狀態,如果處于中斷狀態,那么需要中斷當前線程。

              下一節《ReentrantLock源碼之一lock方法解析(鎖的釋放)》

          posted on 2011-08-18 14:12 zhangxl 閱讀(19693) 評論(1)  編輯  收藏 所屬分類: java 并發


          FeedBack:
          # 嗯嗯
          2016-03-12 00:31 | 安德森
          阿斯達斯  回復  更多評論
            
          <2016年3月>
          282912345
          6789101112
          13141516171819
          20212223242526
          272829303112
          3456789

          常用鏈接

          留言簿(1)

          隨筆分類(17)

          隨筆檔案(28)

          文章分類(30)

          文章檔案(30)

          相冊

          收藏夾(2)

          hibernate

          java基礎

          mysql

          xml

          關注

          壓力測試

          算法

          最新隨筆

          搜索

          •  

          積分與排名

          • 積分 - 96393
          • 排名 - 601

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 临城县| 郎溪县| 永春县| 方正县| 旺苍县| 南溪县| 区。| 陵川县| 龙南县| 广宁县| 兴宁市| 武隆县| 蚌埠市| 洛南县| 龙里县| 南通市| 日土县| 武平县| 丹寨县| 龙山县| 阿鲁科尔沁旗| 岫岩| 天门市| 项城市| 泗阳县| 南阳市| 屏边| 奈曼旗| 太湖县| 井冈山市| 深水埗区| 克东县| 延津县| 团风县| 舞阳县| 宜兴市| 苍南县| 庄浪县| 临武县| 泊头市| 沙田区|