java thread 之AQS
JDK1.5引入了Doug Lea大神的concurrent框架,其中AbstractQueuedSynchronizer是concurrent框架的基本,從大神的paper中可以看到
1.傳統的synchronized不能進行中段,這個不合適
2.如果將concurrent重心放在少數競爭下優化鎖,而在其他情況下放任緩慢執行的策略是不正確的
3.需要可預測的維護效率,即使在同步競爭激烈的情況下,理想中無論多少線程試圖通過一個同步點的開銷應該是恒定的
4.設計的目標是總時間的減少,因為有可能在此之間一個線程可以通過同步點,然后他沒有立即執行
5.在高吞吐量的基本上,更重要的是線程的公平調度
AQS設計思路:
原子的管理同步狀態;阻塞和解除阻塞線程;保持線程的阻塞隊列。
實現
1.在CLH queue的基礎上進行改造
2.單個int state 計數synchronization狀態
隊列的每個節點是內部類Node:
對于waitStatus>0的node在等待的遍歷當中是會被拋棄掉的,而nextWaiter在共享鎖和Lock的Condition中會用到
關鍵點1.將線程入隊列,不管是否是第一個線程。在和其他線程競爭前,嘗試入隊一次
關鍵點2.header只是一個傀儡node,他代表上次獲得的線程,在這里只有header的后面一個thread嘗試成功了,才會跳出循環,然后其他的線程只能在循環中
關鍵點3.檢查加入的node對應thread是否需要park(),當node的waitStatus>0就是取消的node,不需要park(),其他的需要park().park住的線程就一直阻塞到unpark
關鍵點4.這里setHead()方法把當前取得鎖的node設置為header,使得隊列往前走了一步。
共享鎖
從acquireShared(int arg)開始:
共享的和獨占的在實現上面是類似得,共享實現上,對于獲得能成功,只要是子類實現上面能獲得成功,如信號量的實現(state的可用量是大于1的),就不用進入隊列阻塞。
2.如果將concurrent重心放在少數競爭下優化鎖,而在其他情況下放任緩慢執行的策略是不正確的
3.需要可預測的維護效率,即使在同步競爭激烈的情況下,理想中無論多少線程試圖通過一個同步點的開銷應該是恒定的
4.設計的目標是總時間的減少,因為有可能在此之間一個線程可以通過同步點,然后他沒有立即執行
5.在高吞吐量的基本上,更重要的是線程的公平調度
AQS設計思路:
原子的管理同步狀態;阻塞和解除阻塞線程;保持線程的阻塞隊列。
實現
1.在CLH queue的基礎上進行改造
2.單個int state 計數synchronization狀態
隊列的每個節點是內部類Node:
static final class Node {
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
}
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
}
對于waitStatus>0的node在等待的遍歷當中是會被拋棄掉的,而nextWaiter在共享鎖和Lock的Condition中會用到
void acquire(int arg)方法
該方法是其他基于AQS實現的具體類獲得鎖或者信號等的基礎實現
總結下:該方法是其他基于AQS實現的具體類獲得鎖或者信號等的基礎實現
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire(int arg) 留給具體子類實現,如果返回false,說明沒有得到鎖,則調用acquireQueued(final Node node, int arg)方法,在調用此方法前需要將當前線程的node加入隊列:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
Node h = new Node(); // Dummy header
h.next = node;
node.prev = h;
if (compareAndSetHead(h)) {
tail = node;
return h;
}
}
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}這是個循環,只到node加入到隊列中才返回,當是對隊列的第一次插入node時,必須初始化隊列,創建一個傀儡header,完成入隊操作后執行acquireQueued()方法:
,而且返回node對應的thread的中斷狀態,如果前面流程沒有執行,執行shouldParkAfterFailedAcquire():
int s = pred.waitStatus;
if (s < 0)
/*
* This node has already set status asking a release
* to signal it, so it can safely park
*/
return true;
if (s > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
else
/*
* Indicate that we need a signal, but don't park yet. Caller
* will need to retry to make sure it cannot acquire before
* parking.
*/
compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
return false;
}
得到node的前驅prev的waitStatus,當waitStatus>0,那么prev處于退出狀態,從隊列中去除prev,如果waitStatus<0,說明線程需要park住,如果沒有為0,就是新建的,需要設置成
-1,在下一次循環中,如果還不能獲得鎖那么就需要park住線程,parkAndCheckInterrupt()就是執行park()線程,然后返回線程中斷狀態.
private Node addWaiter(Node mode) {
//關鍵點1.
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
首先嘗試快速加入到隊尾,如果加入隊尾失敗,說明在此期間,其余的線程入隊了,那么真正的執行入隊操作://關鍵點1.
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
Node h = new Node(); // Dummy header
h.next = node;
node.prev = h;
if (compareAndSetHead(h)) {
tail = node;
return h;
}
}
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//關鍵點2.
if (p == head && tryAcquire(arg)) {
//關鍵點4.
setHead(node);
p.next = null; // help GC
return interrupted;
}
//關鍵點3
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
}
前面的方法可以看出當前thread的對應的node就是參數傳入的node,獲得node的前驅,如果當前前驅是傀儡header而且子類的tryAcquire()方法返回為真,那么當前的node變成headertry {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//關鍵點2.
if (p == head && tryAcquire(arg)) {
//關鍵點4.
setHead(node);
p.next = null; // help GC
return interrupted;
}
//關鍵點3
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
}
,而且返回node對應的thread的中斷狀態,如果前面流程沒有執行,執行shouldParkAfterFailedAcquire():
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int s = pred.waitStatus;
if (s < 0)
/*
* This node has already set status asking a release
* to signal it, so it can safely park
*/
return true;
if (s > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
else
/*
* Indicate that we need a signal, but don't park yet. Caller
* will need to retry to make sure it cannot acquire before
* parking.
*/
compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
return false;
}
-1,在下一次循環中,如果還不能獲得鎖那么就需要park住線程,parkAndCheckInterrupt()就是執行park()線程,然后返回線程中斷狀態.
acquireQueued()方法是個死循環,只到header之后的一個線程獲得鎖之后,才會重新設置header,被設置為新header的是header的next node,就這樣返回到acquire()中,如果返回的中斷狀態為true,當前線程中斷.
關鍵點1.將線程入隊列,不管是否是第一個線程。在和其他線程競爭前,嘗試入隊一次
關鍵點2.header只是一個傀儡node,他代表上次獲得的線程,在這里只有header的后面一個thread嘗試成功了,才會跳出循環,然后其他的線程只能在循環中
關鍵點3.檢查加入的node對應thread是否需要park(),當node的waitStatus>0就是取消的node,不需要park(),其他的需要park().park住的線程就一直阻塞到unpark
關鍵點4.這里setHead()方法把當前取得鎖的node設置為header,使得隊列往前走了一步。
boolean release(int arg) 方法
這個方法是unlock()等方法的基礎方法 public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease()留給子類實現,在隊列header處的node不為空而且狀態不為初始狀態的話(初始為0),需要為這個header所持有的thread unpark
private void unparkSuccessor(Node node) {
/*
* Try to clear status in anticipation of signalling. It is
* OK if this fails or if status is changed by waiting thread.
*/
compareAndSetWaitStatus(node, Node.SIGNAL, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor. 這里貌似是cancelAcquire()方法引起從后向前遍歷
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
在acquire()方法中知道header是個傀儡header,所以這個方法中得到header的next,如果next不為空或者next為取消的node,則從tail開始向前遍歷,找到最前面waitStatus<=0的node,然后unpark node的thread。/*
* Try to clear status in anticipation of signalling. It is
* OK if this fails or if status is changed by waiting thread.
*/
compareAndSetWaitStatus(node, Node.SIGNAL, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor. 這里貌似是cancelAcquire()方法引起從后向前遍歷
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
void cancelAcquire(Node node)
該方法是一切取消獲得的基礎實現,代碼有些地方需要注意: private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// Getting this before setting waitStatus ensures staleness
//前面的操作使得前置改變了 但是prex的后置還是沒有變
Node predNext = pred.next;
// Can use unconditional write instead of CAS here
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves
if (node == tail && compareAndSetTail(node, pred)) {
//tail就set null 了
compareAndSetNext(pred, predNext, null);
} else {
// If "active" predecessor found
//
if (pred != head
&& (pred.waitStatus == Node.SIGNAL
|| compareAndSetWaitStatus(pred, 0, Node.SIGNAL))
&& pred.thread != null) {
// If successor is active, set predecessor's next link
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//如果前面沒有阻塞的node,到本node下面的話,需要unpark了
unparkSuccessor(node);
}
node.next = node; // help GC 這個會影響next節點的秩序 還好每次pred.waitStatus>0的檢測使得受影響的時間窗口比較小
}
}
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// Getting this before setting waitStatus ensures staleness
//前面的操作使得前置改變了 但是prex的后置還是沒有變
Node predNext = pred.next;
// Can use unconditional write instead of CAS here
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves
if (node == tail && compareAndSetTail(node, pred)) {
//tail就set null 了
compareAndSetNext(pred, predNext, null);
} else {
// If "active" predecessor found

//
if (pred != head
&& (pred.waitStatus == Node.SIGNAL
|| compareAndSetWaitStatus(pred, 0, Node.SIGNAL))
&& pred.thread != null) {
// If successor is active, set predecessor's next link
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//如果前面沒有阻塞的node,到本node下面的話,需要unpark了
unparkSuccessor(node);
}
node.next = node; // help GC 這個會影響next節點的秩序 還好每次pred.waitStatus>0的檢測使得受影響的時間窗口比較小
}
}
共享鎖
從acquireShared(int arg)開始:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
}
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
}
private void setHeadAndPropagate(Node node, int propagate) {
//隊列向后移動一位
setHead(node);
//propagate>0說明,共享數值是大于前面要求的數值的
if (propagate > 0 && node.waitStatus != 0) {
/*
* Don't bother fully figuring out successor. If it
* looks null, call unparkSuccessor anyway to be safe.
*/
Node s = node.next;
//如果只剩下一個node節點 那么直接unpark是可以的,因為就這個node,propagate數也大于0
//如果是共享的也可以unpark,unpark后還在doAcquireShared循環的,如果發現acquires數值過大,這個線程還是會park住的
if (s == null || s.isShared())
unparkSuccessor(node);
}
}
//隊列向后移動一位
setHead(node);
//propagate>0說明,共享數值是大于前面要求的數值的
if (propagate > 0 && node.waitStatus != 0) {
/*
* Don't bother fully figuring out successor. If it
* looks null, call unparkSuccessor anyway to be safe.
*/
Node s = node.next;
//如果只剩下一個node節點 那么直接unpark是可以的,因為就這個node,propagate數也大于0
//如果是共享的也可以unpark,unpark后還在doAcquireShared循環的,如果發現acquires數值過大,這個線程還是會park住的
if (s == null || s.isShared())
unparkSuccessor(node);
}
}
共享的和獨占的在實現上面是類似得,共享實現上,對于獲得能成功,只要是子類實現上面能獲得成功,如信號量的實現(state的可用量是大于1的),就不用進入隊列阻塞。
Condition
condition是服務于單個Lock的,condition.await()等等待方法在Lock上面形成了一個condition的等待隊列,condition.singal()方法在Lock上面處理condition的等待隊列,
一個一個化解,然后將隊列的node加入到AQS的阻塞隊列中等待對應的線程被unpark
關鍵點1.加入到condition的對應的lock私有的隊列中,和AQS的阻塞隊列形式相似
關鍵點2.釋放這個condition對應的lock的鎖,因為在使用的過程當中,考慮到如果這個wait()方法阻塞住,而lock如果沒有釋放鎖,那么對于其他的線程的node來說肯定是阻塞住的,
因為condition對應的lock獲得了鎖,肯定在AQS的header處,其他線程肯定是得不到鎖阻塞在那里,這樣兩邊都阻塞的話就死鎖了,所以這里需要釋放對應lock的鎖的
關鍵點3.判斷condition是否已經轉化成為AQS阻塞隊列的一個節點,如果沒有park線程阻塞在這里
關鍵點4.到這一步的話就需要signal()或者signalAll()的方法的執行,說明這個線程已經被unpark,然后運行直到acquireQueued嘗試再次獲得鎖,因為condition對應的lock的鎖在關鍵
點2是被釋放了的
condition是服務于單個Lock的,condition.await()等等待方法在Lock上面形成了一個condition的等待隊列,condition.singal()方法在Lock上面處理condition的等待隊列,
一個一個化解,然后將隊列的node加入到AQS的阻塞隊列中等待對應的線程被unpark
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();//關鍵點1
int savedState = fullyRelease(node);//關鍵點2
int interruptMode = 0;
while (!isOnSyncQueue(node)) {//關鍵點3
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//關鍵點4
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
其中的關鍵點:if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();//關鍵點1
int savedState = fullyRelease(node);//關鍵點2
int interruptMode = 0;
while (!isOnSyncQueue(node)) {//關鍵點3
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//關鍵點4
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
關鍵點1.加入到condition的對應的lock私有的隊列中,和AQS的阻塞隊列形式相似
關鍵點2.釋放這個condition對應的lock的鎖,因為在使用的過程當中,考慮到如果這個wait()方法阻塞住,而lock如果沒有釋放鎖,那么對于其他的線程的node來說肯定是阻塞住的,
因為condition對應的lock獲得了鎖,肯定在AQS的header處,其他線程肯定是得不到鎖阻塞在那里,這樣兩邊都阻塞的話就死鎖了,所以這里需要釋放對應lock的鎖的
關鍵點3.判斷condition是否已經轉化成為AQS阻塞隊列的一個節點,如果沒有park線程阻塞在這里
關鍵點4.到這一步的話就需要signal()或者signalAll()的方法的執行,說明這個線程已經被unpark,然后運行直到acquireQueued嘗試再次獲得鎖,因為condition對應的lock的鎖在關鍵
點2是被釋放了的
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);//進入到AQS的阻塞隊列中
int c = p.waitStatus;
if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);//進入到AQS的阻塞隊列中
int c = p.waitStatus;
if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
在整個AQS存在兩種鏈表。 一個鏈表就是整個Sync Node鏈表,橫向鏈表。另一種鏈表就是Condition的wait Node鏈表,相對于Sync node,它屬于node節點的一個縱向鏈表。當縱向列表被single通知后,會進入對應的Sync Node進行排隊處理。
最后面上個圖理解下condition:

圖片轉載自:
最后面上個圖理解下condition:

圖片轉載自:
http://www.goldendoc.org/2011/06/juc_condition/
posted on 2011-09-15 22:57 nod0620 閱讀(667) 評論(0) 編輯 收藏 所屬分類: 多線程 、java