posts - 28,  comments - 15,  trackbacks - 0

          一、前言
              ReentrantLock是JDK1.5引入的,它擁有與synchronized相同的并發(fā)性和內(nèi)存語(yǔ)義,并提供了超出synchonized的其他高級(jí)功能(例如,中斷鎖等候、條件變量等),并且使用ReentrantLock比synchronized能獲得更好的可伸縮性。

              ReentrantLock的實(shí)現(xiàn)基于AQS(AbstractQueuedSynchronizer)和LockSupport。
              AQS主要利用硬件原語(yǔ)指令(CAS compare-and-swap),來實(shí)現(xiàn)輕量級(jí)多線程同步機(jī)制,并且不會(huì)引起CPU上文切換和調(diào)度,同時(shí)提供內(nèi)存可見性和原子化更新保證(線程安全的三要素:原子性、可見性、順序性)。

              AQS的本質(zhì)上是一個(gè)同步器/阻塞鎖的基礎(chǔ)框架,其作用主要是提供加鎖、釋放鎖,并在內(nèi)部維護(hù)一個(gè)FIFO等待隊(duì)列,用于存儲(chǔ)由于鎖競(jìng)爭(zhēng)而阻塞的線程。

          二、關(guān)鍵代碼分析

          1.關(guān)鍵字段

              AQS使用鏈表作為隊(duì)列,使用volatile變量state,作為鎖狀態(tài)標(biāo)識(shí)位。

          /**
               * Head of the wait queue, lazily initialized.  Except for
               * initialization, it is modified only via method setHead.  Note:
               * If head exists, its waitStatus is guaranteed not to be
               * CANCELLED.
               
          */

              
          private transient volatile Node head;       //等待隊(duì)列的頭

              
          /**
               * Tail of the wait queue, lazily initialized.  Modified only via
               * method enq to add new wait node.
               
          */

              
          private transient volatile Node tail; //等待隊(duì)列的尾

              
          /**
               * The synchronization state.
               
          */

              
          private volatile int state;             //原子性的鎖狀態(tài)位,ReentrantLock對(duì)該字段的調(diào)用是通過原子操作compareAndSetState進(jìn)行的


             
          protected final boolean compareAndSetState(int expect, int update) {
                  
          // See below for intrinsics setup to support this
                  return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
             }




          2.ReentrantLock的公平鎖與非公平鎖


             從ReentrantLock的構(gòu)造子可以看到,ReentrantLock提供兩種鎖:公平鎖和非公平鎖,其內(nèi)部實(shí)現(xiàn)了兩種同步器NonfairSync、FairSync派生自AQS,主要才采用了模板方法模式,主要重寫了AQS的tryAcquire、lock方法,如下圖。

            

             

          public ReentrantLock() {
                   sync 
          = new NonfairSync();
           }


               
          public ReentrantLock(boolean fair) {
                   sync 
          = (fair)? new FairSync() : new NonfairSync();
                }


              

          3.獲取鎖操作

           public void lock() {
                  sync.lock();
           }

          由于NonfairSync、FairSync分別實(shí)現(xiàn)了lock方法,我們將分別探討

          3.1NonfairSync.lock()分析

           (1)通過原子的比較并設(shè)置操作,如果成功設(shè)置,說明鎖是空閑的,當(dāng)前線程獲得鎖,并把當(dāng)前線程設(shè)置為鎖擁有者;
           (2)否則,調(diào)用acquire方法;
           

          package java.util.concurrent.locks.ReentrantLock;
          final void lock() {
                      
          if (compareAndSetState(01))//表示如果當(dāng)前state=0,那么設(shè)置state=1,并返回true;否則返回false。由于未等待,所以線程不需加入到等待隊(duì)列
                          setExclusiveOwnerThread(Thread.currentThread());
                      
          else
                          acquire(
          1);
          }

           
           
          package java.util.concurrent.locks.AbstractOwnableSynchronizer  //AbstractOwnableSynchronizer是AQS的父類
           protected final void setExclusiveOwnerThread(Thread t) {
                      exclusiveOwnerThread 
          = t;
          }

           
          3.1.1acquire方法分析
           (1)如果嘗試以獨(dú)占的方式獲得鎖失敗,那么就把當(dāng)前線程封裝為一個(gè)Node,加入到等待隊(duì)列中;如果加入隊(duì)列成功,接下來檢查當(dāng)前線程的節(jié)點(diǎn)是否應(yīng)該等待(掛起),如果當(dāng)前線程所處節(jié)點(diǎn)的前一節(jié)點(diǎn)的等待狀態(tài)小于0,則通過LockSupport掛起當(dāng)前線程;無論線程是否被掛起,或者掛起后被激活,都應(yīng)該返回當(dāng)前線程的中斷狀態(tài),如果處于中斷狀態(tài),需要中斷當(dāng)前線程。
           

          package java.util.concurrent.locks.AbstractQueuedSynchronizer
          public final void acquire(int arg) {
                   
          if (!tryAcquire(arg) &&
                        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                       selfInterrupt();
          }


           
            
          protected final boolean tryAcquire(int acquires) {
                      
          return nonfairTryAcquire(acquires);
           }


           3.1.2nonfairTryAcquire分析

           (1)如果鎖狀態(tài)空閑(state=0),且通過原子的比較并設(shè)置操作,那么當(dāng)前線程獲得鎖,并把當(dāng)前線程設(shè)置為鎖擁有者;
           (2)如果鎖狀態(tài)空閑,且原子的比較并設(shè)置操作失敗,那么返回false,說明嘗試獲得鎖失敗;
           (3)否則,檢查當(dāng)前線程與鎖擁有者線程是否相等(表示一個(gè)線程已經(jīng)獲得該鎖,再次要求該鎖,這種情況叫可重入鎖),如果相等,維護(hù)鎖狀態(tài),并返回true;
           (4)如果不是以上情況,說明鎖已經(jīng)被其他的線程持有,直接返回false;
             

          final boolean nonfairTryAcquire(int acquires) {  
                      
          final Thread current = Thread.currentThread();
                      
          int c = getState();
                      
          if (c == 0{
                          
          if (compareAndSetState(0, acquires)) {
                              setExclusiveOwnerThread(current);
                              
          return true;
                          }

                      }

                      
          else if (current == getExclusiveOwnerThread()) {  //表示一個(gè)線程已經(jīng)獲得該鎖,再次要求該鎖(重入鎖的由來),為狀態(tài)位加acquires
                          int nextc = c + acquires;
                          
          if (nextc < 0// overflow
                              throw new Error("Maximum lock count exceeded");
                          setState(nextc);
                          
          return true;
                      }

                      
          return false;
           }

          3.1.3addWaiter分析
           (1)如果tail節(jié)點(diǎn)不為null,說明隊(duì)列不為空,則把新節(jié)點(diǎn)加入到tail的后面,返回當(dāng)前節(jié)點(diǎn),否則進(jìn)入enq進(jìn)行處理(2);
           (2)如果tail節(jié)點(diǎn)為null,說明隊(duì)列為空,需要建立一個(gè)虛擬的頭節(jié)點(diǎn),并把封裝了當(dāng)前線程的節(jié)點(diǎn)設(shè)置為尾節(jié)點(diǎn);另外一種情況的發(fā)生,是由于在(1)中的compareAndSetTail可能會(huì)出現(xiàn)失敗,這里采用for的無限循環(huán),是要保證當(dāng)前線程能夠正確進(jìn)入等待隊(duì)列;
           

          package java.util.concurrent.locks.AbstractQueuedSynchronizer
             
          private Node addWaiter(Node mode) {
                   Node node 
          = new Node(Thread.currentThread(), mode);
                   
          // Try the fast path of enq; backup to full enq on failure
                   Node pred = tail;
                   
          if (pred != null{  //如果當(dāng)前隊(duì)列不是空隊(duì)列,則把新節(jié)點(diǎn)加入到tail的后面,返回當(dāng)前節(jié)點(diǎn),否則進(jìn)入enq進(jìn)行處理。
                        node.prev = pred;
                        
          if (compareAndSetTail(pred, node)) {
                            pred.next 
          = node;
                            
          return node;
                        }

                   }

                   enq(node);
                   
          return node;
               }


           
          package java.util.concurrent.locks.AbstractQueuedSynchronizer
           
          private Node enq(final Node node) {
                  
          for (;;) {
                      Node t 
          = tail;
                      
          if (t == null// tail節(jié)點(diǎn)為空,說明是空隊(duì)列,初始化頭節(jié)點(diǎn),如果成功,返回頭節(jié)點(diǎn)
                          Node h = new Node(); // Dummy header
                          h.next = node;
                          node.prev 
          = h;
                          
          if (compareAndSetHead(h)) {
                              tail 
          = node;
                              
          return h;
                          }

                      }

                      
          else {   //
                          node.prev = t;
                          
          if (compareAndSetTail(t, node)) {
                              t.next 
          = node;
                              
          return t;
                          }

                      }

                  }


          3.1.4acquire分析
          (1)如果當(dāng)前節(jié)點(diǎn)是隊(duì)列的頭結(jié)點(diǎn)(如果第一個(gè)節(jié)點(diǎn)是虛擬節(jié)點(diǎn),那么第二個(gè)節(jié)點(diǎn)實(shí)際上就是頭結(jié)點(diǎn)了),就嘗試在此獲取鎖tryAcquire(arg)。如果成功就將頭結(jié)點(diǎn)設(shè)置為當(dāng)前節(jié)點(diǎn)(不管第一個(gè)結(jié)點(diǎn)是否是虛擬節(jié)點(diǎn)),返回中斷狀態(tài)。否則進(jìn)行(2)。 
          (2)檢測(cè)當(dāng)前節(jié)點(diǎn)是否應(yīng)該park()-"掛起的意思",如果應(yīng)該park()就掛起當(dāng)前線程并且返回當(dāng)前線程中斷狀態(tài)。進(jìn)行操作(1)。
            

            final boolean acquireQueued(final Node node, int arg) {
                  
          try {
                      
          boolean interrupted = false;
                      
          for (;;) {
                          
          final Node p = node.predecessor();
                          
          if (p == head && tryAcquire(arg)) {
                              setHead(node);
                              p.next 
          = null// help GC
                              return interrupted;
                          }

                          
          if (shouldParkAfterFailedAcquire(p, node) &&
                              parkAndCheckInterrupt())
                              interrupted 
          = true;
                      }

                  }
           catch (RuntimeException ex) {
                      cancelAcquire(node);
                      
          throw ex;
                  }

               }


          3.1.5 shouldParkAfterFailedAcquire分析

           (1)如果前一個(gè)節(jié)點(diǎn)的等待狀態(tài)waitStatus<0,也就是前面的節(jié)點(diǎn)還沒有獲得到鎖,那么返回true,表示當(dāng)前節(jié)點(diǎn)(線程)就應(yīng)該park()了。否則進(jìn)行(2)。 
           (2)如果前一個(gè)節(jié)點(diǎn)的等待狀態(tài)waitStatus>0,也就是前一個(gè)節(jié)點(diǎn)被CANCELLED了,那么就將前一個(gè)節(jié)點(diǎn)去掉,遞歸此操作直到所有前一個(gè)節(jié)點(diǎn)的waitStatus<=0,進(jìn)行(4)。否則進(jìn)行(3)。 
           (3)前一個(gè)節(jié)點(diǎn)等待狀態(tài)waitStatus=0,修改前一個(gè)節(jié)點(diǎn)狀態(tài)位為SINGAL,表示后面有節(jié)點(diǎn)等待你處理,需要根據(jù)它的等待狀態(tài)來決定是否該park()。進(jìn)行(4)。 
           (4)返回false,表示線程不應(yīng)該park()。

           注意:一個(gè)Node節(jié)點(diǎn)可包含以下狀態(tài)以及模式:
                /** waitStatus value to indicate thread has cancelled */    取消
                  static final int CANCELLED =  1;
                  /** waitStatus value to indicate successor's thread needs unparking */   信號(hào)等待(在AQS中,是通過LockSupport進(jìn)行線程間信號(hào)交互的)
                  static final int SIGNAL    = -1;
                  /** waitStatus value to indicate thread is waiting on condition */       條件等待   
                  static final int CONDITION = -2;
                  /** Marker to indicate a node is waiting in shared mode */  共享模式
                  static final Node SHARED = new Node();
                  /** Marker to indicate a node is waiting in exclusive mode */           獨(dú)占模式
                  static final Node EXCLUSIVE = null;

           

            private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
                  
          int s = pred.waitStatus;
                  
          if (s < 0)
                      
          /*
                       * This node has already set status asking a release
                       * to signal it, so it can safely park
                       
          */

                      
          return true;
                  
          if (s > 0{
                      
          /*
                       * Predecessor was cancelled. Skip over predecessors and
                       * indicate retry.
                       
          */

               
          do {
            node.prev 
          = pred = pred.prev;
               }
           while (pred.waitStatus > 0);
               pred.next 
          = node;
           }

                  
          else
                      
          /*
                       * Indicate that we need a signal, but don't park yet. Caller
                       * will need to retry to make sure it cannot acquire before
                       * parking.
                       
          */

                      compareAndSetWaitStatus(pred, 
          0, Node.SIGNAL);
                  
          return false;
              }


           
          private final boolean parkAndCheckInterrupt() {
                  LockSupport.park(
          this);  //阻塞,即掛起;在沒有unpark之前,下面的代碼將不會(huì)執(zhí)行;
                  return Thread.interrupted();//個(gè)人感覺,如果沒有外部的interrupt或者超時(shí)等,這里將始終返回false;
              }


           
          private static void selfInterrupt() {
                  Thread.currentThread().interrupt();
              }


          3.2FairSync.lock()分析

           公平鎖相對(duì)與非公平鎖,在鎖的獲取實(shí)現(xiàn)上,差別只在FairSync提供自己的tryAcquire()的方法實(shí)現(xiàn),代碼如下:

           (1)如果鎖狀態(tài)為0,等待隊(duì)列為空,或者給定的線程在隊(duì)列的頭部,那么該線程獲得鎖;
           (2)如果當(dāng)前線程與鎖持有者線程相等,這種情況屬于鎖重入,鎖狀態(tài)加上請(qǐng)求數(shù);
           (3)以上兩種情況都不是,返回false,說明嘗試獲得鎖失敗;

          protected final boolean tryAcquire(int acquires) {
                      
          final Thread current = Thread.currentThread();
                      
          int c = getState();
                      
          if (c == 0{
                          
          if (isFirst(current) &&
                              compareAndSetState(
          0, acquires)) {
                              setExclusiveOwnerThread(current);
                              
          return true;
                          }

                      }

                      
          else if (current == getExclusiveOwnerThread()) {
                          
          int nextc = c + acquires;
                          
          if (nextc < 0)
                              
          throw new Error("Maximum lock count exceeded");
                          setState(nextc);
                          
          return true;
                      }

                      
          return false;
                  }



              
          final boolean isFirst(Thread current) {
                  Node h, s;
                  
          return ((h = head) == null ||
                          ((s 
          = h.next) != null && s.thread == current) ||
                          fullIsFirst(current)); 
          //頭為null,頭的下個(gè)節(jié)點(diǎn)不是空且該節(jié)點(diǎn)的線程與當(dāng)前線程是相等的,
              }


              
          final boolean fullIsFirst(Thread current) {
                  
          // same idea as fullGetFirstQueuedThread
                  Node h, s;
                  Thread firstThread 
          = null;//如果頭不為空,且頭的下個(gè)節(jié)點(diǎn)也不為空,且該節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn)是頭節(jié)點(diǎn),且該節(jié)點(diǎn)的線程不為null
                  if (((h = head) != null && (s = h.next) != null &&
                       s.prev 
          == head && (firstThread = s.thread) != null))
                      
          return firstThread == current;
                  Node t 
          = tail;
                  
          while (t != null && t != head) {
                      Thread tt 
          = t.thread;
                      
          if (tt != null)
                          firstThread 
          = tt;
                      t 
          = t.prev;
                  }

                  
          return firstThread == current || firstThread == null;
              }


           


          4.總結(jié)

               ReentrantLock在采用非公平鎖構(gòu)造時(shí),首先檢查鎖狀態(tài),如果鎖可用,直接通過CAS設(shè)置成持有狀態(tài),且把當(dāng)前線程設(shè)置為鎖的擁有者。
          如果當(dāng)前鎖已經(jīng)被持有,那么接下來進(jìn)行可重入檢查,如果可重入,需要為鎖狀態(tài)加上請(qǐng)求數(shù)。如果不屬于上面兩種情況,那么說明鎖是被其他線程持有,
          當(dāng)前線程應(yīng)該放入等待隊(duì)列。
               在放入等待隊(duì)列的過程中,首先要檢查隊(duì)列是否為空隊(duì)列,如果為空隊(duì)列,需要?jiǎng)?chuàng)建虛擬的頭節(jié)點(diǎn),然后把對(duì)當(dāng)前線程封裝的節(jié)點(diǎn)加入到隊(duì)列尾部。由于設(shè)置尾部節(jié)點(diǎn)采用了CAS,為了保證尾節(jié)點(diǎn)能夠設(shè)置成功,這里采用了無限循環(huán)的方式,直到設(shè)置成功為止。
               在完成放入等待隊(duì)列任務(wù)后,則需要維護(hù)節(jié)點(diǎn)的狀態(tài),以及及時(shí)清除處于Cancel狀態(tài)的節(jié)點(diǎn),以幫助垃圾收集器及時(shí)回收。如果當(dāng)前節(jié)點(diǎn)之前的節(jié)點(diǎn)的等待狀態(tài)小于1,說明當(dāng)前節(jié)點(diǎn)之前的線程處于等待狀態(tài)(掛起),那么當(dāng)前節(jié)點(diǎn)的線程也應(yīng)處于等待狀態(tài)(掛起)。掛起的工作是由LockSupport類支持的,LockSupport通過JNI調(diào)用本地操作系統(tǒng)來完成掛起的任務(wù)(java中除了廢棄的suspend等方法,沒有其他的掛起操作)。
              在當(dāng)前等待的線程,被喚起后,檢查中斷狀態(tài),如果處于中斷狀態(tài),那么需要中斷當(dāng)前線程。

              下一節(jié)《ReentrantLock源碼之一lock方法解析(鎖的釋放)》

          posted on 2011-08-18 14:12 zhangxl 閱讀(19706) 評(píng)論(1)  編輯  收藏 所屬分類: java 并發(fā)


          FeedBack:
          # 嗯嗯
          2016-03-12 00:31 | 安德森
          阿斯達(dá)斯  回復(fù)  更多評(píng)論
            
          <2016年3月>
          282912345
          6789101112
          13141516171819
          20212223242526
          272829303112
          3456789

          常用鏈接

          留言簿(1)

          隨筆分類(17)

          隨筆檔案(28)

          文章分類(30)

          文章檔案(30)

          相冊(cè)

          收藏夾(2)

          hibernate

          java基礎(chǔ)

          mysql

          xml

          關(guān)注

          壓力測(cè)試

          算法

          最新隨筆

          搜索

          •  

          積分與排名

          • 積分 - 96750
          • 排名 - 600

          最新評(píng)論

          閱讀排行榜

          評(píng)論排行榜

          主站蜘蛛池模板: 荥经县| 门头沟区| 唐海县| 宜兰市| 凤凰县| 安图县| 自治县| 安仁县| 交城县| 江口县| 海兴县| 汾阳市| 鄂州市| 桃园县| 页游| 类乌齐县| 如东县| 宝山区| 广宁县| 凤阳县| 抚松县| 博爱县| 高州市| 桐庐县| 泰来县| 怀仁县| 来安县| 金门县| 当雄县| 彭泽县| 阿巴嘎旗| 湖南省| 北宁市| 越西县| 德化县| 齐河县| 祁连县| 珲春市| 门源| 南丰县| 揭阳市|