一、前言
ReentrantLock是JDK1.5引入的,它擁有與synchronized相同的并發(fā)性和內(nèi)存語(yǔ)義,并提供了超出synchonized的其他高級(jí)功能(例如,中斷鎖等候、條件變量等),并且使用ReentrantLock比synchronized能獲得更好的可伸縮性。
ReentrantLock的實(shí)現(xiàn)基于AQS(AbstractQueuedSynchronizer)和LockSupport。
AQS主要利用硬件原語(yǔ)指令(CAS compare-and-swap),來實(shí)現(xiàn)輕量級(jí)多線程同步機(jī)制,并且不會(huì)引起CPU上文切換和調(diào)度,同時(shí)提供內(nèi)存可見性和原子化更新保證(線程安全的三要素:原子性、可見性、順序性)。
AQS的本質(zhì)上是一個(gè)同步器/阻塞鎖的基礎(chǔ)框架,其作用主要是提供加鎖、釋放鎖,并在內(nèi)部維護(hù)一個(gè)FIFO等待隊(duì)列,用于存儲(chǔ)由于鎖競(jìng)爭(zhēng)而阻塞的線程。
二、關(guān)鍵代碼分析
1.關(guān)鍵字段
AQS使用鏈表作為隊(duì)列,使用volatile變量state,作為鎖狀態(tài)標(biāo)識(shí)位。

/** *//**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head; //等待隊(duì)列的頭


/** *//**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail; //等待隊(duì)列的尾


/** *//**
* The synchronization state.
*/
private volatile int state; //原子性的鎖狀態(tài)位,ReentrantLock對(duì)該字段的調(diào)用是通過原子操作compareAndSetState進(jìn)行的



protected final boolean compareAndSetState(int expect, int update)
{
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}


2.ReentrantLock的公平鎖與非公平鎖
從ReentrantLock的構(gòu)造子可以看到,ReentrantLock提供兩種鎖:公平鎖和非公平鎖,其內(nèi)部實(shí)現(xiàn)了兩種同步器NonfairSync、FairSync派生自AQS,主要才采用了模板方法模式,主要重寫了AQS的tryAcquire、lock方法,如下圖。


public ReentrantLock()
{
sync = new NonfairSync();
}


public ReentrantLock(boolean fair)
{
sync = (fair)? new FairSync() : new NonfairSync();
}


3.獲取鎖操作
public void lock() {
sync.lock();
}
由于NonfairSync、FairSync分別實(shí)現(xiàn)了lock方法,我們將分別探討
3.1NonfairSync.lock()分析
(1)通過原子的比較并設(shè)置操作,如果成功設(shè)置,說明鎖是空閑的,當(dāng)前線程獲得鎖,并把當(dāng)前線程設(shè)置為鎖擁有者;
(2)否則,調(diào)用acquire方法;
package java.util.concurrent.locks.ReentrantLock;
final void lock()
{
if (compareAndSetState(0, 1))//表示如果當(dāng)前state=0,那么設(shè)置state=1,并返回true;否則返回false。由于未等待,所以線程不需加入到等待隊(duì)列
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
package java.util.concurrent.locks.AbstractOwnableSynchronizer //AbstractOwnableSynchronizer是AQS的父類

protected final void setExclusiveOwnerThread(Thread t)
{
exclusiveOwnerThread = t;
}

3.1.1acquire方法分析
(1)如果嘗試以獨(dú)占的方式獲得鎖失敗,那么就把當(dāng)前線程封裝為一個(gè)Node,加入到等待隊(duì)列中;如果加入隊(duì)列成功,接下來檢查當(dāng)前線程的節(jié)點(diǎn)是否應(yīng)該等待(掛起),如果當(dāng)前線程所處節(jié)點(diǎn)的前一節(jié)點(diǎn)的等待狀態(tài)小于0,則通過LockSupport掛起當(dāng)前線程;無論線程是否被掛起,或者掛起后被激活,都應(yīng)該返回當(dāng)前線程的中斷狀態(tài),如果處于中斷狀態(tài),需要中斷當(dāng)前線程。
package java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg)
{
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}


