whitesock

          BlogJava 首頁(yè) 新隨筆 聯(lián)系 聚合 管理
            10 Posts :: 0 Stories :: 0 Comments :: 0 Trackbacks

          Inside AbstractQueuedSynchronizer (1)

          Inside AbstractQueuedSynchronizer (2)

          Inside AbstractQueuedSynchronizer (3)

          Inside AbstractQueuedSynchronizer (4)

          3.6 ConditionObject

          AbstractQueuedSynchronizer的內(nèi)部類ConditionObject實(shí)現(xiàn)了Condition接口。Condition接口提供了跟Java語(yǔ)言內(nèi)置的monitor機(jī)制類似的接口:await()/signal()/signalAll(),以及一些支持超時(shí)和回退的await版本。可以將任意個(gè)數(shù)的ConcitionObject關(guān)聯(lián)到對(duì)應(yīng)的synchronizer,例如通過(guò)調(diào)用ReentrantLock.newCondition()方法即可構(gòu)造一個(gè)ConditionObject實(shí)例。每個(gè)ConditionObject實(shí)例內(nèi)部都維護(hù)一個(gè)ConditionQueue,該隊(duì)列的元素跟AbstractQueuedSynchronizer的WaitQueue一樣,都是Node對(duì)象。

          ConditionObject的await()代碼如下:

          public final void await() throws InterruptedException {
              if (Thread.interrupted())
                  throw new InterruptedException();
              Node node = addConditionWaiter();
              int savedState = fullyRelease(node);
              int interruptMode = 0;
              while (!isOnSyncQueue(node)) {
                  LockSupport.park(this);
                  if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                      break;
              }
              if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                  interruptMode = REINTERRUPT;
              if (node.nextWaiter != null) // clean up if cancelled
                  unlinkCancelledWaiters();
              if (interruptMode != 0)
                  reportInterruptAfterWait(interruptMode);
          }

          以上代碼中,可以看出ConditionObject的await語(yǔ)義跟Java語(yǔ)言內(nèi)置的monitor機(jī)制是非常相似的(詳見:http://whitesock.iteye.com/blog/162344 )。首先addConditionWaiter()將當(dāng)前線程加入到ConditionQueue中,然后fullyRelease(node)釋放掉跟ConditionObject關(guān)聯(lián)的synchronizer鎖。如果某個(gè)線程在沒(méi)有持有對(duì)應(yīng)的synchronizer鎖的情況下調(diào)用某個(gè)ConditionObject對(duì)象的await()方法,那么跟Object.wait()一樣會(huì)拋出IllegalMonitorStateException。接下來(lái)while (!isOnSyncQueue(node)) {...}會(huì)保證在其它線程調(diào)用了該ConditionObject的signal()/siangalAll()之前,當(dāng)前線程一直被阻塞(signal()/siangalAll()的行為稍后會(huì)介紹)。在被signal()/siangalAll()喚醒之后,await()通過(guò)acquireQueued(node, savedState)確保再次獲得synchronizer的鎖。

          ConditionObject的signal()代碼如下:

          public final void signal() {
              if (!isHeldExclusively())
                  throw new IllegalMonitorStateException();
              Node first = firstWaiter;
              if (first != null)
                  doSignal(first);
          }
          
          private void doSignal(Node first) {
              do {
                  if ( (firstWaiter = first.nextWaiter) == null)
                      lastWaiter = null;
                  first.nextWaiter = null;
              } while (!transferForSignal(first) &&
                       (first = firstWaiter) != null);
          }

          那么跟await()一樣,如果某個(gè)線程在沒(méi)有持有對(duì)應(yīng)的synchronizer鎖的情況下調(diào)用某個(gè)ConditionObject對(duì)象的signal()/siangalAll()方法,會(huì)拋出IllegalMonitorStateException。signal()主要的行為就是將ConditionQueue中對(duì)應(yīng)的Node實(shí)例transfer到AbstractQueuedSynchronizer的WaitQueue中,以便在synchronizer release的過(guò)程中,該Node對(duì)應(yīng)的線程可能被喚醒。

          3.7 Timeout & Cancellation

          AbstractQueuedSynchronizer的acquireQueued()和doAcquire***()系列方法在acquire失敗(超時(shí)或者中斷)后,都會(huì)調(diào)用cancelAcquire(Node node)方法進(jìn)行清理,其代碼如下:

          private void cancelAcquire(Node node) {
              // Ignore if node doesn't exist
              if (node == null)
                  return;
          
              node.thread = null;
          
              // Skip cancelled predecessors
              Node pred = node.prev;
              while (pred.waitStatus > 0)
                  node.prev = pred = pred.prev;
          
              // predNext is the apparent node to unsplice. CASes below will
              // fail if not, in which case, we lost race vs another cancel
              // or signal, so no further action is necessary.
              Node predNext = pred.next;
          
              // Can use unconditional write instead of CAS here.
              // After this atomic step, other Nodes can skip past us.
              // Before, we are free of interference from other threads.
              node.waitStatus = Node.CANCELLED;
          
              // If we are the tail, remove ourselves.
              if (node == tail && compareAndSetTail(node, pred)) {
                  compareAndSetNext(pred, predNext, null);
              } else {
                  // If successor needs signal, try to set pred's next-link
                  // so it will get one. Otherwise wake it up to propagate.
                  int ws;
                  if (pred != head &&
                      ((ws = pred.waitStatus) == Node.SIGNAL ||
                       (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                      pred.thread != null) {
                      Node next = node.next;
                      if (next != null && next.waitStatus <= 0)
                          compareAndSetNext(pred, predNext, next);
                  } else {
                      unparkSuccessor(node);
                  }
          
                  node.next = node; // help GC
              }
          }

          需要注意的是, cancelAcquire(Node node)方法是可能會(huì)被并發(fā)調(diào)用。while (pred.waitStatus > 0) {...}這段循環(huán)的作用就是清除當(dāng)前Node之前的已經(jīng)被標(biāo)記為取消的節(jié)點(diǎn),但是head節(jié)點(diǎn)除外(因?yàn)閔ead節(jié)點(diǎn)保證不會(huì)被標(biāo)記為Node.CANCELLED)。這段循環(huán)初看起來(lái)有并發(fā)問(wèn)題,但是推敲一下之后發(fā)現(xiàn):循環(huán)過(guò)程中函數(shù)參數(shù)node的waitStatus不會(huì)大于0,因此即使是多個(gè)線程并發(fā)執(zhí)行這個(gè)循環(huán),那么這些線程處理的都只是鏈表中互不重疊的一部分。接下來(lái)在node.waitStatus = Node.CANCELLED執(zhí)行完畢之后,后續(xù)的操作都必須要避免并發(fā)問(wèn)題。

          關(guān)于處理線程中斷, ConditionObject的await()/signal()/signalAll()等方法符合JSR 133: Java Memory Model and Thread Specification Revision中規(guī)定的語(yǔ)義:如果中斷在signal之前發(fā)生,那么await必須在重新獲得synchronizer的鎖之后,拋出InterruptedException;如果中斷發(fā)生在signal之后發(fā)生,那么await必須要設(shè)定當(dāng)前線程的中斷狀態(tài),并且不能拋出InterruptedException。

          4 Reference

          The java.util.concurrent Synchronizer Framework

          The Art of Multiprocessor Programming

          posted on 2012-01-08 17:07 whitesock 閱讀(269) 評(píng)論(0)  編輯  收藏

          只有注冊(cè)用戶登錄后才能發(fā)表評(píng)論。


          網(wǎng)站導(dǎo)航:
           
          主站蜘蛛池模板: 平武县| 湄潭县| 晋中市| 亚东县| 新龙县| 麟游县| 霞浦县| 临武县| 夹江县| 日土县| 鹤山市| 镇雄县| 微博| 祁连县| 延庆县| 长宁县| 商水县| 吉林省| 洛南县| 中牟县| 福安市| 石柱| 隆回县| 常熟市| 澄江县| 二连浩特市| 罗定市| 敦化市| 健康| 锡林郭勒盟| 陆丰市| 安多县| 伽师县| 祁东县| 花莲县| 汽车| 东乌| 白朗县| 富宁县| 错那县| 浦北县|