Inside AbstractQueuedSynchronizer (1)
Inside AbstractQueuedSynchronizer (2)
3.6 ConditionObject
AbstractQueuedSynchronizer的內部類ConditionObject實現了Condition接口。Condition接口提供了跟Java語言內置的monitor機制類似的接口:await()/signal()/signalAll(),以及一些支持超時和回退的await版本。可以將任意個數的ConcitionObject關聯到對應的synchronizer,例如通過調用ReentrantLock.newCondition()方法即可構造一個ConditionObject實例。每個ConditionObject實例內部都維護一個ConditionQueue,該隊列的元素跟AbstractQueuedSynchronizer的WaitQueue一樣,都是Node對象。
ConditionObject的await()代碼如下:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
以上代碼中,可以看出ConditionObject的await語義跟Java語言內置的monitor機制是非常相似的(詳見:http://whitesock.iteye.com/blog/162344 )。首先addConditionWaiter()將當前線程加入到ConditionQueue中,然后fullyRelease(node)釋放掉跟ConditionObject關聯的synchronizer鎖。如果某個線程在沒有持有對應的synchronizer鎖的情況下調用某個ConditionObject對象的await()方法,那么跟Object.wait()一樣會拋出IllegalMonitorStateException。接下來while (!isOnSyncQueue(node)) {...}會保證在其它線程調用了該ConditionObject的signal()/siangalAll()之前,當前線程一直被阻塞(signal()/siangalAll()的行為稍后會介紹)。在被signal()/siangalAll()喚醒之后,await()通過acquireQueued(node, savedState)確保再次獲得synchronizer的鎖。
ConditionObject的signal()代碼如下:
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
那么跟await()一樣,如果某個線程在沒有持有對應的synchronizer鎖的情況下調用某個ConditionObject對象的signal()/siangalAll()方法,會拋出IllegalMonitorStateException。signal()主要的行為就是將ConditionQueue中對應的Node實例transfer到AbstractQueuedSynchronizer的WaitQueue中,以便在synchronizer release的過程中,該Node對應的線程可能被喚醒。
3.7 Timeout & Cancellation
AbstractQueuedSynchronizer的acquireQueued()和doAcquire***()系列方法在acquire失?。ǔ瑫r或者中斷)后,都會調用cancelAcquire(Node node)方法進行清理,其代碼如下:
private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } }
需要注意的是, cancelAcquire(Node node)方法是可能會被并發調用。while (pred.waitStatus > 0) {...}這段循環的作用就是清除當前Node之前的已經被標記為取消的節點,但是head節點除外(因為head節點保證不會被標記為Node.CANCELLED)。這段循環初看起來有并發問題,但是推敲一下之后發現:循環過程中函數參數node的waitStatus不會大于0,因此即使是多個線程并發執行這個循環,那么這些線程處理的都只是鏈表中互不重疊的一部分。接下來在node.waitStatus = Node.CANCELLED執行完畢之后,后續的操作都必須要避免并發問題。
關于處理線程中斷, ConditionObject的await()/signal()/signalAll()等方法符合JSR 133: Java Memory Model and Thread Specification Revision中規定的語義:如果中斷在signal之前發生,那么await必須在重新獲得synchronizer的鎖之后,拋出InterruptedException;如果中斷發生在signal之后發生,那么await必須要設定當前線程的中斷狀態,并且不能拋出InterruptedException。
4 Reference
The java.util.concurrent Synchronizer Framework
The Art of Multiprocessor Programming