protected final boolean tryAcquire(int acquires)
{
return nonfairTryAcquire(acquires);
}


3.1.2nonfairTryAcquire分析
(1)如果鎖狀態(tài)空閑(state=0),且通過原子的比較并設(shè)置操作,那么當(dāng)前線程獲得鎖,并把當(dāng)前線程設(shè)置為鎖擁有者;
(2)如果鎖狀態(tài)空閑,且原子的比較并設(shè)置操作失敗,那么返回false,說明嘗試獲得鎖失敗;
(3)否則,檢查當(dāng)前線程與鎖擁有者線程是否相等(表示一個(gè)線程已經(jīng)獲得該鎖,再次要求該鎖,這種情況叫可重入鎖),如果相等,維護(hù)鎖狀態(tài),并返回true;
(4)如果不是以上情況,說明鎖已經(jīng)被其他的線程持有,直接返回false;

final boolean nonfairTryAcquire(int acquires)
{
final Thread current = Thread.currentThread();
int c = getState();

if (c == 0)
{

if (compareAndSetState(0, acquires))
{
setExclusiveOwnerThread(current);
return true;
}
}

else if (current == getExclusiveOwnerThread())
{ //表示一個(gè)線程已經(jīng)獲得該鎖,再次要求該鎖(重入鎖的由來),為狀態(tài)位加acquires
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

3.1.3addWaiter分析
(1)如果tail節(jié)點(diǎn)不為null,說明隊(duì)列不為空,則把新節(jié)點(diǎn)加入到tail的后面,返回當(dāng)前節(jié)點(diǎn),否則進(jìn)入enq進(jìn)行處理(2);
(2)如果tail節(jié)點(diǎn)為null,說明隊(duì)列為空,需要建立一個(gè)虛擬的頭節(jié)點(diǎn),并把封裝了當(dāng)前線程的節(jié)點(diǎn)設(shè)置為尾節(jié)點(diǎn);另外一種情況的發(fā)生,是由于在(1)中的compareAndSetTail可能會(huì)出現(xiàn)失敗,這里采用for的無限循環(huán),是要保證當(dāng)前線程能夠正確進(jìn)入等待隊(duì)列;
package java.util.concurrent.locks.AbstractQueuedSynchronizer

private Node addWaiter(Node mode)
{
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;

if (pred != null)
{ //如果當(dāng)前隊(duì)列不是空隊(duì)列,則把新節(jié)點(diǎn)加入到tail的后面,返回當(dāng)前節(jié)點(diǎn),否則進(jìn)入enq進(jìn)行處理。
node.prev = pred;

if (compareAndSetTail(pred, node))
{
pred.next = node;
return node;
}
}
enq(node);
return node;
}

package java.util.concurrent.locks.AbstractQueuedSynchronizer

private Node enq(final Node node)
{

for (;;)
{
Node t = tail;

if (t == null)
{ // tail節(jié)點(diǎn)為空,說明是空隊(duì)列,初始化頭節(jié)點(diǎn),如果成功,返回頭節(jié)點(diǎn)
Node h = new Node(); // Dummy header
h.next = node;
node.prev = h;

if (compareAndSetHead(h))
{
tail = node;
return h;
}
}

else
{ //
node.prev = t;

if (compareAndSetTail(t, node))
{
t.next = node;
return t;
}
}
}


3.1.4acquire分析
(1)如果當(dāng)前節(jié)點(diǎn)是隊(duì)列的頭結(jié)點(diǎn)(如果第一個(gè)節(jié)點(diǎn)是虛擬節(jié)點(diǎn),那么第二個(gè)節(jié)點(diǎn)實(shí)際上就是頭結(jié)點(diǎn)了),就嘗試在此獲取鎖tryAcquire(arg)。如果成功就將頭結(jié)點(diǎn)設(shè)置為當(dāng)前節(jié)點(diǎn)(不管第一個(gè)結(jié)點(diǎn)是否是虛擬節(jié)點(diǎn)),返回中斷狀態(tài)。否則進(jìn)行(2)。
(2)檢測(cè)當(dāng)前節(jié)點(diǎn)是否應(yīng)該park()-"掛起的意思",如果應(yīng)該park()就掛起當(dāng)前線程并且返回當(dāng)前線程中斷狀態(tài)。進(jìn)行操作(1)。

final boolean acquireQueued(final Node node, int arg)
{

try
{
boolean interrupted = false;

for (;;)
{
final Node p = node.predecessor();

if (p == head && tryAcquire(arg))
{
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}

} catch (RuntimeException ex)
{
cancelAcquire(node);
throw ex;
}
}

3.1.5 shouldParkAfterFailedAcquire分析
(1)如果前一個(gè)節(jié)點(diǎn)的等待狀態(tài)waitStatus<0,也就是前面的節(jié)點(diǎn)還沒有獲得到鎖,那么返回true,表示當(dāng)前節(jié)點(diǎn)(線程)就應(yīng)該park()了。否則進(jìn)行(2)。
(2)如果前一個(gè)節(jié)點(diǎn)的等待狀態(tài)waitStatus>0,也就是前一個(gè)節(jié)點(diǎn)被CANCELLED了,那么就將前一個(gè)節(jié)點(diǎn)去掉,遞歸此操作直到所有前一個(gè)節(jié)點(diǎn)的waitStatus<=0,進(jìn)行(4)。否則進(jìn)行(3)。
(3)前一個(gè)節(jié)點(diǎn)等待狀態(tài)waitStatus=0,修改前一個(gè)節(jié)點(diǎn)狀態(tài)位為SINGAL,表示后面有節(jié)點(diǎn)等待你處理,需要根據(jù)它的等待狀態(tài)來決定是否該park()。進(jìn)行(4)。
(4)返回false,表示線程不應(yīng)該park()。
注意:一個(gè)Node節(jié)點(diǎn)可包含以下狀態(tài)以及模式:
/** waitStatus value to indicate thread has cancelled */ 取消
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */ 信號(hào)等待(在AQS中,是通過LockSupport進(jìn)行線程間信號(hào)交互的)
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */ 條件等待
static final int CONDITION = -2;
/** Marker to indicate a node is waiting in shared mode */ 共享模式
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */ 獨(dú)占模式
static final Node EXCLUSIVE = null;

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
{
int s = pred.waitStatus;
if (s < 0)

/**//*
* This node has already set status asking a release
* to signal it, so it can safely park
*/
return true;

if (s > 0)
{

/**//*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/

do
{
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
else

/**//*
* Indicate that we need a signal, but don't park yet. Caller
* will need to retry to make sure it cannot acquire before
* parking.
*/
compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
return false;
}


private final boolean parkAndCheckInterrupt()
{
LockSupport.park(this); //阻塞,即掛起;在沒有unpark之前,下面的代碼將不會(huì)執(zhí)行;
return Thread.interrupted();//個(gè)人感覺,如果沒有外部的interrupt或者超時(shí)等,這里將始終返回false;
}


private static void selfInterrupt()
{
Thread.currentThread().interrupt();
}


3.2FairSync.lock()分析
公平鎖相對(duì)與非公平鎖,在鎖的獲取實(shí)現(xiàn)上,差別只在FairSync提供自己的tryAcquire()的方法實(shí)現(xiàn),代碼如下:
(1)如果鎖狀態(tài)為0,等待隊(duì)列為空,或者給定的線程在隊(duì)列的頭部,那么該線程獲得鎖;
(2)如果當(dāng)前線程與鎖持有者線程相等,這種情況屬于鎖重入,鎖狀態(tài)加上請(qǐng)求數(shù);
(3)以上兩種情況都不是,返回false,說明嘗試獲得鎖失敗;

protected final boolean tryAcquire(int acquires)
{
final Thread current = Thread.currentThread();
int c = getState();

if (c == 0)
{
if (isFirst(current) &&

compareAndSetState(0, acquires))
{
setExclusiveOwnerThread(current);
return true;
}
}

else if (current == getExclusiveOwnerThread())
{
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}



final boolean isFirst(Thread current)
{
Node h, s;
return ((h = head) == null ||
((s = h.next) != null && s.thread == current) ||
fullIsFirst(current)); //頭為null,頭的下個(gè)節(jié)點(diǎn)不是空且該節(jié)點(diǎn)的線程與當(dāng)前線程是相等的,
}


final boolean fullIsFirst(Thread current)
{
// same idea as fullGetFirstQueuedThread
Node h, s;
Thread firstThread = null;//如果頭不為空,且頭的下個(gè)節(jié)點(diǎn)也不為空,且該節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn)是頭節(jié)點(diǎn),且該節(jié)點(diǎn)的線程不為null
if (((h = head) != null && (s = h.next) != null &&
s.prev == head && (firstThread = s.thread) != null))
return firstThread == current;
Node t = tail;

while (t != null && t != head)
{
Thread tt = t.thread;
if (tt != null)
firstThread = tt;
t = t.prev;
}
return firstThread == current || firstThread == null;
}
4.總結(jié)
ReentrantLock在采用非公平鎖構(gòu)造時(shí),首先檢查鎖狀態(tài),如果鎖可用,直接通過CAS設(shè)置成持有狀態(tài),且把當(dāng)前線程設(shè)置為鎖的擁有者。
如果當(dāng)前鎖已經(jīng)被持有,那么接下來進(jìn)行可重入檢查,如果可重入,需要為鎖狀態(tài)加上請(qǐng)求數(shù)。如果不屬于上面兩種情況,那么說明鎖是被其他線程持有,
當(dāng)前線程應(yīng)該放入等待隊(duì)列。
在放入等待隊(duì)列的過程中,首先要檢查隊(duì)列是否為空隊(duì)列,如果為空隊(duì)列,需要?jiǎng)?chuàng)建虛擬的頭節(jié)點(diǎn),然后把對(duì)當(dāng)前線程封裝的節(jié)點(diǎn)加入到隊(duì)列尾部。由于設(shè)置尾部節(jié)點(diǎn)采用了CAS,為了保證尾節(jié)點(diǎn)能夠設(shè)置成功,這里采用了無限循環(huán)的方式,直到設(shè)置成功為止。
在完成放入等待隊(duì)列任務(wù)后,則需要維護(hù)節(jié)點(diǎn)的狀態(tài),以及及時(shí)清除處于Cancel狀態(tài)的節(jié)點(diǎn),以幫助垃圾收集器及時(shí)回收。如果當(dāng)前節(jié)點(diǎn)之前的節(jié)點(diǎn)的等待狀態(tài)小于1,說明當(dāng)前節(jié)點(diǎn)之前的線程處于等待狀態(tài)(掛起),那么當(dāng)前節(jié)點(diǎn)的線程也應(yīng)處于等待狀態(tài)(掛起)。掛起的工作是由LockSupport類支持的,LockSupport通過JNI調(diào)用本地操作系統(tǒng)來完成掛起的任務(wù)(java中除了廢棄的suspend等方法,沒有其他的掛起操作)。
在當(dāng)前等待的線程,被喚起后,檢查中斷狀態(tài),如果處于中斷狀態(tài),那么需要中斷當(dāng)前線程。
下一節(jié)《ReentrantLock源碼之一lock方法解析(鎖的釋放)》
posted on 2011-08-18 14:12
zhangxl 閱讀(19706)
評(píng)論(1) 編輯 收藏 所屬分類:
java 并發(fā)