一直想好好學(xué)習(xí)concurrent包中的各個類的實現(xiàn),然而經(jīng)常看了一點就因為其他事情干擾而放下了。發(fā)現(xiàn)這樣太不利于自己的成長了,因而最近打算潛心一件一件的完成自己想學(xué)習(xí)的東西。
對concurrent包的學(xué)習(xí)打算先從Lock的實現(xiàn)開始,因而自然而然的就端起了AbstractQueuedSynchronizer,然而要讀懂這個類的源碼并不是那么容易,因而我就開始問自己一個問題:如果自己要去實現(xiàn)這個一個Lock對象,應(yīng)該如何實現(xiàn)呢?
要實現(xiàn)Lock對象,首先理解什么是鎖?我自己從編程角度簡單的理解,所謂鎖對象(互斥鎖)就是它能保證一次只有一個線程能進入它保護的臨界區(qū),如果有一個線程已經(jīng)拿到鎖對象,那么其他對象必須讓權(quán)等待,而在該線程退出這個臨界區(qū)時需要喚醒等待列表中的其他線程。更學(xué)術(shù)一些,《計算機操作系統(tǒng)》中對同步機制準(zhǔn)則的歸納(P50):
- 空閑讓進。當(dāng)無進程處于臨界區(qū)時,表明臨界資源處于空閑狀態(tài),應(yīng)允許一個請求進入臨界區(qū)的進程立即進入自己的臨界區(qū),以有效的利用臨界資源。
- 忙則等待。當(dāng)已有進程進入臨界區(qū)時,表明臨界資源正在被訪問,因而其他試圖進入臨界區(qū)的進程必須等待,以保證對臨界區(qū)資源的互斥訪問。
- 有限等待。對要求訪問臨界資源的進程,應(yīng)保證在有限時間內(nèi)能進入自己的臨界區(qū),以免陷入“死等”狀態(tài)。
- 讓權(quán)等待。當(dāng)進程不能進入自己的臨界區(qū)時,應(yīng)該釋放處理機,以免進程陷入“忙等”狀態(tài)。
說了那么多,其實對互斥鎖很簡單,只需要一個標(biāo)記位,如果該標(biāo)記位為0,表示沒有被占用,因而直接獲得鎖,然后把該標(biāo)記位置為1,此時其他線程發(fā)現(xiàn)該標(biāo)記位已經(jīng)是1,因而需要等待。這里對這個標(biāo)記位的比較并設(shè)值必須是原子操作,而在JDK5以后提供的atomic包里的工具類可以很方便的提供這個原子操作。然而上面的四個準(zhǔn)則應(yīng)該漏了一點,即釋放鎖的線程(進程)和得到鎖的線程(進程)應(yīng)該是同一個,就像一把鑰匙對應(yīng)一把鎖(理想的),所以一個非常簡單的Lock類可以這么實現(xiàn):
private final AtomicInteger state = new AtomicInteger(0);
private volatile Thread owner; // 這里owner字段可能存在中間值,不可靠,因而其他線程不可以依賴這個字段的值
public void lock() {
while (!state.compareAndSet(0, 1)) { }
owner = Thread.currentThread();
}
public void unlock() {
Thread currentThread = Thread.currentThread();
if (owner != currentThread || !state.compareAndSet(1, 0)) {
throw new IllegalStateException("The lock is not owned by thread: " + currentThread);
}
owner = null;
}
}
一個簡單的測試方法:
public void testLockCorrectly() throws InterruptedException {
final int COUNT = 100;
Thread[] threads = new Thread[COUNT];
SpinLockV1 lock = new SpinLockV1();
AddRunner runner = new AddRunner(lock);
for (int i = 0; i < COUNT; i++) {
threads[i] = new Thread(runner, "thread-" + i);
threads[i].start();
}
for (int i = 0; i < COUNT; i++) {
threads[i].join();
}
assertEquals(COUNT, runner.getState());
}
private static class AddRunner implements Runnable {
private final SpinLockV1 lock;
private int state = 0;
public AddRunner(SpinLockV1 lock) {
this.lock = lock;
}
public void run() {
lock.lock();
try {
quietSleep(10);
state++;
System.out.println(Thread.currentThread().getName() + ": " + state);
} finally {
lock.unlock();
}
}
public int getState() {
return state;
}
}
然而這個SpinLock其實并不需要state這個字段,因為owner的賦值與否也是一種狀態(tài),因而可以用它作為一種互斥狀態(tài):
private final AtomicReference<Thread> owner = new AtomicReference<Thread>(null);
public void lock() {
final Thread currentThread = Thread.currentThread();
while (!owner.compareAndSet(null, currentThread)) { }
}
public void unlock() {
Thread currentThread = Thread.currentThread();
if (!owner.compareAndSet(currentThread, null)) {
throw new IllegalStateException("The lock is not owned by thread: " + currentThread);
}
}
}
這在操作系統(tǒng)中被定義為整形信號量,然而整形信號量如果沒拿到鎖會一直處于“忙等”狀態(tài)(沒有遵循有限等待和讓權(quán)等待的準(zhǔn)則),因而這種鎖也叫Spin Lock,在短暫的等待中它可以提升性能,因為可以減少線程的切換,concurrent包中的Atomic大部分都采用這種機制實現(xiàn),然而如果需要長時間的等待,“忙等”會占用不必要的CPU時間,從而性能會變的很差,這個時候就需要將沒有拿到鎖的線程放到等待列表中,這種方式在操作系統(tǒng)中也叫記錄型信號量,它遵循了讓權(quán)等待準(zhǔn)則(當(dāng)前沒有實現(xiàn)有限等待準(zhǔn)則)。在JDK6以后提供了LockSupport.park()/LockSupport.unpark()操作,可以將當(dāng)前線程放入一個等待列表或?qū)⒁粋€線程從這個等待列表中喚醒。然而這個park/unpark的等待列表是一個全局的等待列表,在unpartk的時候還是需要提供需要喚醒的Thread對象,因而我們需要維護自己的等待列表,但是如果我們可以用JDK提供的工具類ConcurrentLinkedQueue,就非常容易實現(xiàn),如LockSupport文檔中給出來的代碼事例:
private final AtomicBoolean locked = new AtomicBoolean(false);
private final Queue<Thread> waiters = new ConcurrentLinkedQueue<Thread>();
public void lock() {
boolean wasInterrupted = false;
Thread current = Thread.currentThread();
waiters.add(current);
// Block while not first in queue or cannot acquire lock
while (waiters.peek() != current || !locked.compareAndSet(false, true)) {
LockSupport.park(this);
if (Thread.interrupted()) // ignore interrupts while waiting
wasInterrupted = true;
}
waiters.remove();
if (wasInterrupted) // reassert interrupt status on exit
current.interrupt();
}
public void unlock() {
locked.set(false);
LockSupport.unpark(waiters.peek());
}
}
在該代碼事例中,有一個線程等待隊列和鎖標(biāo)記字段,每次調(diào)用lock時先將當(dāng)前線程放入這個等待隊列中,然后拿出隊列頭線程對象,如果該線程對象正好是當(dāng)前線程,并且成功
使用CAS方式設(shè)置locked字段(這里需要兩個同時滿足,因為可能出現(xiàn)一個線程已經(jīng)從隊列中移除了但還沒有unlock,此時另一個線程調(diào)用lock方法,此時隊列頭的線程就是第二個線程,然而由于第一個線程還沒有unlock或者正在unlock,因而需要使用CAS原子操作來判斷是否要park),表示該線程競爭成功,獲得鎖,否則將當(dāng)前線程park,這里之所以要放在
while循環(huán)中,因為park操作可能無理由返回(spuriously),如文檔中給出的描述:
然而本文的目的是自己實現(xiàn)一個Lock對象,即只使用一些基本的操作,而不使用JDK提供的Atomic類和ConcurrentLinkedQueue。類似的首先我們也需要一個隊列存放等待線程隊列(公平起見,使用先進先出隊列),因而先定義一個Node對象用以構(gòu)成這個隊列:
volatile Thread owner;
volatile Node prev;
volatile Node next;
public Node(Thread owner) {
this.owner = owner;
this.state = INIT;
}
public Node() {
this(Thread.currentThread());
}
}
簡單起見,隊列頭是一個起點的placeholder,每個調(diào)用lock的線程都先將自己競爭放入這個隊列尾,每個隊列頭后一個線程(Node)即是獲得鎖的線程,所以我們需要有head Node字段用以快速獲取隊列頭的后一個Node,而tail Node字段用來快速插入新的Node,所以關(guān)鍵在于如何線程安全的構(gòu)建這個隊列,方法還是一樣的,使用CAS操作,即CAS方法將自己設(shè)置成tail值,然后重新構(gòu)建這個列表:
while (true) {
final Node preTail = tail;
node.prev = preTail;
if (compareAndSetTail(preTail, node)) {
preTail.next = node;
return node.prev == head;
}
}
}
在當(dāng)前線程Node以線程安全的方式放入這個隊列后,lock實現(xiàn)相對就比較簡單了,如果當(dāng)前Node是的前驅(qū)是head,該線程獲得鎖,否則park當(dāng)前線程,處理park無理由返回的問題,因而將park放入while循環(huán)中(該實現(xiàn)是一個不可重入的實現(xiàn)):
// Put the latest node to a queue first, then check if the it is the first node
// this way, the list is the only shared resource to deal with
Node node = new Node();
if (enqueue(node)) {
current = node.owner;
} else {
while (node.prev != head) {
LockSupport.park(this); // This may return "spuriously"!!, so put it to while
}
current = node.owner;
}
}
unlock的實現(xiàn)需要考慮多種情況,如果當(dāng)前Node(head.next)有后驅(qū),那么直接unpark該后驅(qū)即可;如果沒有,表示當(dāng)前已經(jīng)沒有其他線程在等待隊列中,然而在這個判斷過程中可能會有其他線程進入,因而需要用CAS的方式設(shè)置tail,如果設(shè)置失敗,表示此時有其他線程進入,因而需要將該新進入的線程unpark從而該新進入的線程在調(diào)用park后可以立即返回(這里的CAS和enqueue的CAS都是對tail操作,因而能保證狀態(tài)一致):
Node curNode = unlockValidate();
Node next = curNode.next;
if (next != null) {
head.next = next;
next.prev = head;
LockSupport.unpark(next.owner);
} else {
if (!compareAndSetTail(curNode, head)) {
while (curNode.next == null) { } // Wait until the next available
// Another node queued during the time, so we have to unlock that, or else, this node can never unparked
unlock();
} else {
compareAndSetNext(head, curNode, null); // Still use CAS here as the head.next may already been changed
}
}
}
具體的代碼和測試類可以參考查看這里。
其實直到自己寫完這個類后才直到者其實這是一個MCS鎖的變種,因而這個實現(xiàn)每個線程park在自身對應(yīng)的node上,而由前一個線程unpark它;而AbstractQueuedSynchronizer是CLH鎖,因為它的park由前驅(qū)狀態(tài)決定,雖然它也是由前一個線程unpark它。具體可以參考這里。