接上篇,這篇從Lock.lock/unlock開始。特別說明在沒有特殊情況下所有程序、API、文檔都是基于JDK 6.0的。
public void java.util.concurrent.locks.ReentrantLock.lock()
獲取鎖。
如果該鎖沒有被另一個線程保持,則獲取該鎖并立即返回,將鎖的保持計數設置為 1。
如果當前線程已經保持該鎖,則將保持計數加 1,并且該方法立即返回。
如果該鎖被另一個線程保持,則出于線程調度的目的,禁用當前線程,并且在獲得鎖之前,該線程將一直處于休眠狀態,此時鎖保持計數被設置為 1。
從上面的文檔可以看出ReentrantLock是可重入鎖的實現。而內部是委托java.util.concurrent.locks.ReentrantLock.Sync.lock()實現的。java.util.concurrent.locks.ReentrantLock.Sync是抽象類,有java.util.concurrent.locks.ReentrantLock.FairSync和java.util.concurrent.locks.ReentrantLock.NonfairSync兩個實現,也就是常說的公平鎖和不公平鎖。
公平鎖和非公平鎖
如果獲取一個鎖是按照請求的順序得到的,那么就是公平鎖,否則就是非公平鎖。
在沒有深入了解內部機制及實現之前,先了解下為什么會存在公平鎖和非公平鎖。公平鎖保證一個阻塞的線程最終能夠獲得鎖,因為是有序的,所以總是可以按照請求的順序獲得鎖。不公平鎖意味著后請求鎖的線程可能在其前面排列的休眠線程恢復前拿到鎖,這樣就有可能提高并發的性能。這是因為通常情況下掛起的線程重新開始與它真正開始運行,二者之間會產生嚴重的延時。因此非公平鎖就可以利用這段時間完成操作。這是非公平鎖在某些時候比公平鎖性能要好的原因之一。
二者在實現上的區別會在后面介紹,我們先從公平鎖(FairSync)開始。
前面說過java.util.concurrent.locks.AbstractQueuedSynchronizer (AQS)是Lock的基礎,對于一個FairSync而言,lock()就直接調用AQS的acquire(int arg);
public final void acquire(int arg) 以獨占模式獲取對象,忽略中斷。通過至少調用一次
tryAcquire(int)
來實現此方法,并在成功時返回。否則在成功之前,一直調用tryAcquire(int)
將線程加入隊列,線程可能重復被阻塞或不被阻塞。
在介紹實現之前先要補充上一節的知識,對于一個AQS的實現而言,通常情況下需要實現以下方法來描述如何鎖定線程。
tryAcquire(int)
試圖在獨占模式下獲取對象狀態。此方法應該查詢是否允許它在獨占模式下獲取對象狀態,如果允許,則獲取它。此方法總是由執行 acquire 的線程來調用。如果此方法報告失敗,則 acquire 方法可以將線程加入隊列(如果還沒有將它加入隊列),直到獲得其他某個線程釋放了該線程的信號。也就是說此方法是一種嘗試性方法,如果成功獲取鎖那最好,如果沒有成功也沒有關系,直接返回false。
tryRelease(int)
試圖設置狀態來反映獨占模式下的一個釋放。 此方法總是由正在執行釋放的線程調用。釋放鎖可能失敗或者拋出異常,這個在后面會具體分析。tryAcquireShared(int) 試圖在共享模式下獲取對象狀態。
tryReleaseShared(int) 試圖設置狀態來反映共享模式下的一個釋放。
isHeldExclusively() 如果對于當前(正調用的)線程,同步是以獨占方式進行的,則返回
true
。
除了tryAcquire(int)外,其它方法會在后面具體介紹。首先對于ReentrantLock而言,不管是公平鎖還是非公平鎖,都是獨占鎖,也就是說同時能夠有一個線程持有鎖。因此對于acquire(int arg)而言,arg==1。在AQS中acquire的實現如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
這個看起來比較復雜,我們分解以下4個步驟。
- 如果tryAcquire(arg)成功,那就沒有問題,已經拿到鎖,整個lock()過程就結束了。如果失敗進行操作2。
- 創建一個獨占節點(Node)并且此節點加入CHL隊列末尾。進行操作3。
- 自旋嘗試獲取鎖,失敗根據前一個節點來決定是否掛起(park()),直到成功獲取到鎖。進行操作4。
- 如果當前線程已經中斷過,那么就中斷當前線程(清除中斷位)。
這是一個比較復雜的過程,我們按部就班一個一個分析。
tryAcquire(acquires)
對于公平鎖而言,它的實現方式如下:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (isFirst(current) &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
在這段代碼中,前面說明對于AQS存在一個state來描述當前有多少線程持有鎖。由于AQS支持共享鎖(例如讀寫鎖,后面會繼續講),所以這里state>=0,但是由于ReentrantLock是獨占鎖,所以這里不妨理解為0<=state,acquires=1。isFirst(current)是一個很復雜的邏輯,包括踢出無用的節點等復雜過程,這里暫且不提,大體上的意思是說判斷AQS是否為空或者當前線程是否在隊列頭(為了區分公平與非公平鎖)。
- 如果當前鎖有其它線程持有,c!=0,進行操作2。否則,如果當前線程在AQS隊列頭部,則嘗試將AQS狀態state設為acquires(等于1),成功后將AQS獨占線程設為當前線程返回true,否則進行2。這里可以看到compareAndSetState就是使用了CAS操作。
- 判斷當前線程與AQS的獨占線程是否相同,如果相同,那么就將當前狀態位加1(這里+1后結果為負數后面會講,這里暫且不理它),修改狀態位,返回true,否則進行3。這里之所以不是將當前狀態位設置為1,而是修改為舊值+1呢?這是因為ReentrantLock是可重入鎖,同一個線程每持有一次就+1。
- 返回false。
比較非公平鎖的tryAcquire實現java.util.concurrent.locks.ReentrantLock.Sync.nonfairTryAcquire(int),公平鎖多了一個判斷當前節點是否在隊列頭,這個就保證了是否按照請求鎖的順序來決定獲取鎖的順序(同一個線程的多次獲取鎖除外)。
現在再回頭看公平鎖和非公平鎖的lock()方法。公平鎖只有一句acquire(1);而非公平鎖的調用如下:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
很顯然,非公平鎖在第一次獲取鎖,或者其它線程釋放鎖后(可能等待),優先采用compareAndSetState(0,1)然后設置AQS獨占線程而持有鎖,這樣有時候比acquire(1)順序檢查鎖持有而要高效。即使在重入鎖上,也就是compareAndSetState(0,1)失敗,但是是當前線程持有鎖上,非公平鎖也沒有問題。
addWaiter(mode)
tryAcquire失敗就意味著入隊列了。此時AQS的隊列中節點Node就開始發揮作用了。一般情況下AQS支持獨占鎖和共享鎖,而獨占鎖在Node中就意味著條件(Condition)隊列為空(上一篇中介紹過相關概念)。在java.util.concurrent.locks.AbstractQueuedSynchronizer.Node中有兩個常量,
static final Node EXCLUSIVE = null; //獨占節點模式
static final Node SHARED = new Node(); //共享節點模式
addWaiter(mode)中的mode就是節點模式,也就是共享鎖還是獨占鎖模式。
前面一再強調ReentrantLock是獨占鎖模式。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
上面是節點如隊列的一部分。當前僅當隊列不為空并且將新節點插入尾部成功后直接返回新節點。否則進入enq(Node)進行操作。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
Node h = new Node(); // Dummy header
h.next = node;
node.prev = h;
if (compareAndSetHead(h)) {
tail = node;
return h;
}
}
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq(Node)去隊列操作實現了CHL隊列的算法,如果為空就創建頭結點,然后同時比較節點尾部是否是改變來決定CAS操作是否成功,當且僅當成功后才將為不節點的下一個節點指向為新節點。可以看到這里仍然是CAS操作。
acquireQueued(node,arg)
自旋請求鎖,如果可能的話掛起線程,直到得到鎖,返回當前線程是否中斷過(如果park()過并且中斷過的話有一個interrupted中斷位)。
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
}
下面的分析就需要用到上節節點的狀態描述了。acquireQueued過程是這樣的:
- 如果當前節點是AQS隊列的頭結點(如果第一個節點是DUMP節點也就是傀儡節點,那么第二個節點實際上就是頭結點了),就嘗試在此獲取鎖tryAcquire(arg)。如果成功就將頭結點設置為當前節點(不管第一個結點是否是DUMP節點),返回中斷位。否則進行2。
- 檢測當前節點是否應該park(),如果應該park()就掛起當前線程并且返回當前線程中斷位。進行操作1。
一個節點是否該park()是關鍵,這是由方法java.util.concurrent.locks.AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire(Node, Node)實現的。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int s = pred.waitStatus;
if (s < 0) return true;
if (s > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
return false;
}
- 如果前一個節點的等待狀態waitStatus<0,也就是前面的節點還沒有獲得到鎖,那么返回true,表示當前節點(線程)就應該park()了。否則進行2。
- 如果前一個節點的等待狀態waitStatus>0,也就是前一個節點被CANCELLED了,那么就將前一個節點去掉,遞歸此操作直到所有前一個節點的waitStatus<=0,進行4。否則進行3。
- 前一個節點等待狀態waitStatus=0,修改前一個節點狀態位為SINGAL,表示后面有節點等待你處理,需要根據它的等待狀態來決定是否該park()。進行4。
- 返回false,表示線程不應該park()。
selfInterrupt()
private static void selfInterrupt() {
Thread.currentThread().interrupt();
}
如果線程曾經中斷過(或者阻塞過)(比如手動interrupt()或者超時等等,那么就再中斷一次,中斷兩次的意思就是清除中斷位)。
大體上整個Lock.lock()就這樣一個流程。除了lock()方法外,還有lockInterruptibly()/tryLock()/unlock()/newCondition()等,在接下來的章節中會一一介紹。