如果說(shuō)CountDownLatch是一次性的,那么CyclicBarrier正好可以循環(huán)使用。它允許一組線程互相等待,直到到達(dá)某個(gè)公共屏障點(diǎn) (common barrier point)。所謂屏障點(diǎn)就是一組任務(wù)執(zhí)行完畢的時(shí)刻。
清單1 一個(gè)使用CyclicBarrier的例子
package xylz.study.concurrency.lock;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;public class CyclicBarrierDemo {
final CyclicBarrier barrier;
final int MAX_TASK;
public CyclicBarrierDemo(int cnt) {
barrier = new CyclicBarrier(cnt + 1);
MAX_TASK = cnt;
}public void doWork(final Runnable work) {
new Thread() {public void run() {
work.run();
try {
int index = barrier.await();
doWithIndex(index);
} catch (InterruptedException e) {
return;
} catch (BrokenBarrierException e) {
return;
}
}
}.start();
}private void doWithIndex(int index) {
if (index == MAX_TASK / 3) {
System.out.println("Left 30%.");
} else if (index == MAX_TASK / 2) {
System.out.println("Left 50%");
} else if (index == 0) {
System.out.println("run over");
}
}public void waitForNext() {
try {
doWithIndex(barrier.await());
} catch (InterruptedException e) {
return;
} catch (BrokenBarrierException e) {
return;
}
}public static void main(String[] args) {
final int count = 10;
CyclicBarrierDemo demo = new CyclicBarrierDemo(count);
for (int i = 0; i < 100; i++) {
demo.doWork(new Runnable() {public void run() {
//do something
try {
Thread.sleep(1000L);
} catch (Exception e) {
return;
}
}
});
if ((i + 1) % count == 0) {
demo.waitForNext();
}
}
}}
清單1描述的是一個(gè)周期性處理任務(wù)的例子,在這個(gè)例子中有一對(duì)的任務(wù)(100個(gè)),希望每10個(gè)為一組進(jìn)行處理,當(dāng)前僅當(dāng)上一組任務(wù)處理完成后才能進(jìn)行下一組,另外在每一組任務(wù)中,當(dāng)任務(wù)剩下50%,30%以及所有任務(wù)執(zhí)行完成時(shí)向觀察者發(fā)出通知。
在這個(gè)例子中,CyclicBarrierDemo 構(gòu)建了一個(gè)count+1的任務(wù)組(其中一個(gè)任務(wù)時(shí)為了外界方便掛起主線程)。每一個(gè)子任務(wù)里,人物本身執(zhí)行完畢后都需要等待同組內(nèi)其它任務(wù)執(zhí)行完成后才能繼續(xù)。同時(shí)在剩下任務(wù)50%、30%已經(jīng)0時(shí)執(zhí)行特殊的其他任務(wù)(發(fā)通知)。
很顯然CyclicBarrier有以下幾個(gè)特點(diǎn):
-
await()方法將掛起線程,直到同組的其它線程執(zhí)行完畢才能繼續(xù)
-
await()方法返回線程執(zhí)行完畢的索引,注意,索引時(shí)從任務(wù)數(shù)-1開(kāi)始的,也就是第一個(gè)執(zhí)行完成的任務(wù)索引為parties-1,最后一個(gè)為0,這個(gè)parties為總?cè)蝿?wù)數(shù),清單中是cnt+1
-
CyclicBarrier 是可循環(huán)的,顯然名稱說(shuō)明了這點(diǎn)。在清單1中,每一組任務(wù)執(zhí)行完畢就能夠執(zhí)行下一組任務(wù)。
另外除了CyclicBarrier除了以上特點(diǎn)外,還有以下幾個(gè)特點(diǎn):
-
如果屏障操作不依賴于掛起的線程,那么任何線程都可以執(zhí)行屏障操作。在清單1中可以看到并沒(méi)有指定那個(gè)線程執(zhí)行50%、30%、0%的操作,而是一組線程(cnt+1)個(gè)中任何一個(gè)線程只要到達(dá)了屏障點(diǎn)都可以執(zhí)行相應(yīng)的操作
-
CyclicBarrier 的構(gòu)造函數(shù)允許攜帶一個(gè)任務(wù),這個(gè)任務(wù)將在0%屏障點(diǎn)執(zhí)行,它將在await()==0后執(zhí)行。
-
CyclicBarrier 如果在await時(shí)因?yàn)橹袛唷⑹ ⒊瑫r(shí)等原因提前離開(kāi)了屏障點(diǎn),那么任務(wù)組中的其他任務(wù)將立即被中斷,以InterruptedException異常離開(kāi)線程。
-
所有await()之前的操作都將在屏障點(diǎn)之前運(yùn)行,也就是CyclicBarrier 的內(nèi)存一致性效果
CyclicBarrier 的所有API如下:
-
public CyclicBarrier(int parties) 創(chuàng)建一個(gè)新的 CyclicBarrier,它將在給定數(shù)量的參與者(線程)處于等待狀態(tài)時(shí)啟動(dòng),但它不會(huì)在啟動(dòng) barrier 時(shí)執(zhí)行預(yù)定義的操作。
-
public CyclicBarrier(int parties, Runnable barrierAction) 創(chuàng)建一個(gè)新的 CyclicBarrier,它將在給定數(shù)量的參與者(線程)處于等待狀態(tài)時(shí)啟動(dòng),并在啟動(dòng) barrier 時(shí)執(zhí)行給定的屏障操作,該操作由最后一個(gè)進(jìn)入 barrier 的線程執(zhí)行。
-
public int await() throws InterruptedException, BrokenBarrierException 在所有參與者都已經(jīng)在此 barrier 上調(diào)用 await 方法之前,將一直等待。
-
public int await(long timeout,TimeUnit unit) throws InterruptedException, BrokenBarrierException,TimeoutException 在所有參與者都已經(jīng)在此屏障上調(diào)用 await 方法之前將一直等待,或者超出了指定的等待時(shí)間。
-
public int getNumberWaiting() 返回當(dāng)前在屏障處等待的參與者數(shù)目。此方法主要用于調(diào)試和斷言。
-
public int getParties() 返回要求啟動(dòng)此 barrier 的參與者數(shù)目。
-
public boolean isBroken() 查詢此屏障是否處于損壞狀態(tài)。
-
public void reset() 將屏障重置為其初始狀態(tài)。
針對(duì)以上API,下面來(lái)探討下CyclicBarrier 的實(shí)現(xiàn)原理,以及為什么有這樣的API。
清單2 CyclicBarrier.await*()的實(shí)現(xiàn)片段
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}if (g.broken)
throw new BrokenBarrierException();if (g != generation)
return index;if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
清單2有點(diǎn)復(fù)雜,這里一點(diǎn)一點(diǎn)的剖析,并且還原到最原始的狀態(tài)。
利用前面學(xué)到的知識(shí),我們知道要想讓線程等待其他線程執(zhí)行完畢,那么已經(jīng)執(zhí)行完畢的線程(進(jìn)入await*()方法)就需要park(),直到超時(shí)或者被中斷,或者被其它線程喚醒。
前面說(shuō)過(guò)CyclicBarrier 的特點(diǎn)是要么大家都正常執(zhí)行完畢,要么大家都異常被中斷,不會(huì)其中有一個(gè)被中斷而其它正常執(zhí)行完畢的現(xiàn)象存在。這種特點(diǎn)叫all-or-none。類似的概念是原子操作中的要么大家都執(zhí)行完,要么一個(gè)操作都不執(zhí)行完。當(dāng)前這其實(shí)是兩個(gè)概念了。要完成這樣的特點(diǎn)就必須有一個(gè)狀態(tài)來(lái)描述曾經(jīng)是否有過(guò)線程被中斷(broken)了,這樣后面執(zhí)行完的線程就該知道是否需要繼續(xù)等待了。而在CyclicBarrier 中Generation 就是為了完成這件事情的。Generation的定義非常簡(jiǎn)單,整個(gè)結(jié)構(gòu)就只有一個(gè)變量boolean broken = false;,定義是否發(fā)生了broken操作。
由于有競(jìng)爭(zhēng)資源的存在(broken/index),所以毫無(wú)疑問(wèn)需要一把鎖lock。拿到鎖后整個(gè)過(guò)程是這樣的:
-
檢查是否存在中斷位(broken),如果存在就立即以BrokenBarrierException異常返回。此異常描述的是線程進(jìn)入屏障被破壞的等待狀態(tài)。否則進(jìn)行2。
-
檢查當(dāng)前線程是否被中斷,如果是那么就設(shè)置中斷位(使其它將要進(jìn)入等待的線程知道),另外喚醒已經(jīng)等待的線程,同時(shí)以InterruptedException異常返回,表示線程要處理中斷。否則進(jìn)行3。
-
將剩余任務(wù)數(shù)減1,如果此時(shí)剩下的任務(wù)數(shù)為0,也就是達(dá)到了公共屏障點(diǎn),那么就執(zhí)行屏障點(diǎn)任務(wù)(如果有的話),同時(shí)創(chuàng)建新的Generation(在這個(gè)過(guò)程中會(huì)喚醒其它所有線程,因此當(dāng)前線程是屏障點(diǎn)線程,那么其它線程就都應(yīng)該在等待狀態(tài))。否則進(jìn)行4。
-
到這里說(shuō)明還沒(méi)有到達(dá)屏障點(diǎn),那么此時(shí)線程就應(yīng)該park()。很顯然在下面的for循環(huán)中就是要park線程。這里park線程采用的是Condition.await()方法。也就是trip.await*()。為什么需要Condition?因?yàn)樗械腶wait*()其實(shí)等待的都是一個(gè)條件,一旦條件滿足就應(yīng)該都被喚醒,所以Condition整好滿足這個(gè)特點(diǎn)。所以到這里就會(huì)明白為什么在步驟3中到達(dá)屏障點(diǎn)時(shí)創(chuàng)建新的Generation的時(shí)候是一定要喚醒其它線程的原因了。
上面4個(gè)步驟其實(shí)只是描述主體結(jié)構(gòu),事實(shí)上整個(gè)過(guò)程中有非常多的邏輯來(lái)處理異常引發(fā)的問(wèn)題,比如執(zhí)行屏障點(diǎn)任務(wù)引發(fā)的異常,park線程超時(shí)引發(fā)的中斷異常和超時(shí)異常等等。所以對(duì)于await()而言,異常的處理比業(yè)務(wù)邏輯的處理更復(fù)雜,這就解釋了為什么await()的時(shí)候可能引發(fā)InterruptedException,BrokenBarrierException,TimeoutException 三種異常。
清單3 生成下一個(gè)循環(huán)周期并喚醒其它線程
private void nextGeneration() {
trip.signalAll();
count = parties;
generation = new Generation();
}
清單3 描述了如何生成下一個(gè)循環(huán)周期的過(guò)程,在這個(gè)過(guò)程中當(dāng)然需要使用Condition.signalAll()喚醒所有已經(jīng)執(zhí)行完成并且正在等待的線程。另外這里count描述的是還有多少線程需要執(zhí)行,是為了線程執(zhí)行完畢索引計(jì)數(shù)。
isBroken() 方法描述的就是generation.broken,也即線程組是否發(fā)生了異常。這里再一次解釋下為什么要有這個(gè)狀態(tài)的存在。
如果一個(gè)將要位于屏障點(diǎn)或者已經(jīng)位于屏障點(diǎn)的而執(zhí)行屏障點(diǎn)任務(wù)的線程發(fā)生了異常,那么即使喚醒了其它等待的線程,其它等待的線程也會(huì)因?yàn)檠h(huán)等待而“死去”,因?yàn)樵僖矝](méi)有一個(gè)線程來(lái)喚醒這些第二次進(jìn)行park的線程了。還有一個(gè)意圖是,如果屏障點(diǎn)都已經(jīng)損壞了,那么其它將要等待屏障點(diǎn)的再線程掛起就沒(méi)有意義了。
其實(shí)CyclicBarrier 還有一個(gè)reset方法,描述的是手動(dòng)立即將所有線程中斷,恢復(fù)屏障點(diǎn),進(jìn)行下一組任務(wù)的執(zhí)行。也就是與重新創(chuàng)建一個(gè)新的屏障點(diǎn)相比,可能維護(hù)的代價(jià)要小一些(減少同步,減少上一個(gè)CyclicBarrier 的管理等等)。
本來(lái)是想和Semaphore 一起將的,最后發(fā)現(xiàn)鋪開(kāi)后就有點(diǎn)長(zhǎng)了,而且也不利于理解和吸收,所以放到下一篇吧。
參考資料: