Inside AbstractQueuedSynchronizer (1)
Inside AbstractQueuedSynchronizer (2)
3 AbstractQueuedSynchronizer
3.1 Inheritance
AbstractQueuedSynchronizer繼承自AbstractOwnableSynchronizer。AbstractOwnableSynchronizer繼承自Object并且實現了Serializable接口,它只有一個成員變量private transient Thread exclusiveOwnerThread以及對應的getter/setter方法。該成員變量用于保存當前擁有排他訪問權的線程。需要注意的是,該成員變量沒有用volatile關鍵字修飾。
3.2 State
AbstractQueuedSynchronizer用一個int(private volatile int state)來保存同步狀態,以及對應的getter/setter/compareAndSetState方法。Java6新增了一個AbstractQueuedLongSynchronizer,它用一個long來保存同步狀態,貌似目前沒有被java.util.concurrent中的其它synchronizer所使用。對于ReentrantLock,state為0則意味著鎖沒有被任何線程持有;否則state保存了持有鎖的線程的重入次數。
3.3 WaitQueue
WaitQueue是AbstractQueuedSynchronizer的核心,它用于保存被阻塞的線程。它的實現是"CLH" (Craig, Landin, and Hagersten) lock queue的一個變種。
標準的CLH lock queue通常被用來實現spin lock,它通過TheadLocal變量pred引用隊列中的前一個節點(Node本身沒有指向前后節點的引用),以下是標準的CLH lock queue的一個參考實現:
public class ClhSpinLock { private final ThreadLocal<Node> pred; private final ThreadLocal<Node> node; private final AtomicReference<Node> tail = new AtomicReference<Node>(new Node()); public ClhSpinLock() { this.node = new ThreadLocal<Node>() { protected Node initialValue() { return new Node(); } }; this.pred = new ThreadLocal<Node>() { protected Node initialValue() { return null; } }; } public void lock() { final Node node = this.node.get(); node.locked = true; Node pred = this.tail.getAndSet(node); this.pred.set(pred); while (pred.locked) {} } public void unlock() { final Node node = this.node.get(); node.locked = false; this.node.set(this.pred.get()); } private static class Node { private volatile boolean locked; } }
其邏輯并不復雜:對于lock操作,只需要通過一個CAS操作即可將當前線程對應的節點加入到隊列中,并且同時獲得了predecessor節點的引用,然后就是等待predecessor釋放鎖;對于unlock操作,只需要將當前線程對應節點的locked成員變量設置為false。unlock方法中的this.node.set(this.pred.get())主要目的是重用predecessor上的Node對象,這是對GC友好的一個優化。如果不考慮這個優化,那么this.node.set(new Node())也是可以的。跟那些TAS(test and set) spin lock和TTAS(test test and set) spin lock相比,CLH spin lock主要是解決了cache-coherence traffic的問題:每個線程在busy loop的時候,并沒有競爭同一個狀態,而是只判斷其對應predecessor的鎖定狀態。如果你擔心false sharing問題,那么可以考慮將鎖定狀態padding到cache line的長度。此外,CLH spin lock通過FIFO的隊列保證了鎖競爭的公平性。
AbstractQueuedSynchronizer的靜態內部類Node維護了一個FIFO的等待隊列。跟CLH不同的是,Node中包含了指向predecessor和sucessor的引用。predecessor引用的作用是為了支持鎖等待超時(timeout)和鎖等待回退(cancellation)的功能。sucessor的作用是為了支持線程阻塞:在Inside AbstractQueuedSynchronizer (1) 中提到過,AbstractQueuedSynchronizer通過LockSupport實現了線程的block/unblock,因此需要通過successor引用找到后續的線程并將其喚醒(CLH spin lock因為是spin,所以不需要顯式地喚醒)。此外,Node中還包括一個volatile int waitStatus成員變量用于控制線程的阻塞/喚醒,以及避免不必要的調用LockSupport的park/unpark方法。需要注意的是,雖然AbstractQueuedSynchronizer在絕大多數情況下是通過LockSupport進行線程的阻塞/喚醒,但是在特定情況下也會使用spin lock,static final long spinForTimeoutThreshold = 1000L這個靜態變量設定了使用spin lock的一個閾值。
對WaitQueue進行enqueue操作相關的代碼如下:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
addWaiter方法中的那個if分支,其實是一種優化,如果失敗那么會調用enq方法進行enqueue。需要注意的是,以上代碼中對next引用的設定是在enqueue成功之后進行的。這樣做雖然沒有并發問題,但是在判斷一個node是否有sucessor時,不能僅僅通過next == null來判斷,因為enqueue和設置next引用這兩個步驟不是一個原子操作。
對WaitQueue進行dequeue操作相關的代碼如下:
private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
setHead方法沒有用到鎖,也沒有使用CAS,這樣沒有并發問題?沒有,因為這個方法只會被持有鎖的線程所調用,此時只需要將head指向持有鎖的線程對應的node即可。