xylz,imxylz

          關(guān)注后端架構(gòu)、中間件、分布式和并發(fā)編程

             :: 首頁(yè) :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理 ::
            111 隨筆 :: 10 文章 :: 2680 評(píng)論 :: 0 Trackbacks

          接上篇,這篇從Lock.lock/unlock開始。特別說(shuō)明在沒有特殊情況下所有程序、API、文檔都是基于JDK 6.0的。

          public void java.util.concurrent.locks.ReentrantLock.lock()

          獲取鎖。

          如果該鎖沒有被另一個(gè)線程保持,則獲取該鎖并立即返回,將鎖的保持計(jì)數(shù)設(shè)置為 1。

          如果當(dāng)前線程已經(jīng)保持該鎖,則將保持計(jì)數(shù)加 1,并且該方法立即返回。

          如果該鎖被另一個(gè)線程保持,則出于線程調(diào)度的目的,禁用當(dāng)前線程,并且在獲得鎖之前,該線程將一直處于休眠狀態(tài),此時(shí)鎖保持計(jì)數(shù)被設(shè)置為 1。

          從上面的文檔可以看出ReentrantLock是可重入鎖的實(shí)現(xiàn)。而內(nèi)部是委托java.util.concurrent.locks.ReentrantLock.Sync.lock()實(shí)現(xiàn)的。java.util.concurrent.locks.ReentrantLock.Sync是抽象類,有java.util.concurrent.locks.ReentrantLock.FairSync和java.util.concurrent.locks.ReentrantLock.NonfairSync兩個(gè)實(shí)現(xiàn),也就是常說(shuō)的公平鎖和不公平鎖。

          公平鎖和非公平鎖

          如果獲取一個(gè)鎖是按照請(qǐng)求的順序得到的,那么就是公平鎖,否則就是非公平鎖。

          在沒有深入了解內(nèi)部機(jī)制及實(shí)現(xiàn)之前,先了解下為什么會(huì)存在公平鎖和非公平鎖。公平鎖保證一個(gè)阻塞的線程最終能夠獲得鎖,因?yàn)槭怯行虻模钥偸强梢园凑照?qǐng)求的順序獲得鎖。不公平鎖意味著后請(qǐng)求鎖的線程可能在其前面排列的休眠線程恢復(fù)前拿到鎖,這樣就有可能提高并發(fā)的性能。這是因?yàn)橥ǔG闆r下掛起的線程重新開始與它真正開始運(yùn)行,二者之間會(huì)產(chǎn)生嚴(yán)重的延時(shí)。因此非公平鎖就可以利用這段時(shí)間完成操作。這是非公平鎖在某些時(shí)候比公平鎖性能要好的原因之一。

          二者在實(shí)現(xiàn)上的區(qū)別會(huì)在后面介紹,我們先從公平鎖(FairSync)開始。

          前面說(shuō)過(guò)java.util.concurrent.locks.AbstractQueuedSynchronizer (AQS)是Lock的基礎(chǔ),對(duì)于一個(gè)FairSync而言,lock()就直接調(diào)用AQS的acquire(int arg);

          public final void acquire(int arg) 以獨(dú)占模式獲取對(duì)象,忽略中斷。通過(guò)至少調(diào)用一次 tryAcquire(int) 來(lái)實(shí)現(xiàn)此方法,并在成功時(shí)返回。否則在成功之前,一直調(diào)用 tryAcquire(int) 將線程加入隊(duì)列,線程可能重復(fù)被阻塞或不被阻塞。

          在介紹實(shí)現(xiàn)之前先要補(bǔ)充上一節(jié)的知識(shí),對(duì)于一個(gè)AQS的實(shí)現(xiàn)而言,通常情況下需要實(shí)現(xiàn)以下方法來(lái)描述如何鎖定線程。

        1. tryAcquire(int) 試圖在獨(dú)占模式下獲取對(duì)象狀態(tài)。此方法應(yīng)該查詢是否允許它在獨(dú)占模式下獲取對(duì)象狀態(tài),如果允許,則獲取它。

          此方法總是由執(zhí)行 acquire 的線程來(lái)調(diào)用。如果此方法報(bào)告失敗,則 acquire 方法可以將線程加入隊(duì)列(如果還沒有將它加入隊(duì)列),直到獲得其他某個(gè)線程釋放了該線程的信號(hào)。也就是說(shuō)此方法是一種嘗試性方法,如果成功獲取鎖那最好,如果沒有成功也沒有關(guān)系,直接返回false。

        2. tryRelease(int) 試圖設(shè)置狀態(tài)來(lái)反映獨(dú)占模式下的一個(gè)釋放。 此方法總是由正在執(zhí)行釋放的線程調(diào)用。釋放鎖可能失敗或者拋出異常,這個(gè)在后面會(huì)具體分析。
        3. tryAcquireShared(int) 試圖在共享模式下獲取對(duì)象狀態(tài)。
        4. tryReleaseShared(int) 試圖設(shè)置狀態(tài)來(lái)反映共享模式下的一個(gè)釋放。
        5. isHeldExclusively() 如果對(duì)于當(dāng)前(正調(diào)用的)線程,同步是以獨(dú)占方式進(jìn)行的,則返回 true
        6. 除了tryAcquire(int)外,其它方法會(huì)在后面具體介紹。首先對(duì)于ReentrantLock而言,不管是公平鎖還是非公平鎖,都是獨(dú)占鎖,也就是說(shuō)同時(shí)能夠有一個(gè)線程持有鎖。因此對(duì)于acquire(int arg)而言,arg==1。在AQS中acquire的實(shí)現(xiàn)如下:

          public final void acquire(int arg) {
              if (!tryAcquire(arg) &&
                  acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                  selfInterrupt();
          }

          這個(gè)看起來(lái)比較復(fù)雜,我們分解以下4個(gè)步驟。

            1. 如果tryAcquire(arg)成功,那就沒有問題,已經(jīng)拿到鎖,整個(gè)lock()過(guò)程就結(jié)束了。如果失敗進(jìn)行操作2。
            2. 創(chuàng)建一個(gè)獨(dú)占節(jié)點(diǎn)(Node)并且此節(jié)點(diǎn)加入CHL隊(duì)列末尾。進(jìn)行操作3。
            3. 自旋嘗試獲取鎖,失敗根據(jù)前一個(gè)節(jié)點(diǎn)來(lái)決定是否掛起(park()),直到成功獲取到鎖。進(jìn)行操作4。
            4. 如果當(dāng)前線程已經(jīng)中斷過(guò),那么就中斷當(dāng)前線程(清除中斷位)。

          這是一個(gè)比較復(fù)雜的過(guò)程,我們按部就班一個(gè)一個(gè)分析。

          tryAcquire(acquires)

          對(duì)于公平鎖而言,它的實(shí)現(xiàn)方式如下:

              protected final boolean tryAcquire(int acquires) {
                  final Thread current = Thread.currentThread();
                  int c = getState();
                  if (c == 0) {
                      if (isFirst(current) &&
                          compareAndSetState(0, acquires)) {
                          setExclusiveOwnerThread(current);
                          return true;
                      }
                  }
                  else if (current == getExclusiveOwnerThread()) {
                      int nextc = c + acquires;
                      if (nextc < 0)
                          throw new Error("Maximum lock count exceeded");
                      setState(nextc);
                      return true;
                  }
                  return false;
              }
          }

          在這段代碼中,前面說(shuō)明對(duì)于AQS存在一個(gè)state來(lái)描述當(dāng)前有多少線程持有鎖。由于AQS支持共享鎖(例如讀寫鎖,后面會(huì)繼續(xù)講),所以這里state>=0,但是由于ReentrantLock是獨(dú)占鎖,所以這里不妨理解為0<=state,acquires=1。isFirst(current)是一個(gè)很復(fù)雜的邏輯,包括踢出無(wú)用的節(jié)點(diǎn)等復(fù)雜過(guò)程,這里暫且不提,大體上的意思是說(shuō)判斷AQS是否為空或者當(dāng)前線程是否在隊(duì)列頭(為了區(qū)分公平與非公平鎖)。

            1. 如果當(dāng)前鎖有其它線程持有,c!=0,進(jìn)行操作2。否則,如果當(dāng)前線程在AQS隊(duì)列頭部,則嘗試將AQS狀態(tài)state設(shè)為acquires(等于1),成功后將AQS獨(dú)占線程設(shè)為當(dāng)前線程返回true,否則進(jìn)行2。這里可以看到compareAndSetState就是使用了CAS操作。
            2. 判斷當(dāng)前線程與AQS的獨(dú)占線程是否相同,如果相同,那么就將當(dāng)前狀態(tài)位加1(這里+1后結(jié)果為負(fù)數(shù)后面會(huì)講,這里暫且不理它),修改狀態(tài)位,返回true,否則進(jìn)行3。這里之所以不是將當(dāng)前狀態(tài)位設(shè)置為1,而是修改為舊值+1呢?這是因?yàn)镽eentrantLock是可重入鎖,同一個(gè)線程每持有一次就+1。
            3. 返回false。

          比較非公平鎖的tryAcquire實(shí)現(xiàn)java.util.concurrent.locks.ReentrantLock.Sync.nonfairTryAcquire(int),公平鎖多了一個(gè)判斷當(dāng)前節(jié)點(diǎn)是否在隊(duì)列頭,這個(gè)就保證了是否按照請(qǐng)求鎖的順序來(lái)決定獲取鎖的順序(同一個(gè)線程的多次獲取鎖除外)。

          現(xiàn)在再回頭看公平鎖和非公平鎖的lock()方法。公平鎖只有一句acquire(1);而非公平鎖的調(diào)用如下:

          final void lock() {
              if (compareAndSetState(0, 1))
                  setExclusiveOwnerThread(Thread.currentThread());
              else
                  acquire(1);
          }

          很顯然,非公平鎖在第一次獲取鎖,或者其它線程釋放鎖后(可能等待),優(yōu)先采用compareAndSetState(0,1)然后設(shè)置AQS獨(dú)占線程而持有鎖,這樣有時(shí)候比acquire(1)順序檢查鎖持有而要高效。即使在重入鎖上,也就是compareAndSetState(0,1)失敗,但是是當(dāng)前線程持有鎖上,非公平鎖也沒有問題。

          addWaiter(mode)

          tryAcquire失敗就意味著入隊(duì)列了。此時(shí)AQS的隊(duì)列中節(jié)點(diǎn)Node就開始發(fā)揮作用了。一般情況下AQS支持獨(dú)占鎖和共享鎖,而獨(dú)占鎖在Node中就意味著條件(Condition)隊(duì)列為空(上一篇中介紹過(guò)相關(guān)概念)。在java.util.concurrent.locks.AbstractQueuedSynchronizer.Node中有兩個(gè)常量,

          static final Node EXCLUSIVE = null; //獨(dú)占節(jié)點(diǎn)模式

          static final Node SHARED = new Node(); //共享節(jié)點(diǎn)模式

          addWaiter(mode)中的mode就是節(jié)點(diǎn)模式,也就是共享鎖還是獨(dú)占鎖模式。

          前面一再?gòu)?qiáng)調(diào)ReentrantLock是獨(dú)占鎖模式。

          private Node addWaiter(Node mode) {
               Node node = new Node(Thread.currentThread(), mode);
               // Try the fast path of enq; backup to full enq on failure
               Node pred = tail;
               if (pred != null) {
                   node.prev = pred;
                   if (compareAndSetTail(pred, node)) {
                       pred.next = node;
                       return node;
                   }
               }
               enq(node);
               return node;
          }

          上面是節(jié)點(diǎn)如隊(duì)列的一部分。當(dāng)前僅當(dāng)隊(duì)列不為空并且將新節(jié)點(diǎn)插入尾部成功后直接返回新節(jié)點(diǎn)。否則進(jìn)入enq(Node)進(jìn)行操作。

          private Node enq(final Node node) {
              for (;;) {
                  Node t = tail;
                  if (t == null) { // Must initialize
                      Node h = new Node(); // Dummy header
                      h.next = node;
                      node.prev = h;
                      if (compareAndSetHead(h)) {
                          tail = node;
                          return h;
                      }
                  }
                  else {
                      node.prev = t;
                      if (compareAndSetTail(t, node)) {
                          t.next = node;
                          return t;
                      }
                  }
              }
          }

          enq(Node)去隊(duì)列操作實(shí)現(xiàn)了CHL隊(duì)列的算法,如果為空就創(chuàng)建頭結(jié)點(diǎn),然后同時(shí)比較節(jié)點(diǎn)尾部是否是改變來(lái)決定CAS操作是否成功,當(dāng)且僅當(dāng)成功后才將為不節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)指向?yàn)樾鹿?jié)點(diǎn)。可以看到這里仍然是CAS操作。

          acquireQueued(node,arg)

          自旋請(qǐng)求鎖,如果可能的話掛起線程,直到得到鎖,返回當(dāng)前線程是否中斷過(guò)(如果park()過(guò)并且中斷過(guò)的話有一個(gè)interrupted中斷位)。

          final boolean acquireQueued(final Node node, int arg) {
              try {
                  boolean interrupted = false;
                  for (;;) {
                      final Node p = node.predecessor();
                      if (p == head && tryAcquire(arg)) {
                          setHead(node);
                          p.next = null; // help GC
                          return interrupted;
                      }
                      if (shouldParkAfterFailedAcquire(p, node) &&
                          parkAndCheckInterrupt())
                          interrupted = true;
                  }
              } catch (RuntimeException ex) {
                  cancelAcquire(node);
                  throw ex;
              }
          }

          下面的分析就需要用到上節(jié)節(jié)點(diǎn)的狀態(tài)描述了。acquireQueued過(guò)程是這樣的:

            1. 如果當(dāng)前節(jié)點(diǎn)是AQS隊(duì)列的頭結(jié)點(diǎn)(如果第一個(gè)節(jié)點(diǎn)是DUMP節(jié)點(diǎn)也就是傀儡節(jié)點(diǎn),那么第二個(gè)節(jié)點(diǎn)實(shí)際上就是頭結(jié)點(diǎn)了),就嘗試在此獲取鎖tryAcquire(arg)。如果成功就將頭結(jié)點(diǎn)設(shè)置為當(dāng)前節(jié)點(diǎn)(不管第一個(gè)結(jié)點(diǎn)是否是DUMP節(jié)點(diǎn)),返回中斷位。否則進(jìn)行2。
            2. 檢測(cè)當(dāng)前節(jié)點(diǎn)是否應(yīng)該park(),如果應(yīng)該park()就掛起當(dāng)前線程并且返回當(dāng)前線程中斷位。進(jìn)行操作1。

          一個(gè)節(jié)點(diǎn)是否該park()是關(guān)鍵,這是由方法java.util.concurrent.locks.AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire(Node, Node)實(shí)現(xiàn)的。

          private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
              int s = pred.waitStatus;
              if (s < 0) return true;
              if (s > 0) {
                  do {
                      node.prev = pred = pred.prev;
                  } while (pred.waitStatus > 0);
                  pred.next = node;
              } else compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
              return false;
          }

            1. 如果前一個(gè)節(jié)點(diǎn)的等待狀態(tài)waitStatus<0,也就是前面的節(jié)點(diǎn)還沒有獲得到鎖,那么返回true,表示當(dāng)前節(jié)點(diǎn)(線程)就應(yīng)該park()了。否則進(jìn)行2。
            2. 如果前一個(gè)節(jié)點(diǎn)的等待狀態(tài)waitStatus>0,也就是前一個(gè)節(jié)點(diǎn)被CANCELLED了,那么就將前一個(gè)節(jié)點(diǎn)去掉,遞歸此操作直到所有前一個(gè)節(jié)點(diǎn)的waitStatus<=0,進(jìn)行4。否則進(jìn)行3。
            3. 前一個(gè)節(jié)點(diǎn)等待狀態(tài)waitStatus=0,修改前一個(gè)節(jié)點(diǎn)狀態(tài)位為SINGAL,表示后面有節(jié)點(diǎn)等待你處理,需要根據(jù)它的等待狀態(tài)來(lái)決定是否該park()。進(jìn)行4。
            4. 返回false,表示線程不應(yīng)該park()。

          selfInterrupt()

          private static void selfInterrupt() {
              Thread.currentThread().interrupt();
          }

          如果線程曾經(jīng)中斷過(guò)(或者阻塞過(guò))(比如手動(dòng)interrupt()或者超時(shí)等等,那么就再中斷一次,中斷兩次的意思就是清除中斷位)。

          大體上整個(gè)Lock.lock()就這樣一個(gè)流程。除了lock()方法外,還有l(wèi)ockInterruptibly()/tryLock()/unlock()/newCondition()等,在接下來(lái)的章節(jié)中會(huì)一一介紹。

           



          ©2009-2014 IMXYLZ |求賢若渴
          posted on 2010-07-07 00:05 imxylz 閱讀(40182) 評(píng)論(6)  編輯  收藏 所屬分類: J2EE

          評(píng)論

          # re: 深入淺出 Java Concurrency (8): 鎖機(jī)制 part 3 2010-12-01 18:03 楊羅羅
          有點(diǎn)大方向,不過(guò)還是看的云里霧里....  回復(fù)  更多評(píng)論
            

          # re: 深入淺出 Java Concurrency (8): 鎖機(jī)制 part 3 2012-02-22 23:16 吳波
          if (s > 0) {
          do {
          node.prev = pred = pred.prev;
          } while (pred.waitStatus > 0);
          pred.next = node;
          } else compareAndSetWaitStatus(pred, 0, Node.SIGNAL);為什么s==0時(shí)還要把前一個(gè)結(jié)點(diǎn)設(shè)置為SIGNAL?  回復(fù)  更多評(píng)論
            

          # re: 深入淺出 Java Concurrency (8): 鎖機(jī)制 part 3 2012-02-23 09:50 xylz
          @吳波
          參考上一節(jié)的節(jié)點(diǎn)狀態(tài) http://www.aygfsteel.com/xylz/archive/2010/07/06/325390.html

          節(jié)點(diǎn)狀態(tài)為0表示節(jié)點(diǎn)是新生的節(jié)點(diǎn)。對(duì)于新生的節(jié)點(diǎn)而言,如果執(zhí)行完畢(出隊(duì)列)了,是不會(huì)喚醒后繼節(jié)點(diǎn)的。

          所以這里有必要將前一個(gè)新生節(jié)點(diǎn)的狀態(tài)位修改為SIGNAL,表示一旦前一個(gè)節(jié)點(diǎn)出隊(duì)列了,立即嘗試喚醒后繼節(jié)點(diǎn)。  回復(fù)  更多評(píng)論
            

          # re: 深入淺出 Java Concurrency (8): 鎖機(jī)制 part 3 2012-04-10 13:50 吳波
          JDK文檔ReentrantLock類中 tryLock(long timeout,TimeUnit unit)方法有這么一段注釋:如果為了使用公平的排序策略,已經(jīng)設(shè)置此鎖,并且其他線程都在等待該鎖,則不會(huì)獲取一個(gè)可用的鎖。這與 tryLock() 方法相反。如果想使用一個(gè)允許闖入公平鎖的定時(shí) tryLock,那么可以將定時(shí)形式和不定時(shí)形式組合在一起:
          if (lock.tryLock() || lock.tryLock(timeout, unit) ) { ... }
          ---------------------------
          lock.tryLock()是不公平的,為什么將這兩個(gè)組合就實(shí)現(xiàn)了公平鎖的定時(shí)tryLock?
            回復(fù)  更多評(píng)論
            

          # re: 深入淺出 Java Concurrency (8): 鎖機(jī)制 part 3 2012-04-10 14:27 imxylz
          @吳波

          這段話的意思是說(shuō),tryLock(long timeout,TimeUnit unit)是定時(shí)獲取公平鎖,不會(huì)被"闖入"從而破壞公平性(指進(jìn)入隊(duì)列的順序),而tryLock()卻是非公平獲取鎖方式,這回破壞公平性。
          --------------
          如果要想實(shí)現(xiàn)一個(gè)允許"闖入"破壞公平性的定時(shí)tryLock(),換句話說(shuō)既想使用“闖入”提高性能,同時(shí)又想有超時(shí)特性(定時(shí)),那么可以使用下面的組合:
          ================
          if (lock.tryLock() || lock.tryLock(timeout, unit) ) { ... }
          ================


          補(bǔ)充代碼:
          ================
          public boolean tryLock() {
          return sync.nonfairTryAcquire(1);
          }
          --------------
          public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
          return sync.tryAcquireNanos(1, unit.toNanos(timeout));
          }  回復(fù)  更多評(píng)論
            

          # re: 深入淺出 Java Concurrency (8): 鎖機(jī)制 part 3 2012-05-07 21:00 吳波
          請(qǐng)教一下aqs那篇論文中有一句話的翻譯:Programmers construct
          synchronizers only when needed, so there is no need to compact
          space that would otherwise be wasted。貌似在說(shuō)synchronized會(huì)浪費(fèi)空間,為什么呢?還有synchronized在多處理器上和單處理器上運(yùn)行有啥區(qū)別嗎?  回復(fù)  更多評(píng)論
            


          ©2009-2014 IMXYLZ
          主站蜘蛛池模板: 高台县| 本溪市| 万宁市| 儋州市| 南华县| 辰溪县| 长岛县| 新竹市| 湖南省| 白玉县| 曲周县| 黄大仙区| 凤山市| 恭城| 特克斯县| 灵寿县| 临潭县| 神木县| 凤翔县| 荆门市| 富阳市| 铁力市| 浪卡子县| 汕头市| 五大连池市| 正宁县| 平阴县| 潼关县| 嘉祥县| 鄂伦春自治旗| 彰化县| 阳江市| 班玛县| 四子王旗| 屯留县| 庄河市| 天长市| 长兴县| 庆安县| 光泽县| 嵊州市|