1.CountDownLatch demo
package com.landon.mavs.example.concurrent;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
*
* CountdownLatch用法
*
* <pre>
* 1.同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待
* 2.給定的計數 初始化 CountDownLatch.計數器到達零之前,所以調用await的線程會一直阻塞.之后,會釋放所有等待的線程,執行await的后續調用.
* 3.計數無法被重置.如需重置計數,可考慮{@link java.util.concurrent.CyclicBarrier}
* 4.計數 1初始化的 CountDownLatch 用作一個簡單的開/關鎖存器,或入口:
* 在通過調用 countDown() 的線程打開入口前,所有調用 await 的線程都一直在入口處等待;
* 用 N 初始化的 CountDownLatch 可以使一個線程在 N 個線程完成某項操作之前一直等待,或者使其在某項操作完成 N 次之前一直等待
* </pre>
*
* <pre>
* 1.CountDownLatch(int count) 構造一個用給定計數初始化的 CountDownLatch
* 2.await() 使當前線程在鎖存器倒計數至零之前一直等待,除非線程被中斷
* 3.await(long timeout, TimeUnit unit) 使當前線程在鎖存器倒計數至零之前一直等待,除非線程被中斷或超出了指定的等待時間
* 4.countDown() 遞減鎖存器的計數,如果計數到達零,則釋放所有等待的線程
* 5.getCount() 返回當前計數
* 6.toString() 返回標識此鎖存器及其狀態的字符串
* </pre>
*
* @author landon
*
*/
public class CountdownLatchExample {
public static void main(String[] args) throws Exception {
// 示例1:
// Master啟動多個worker線程處理任務.所有的worker線程在執行任務前需等待Master初始化,Master線程初始化完畢,則startSignal.countdown,表示開始
// 工作線程被喚醒;然后Master阻塞,等待所有的worker線程執行完畢任務;每個worker線程執行完畢任務,則countdown一下,直至執行所有的任務完成;Master被喚醒,執行收尾工作.

// 示例2:
// 將一個問題分成 N 個部分,用執行每個部分并讓鎖存器倒計數的 Runnable 來描述每個部分,然后將所有 Runnable 加入到
// Executor 隊列。
// 當所有的子部分完成后,協調線程就能夠通過 await

Master master = new Master(3);
master.start();

// 任務分為5部分,交個線程池去執行任務.
CountDownLatch doneSignal = new CountDownLatch(5);
// 啟動一個線程池去執行任務.這里是一個單線程(這里不關心有多少個線程去執行任務,這里只關心任務完成后計數遞減,使得主線程可以繼續執行)
ExecutorService executor = Executors.newSingleThreadExecutor();

// 向線程池提交5個任務
for (int i = 0; i < 5; i++) {
executor.execute(new WorkerTask(doneSignal, i));
}

// 主線程等待任務完成
doneSignal.await();

// 此時toString:[Count = 0]
System.out.println("主線程:問題全部解決.繼續:" + doneSignal.toString());
}

private static class Master {
private CountDownLatch startSignal;
private CountDownLatch endSignal;

public Master(int workerNum) {
startSignal = new CountDownLatch(1);
endSignal = new CountDownLatch(workerNum);

// 啟動所有worker線程
for (int i = 0; i < workerNum; i++) {
new Thread(new Worker(startSignal, endSignal)).start();
}
}

private void init() {
System.out.println("Master 初始化環境");
}

public void start() {
try {
init();

// 初始化完畢,則喚醒工作線程執行任務.
startSignal.countDown();

// 等待所有worker線程完成任務
endSignal.await();

dispose();

} catch (Exception e) {
}
}
private void dispose() {
System.out.println("Master 執行收尾操作");
}
}

private static class Worker implements Runnable {
private CountDownLatch startSignal;
private CountDownLatch endSignal;

public Worker(CountDownLatch startSignal, CountDownLatch endSignal) {
this.startSignal = startSignal;
this.endSignal = endSignal;
}

@Override
public void run() {
try {
// 等待Master線程初始化完畢
startSignal.await();

System.out.println("worker 執行任務");

// 表示任務完成,計數遞減,計數為0時,表示所有的任務完成
endSignal.countDown();

System.out.println("endSignal.counter:" + endSignal.getCount());
} catch (Exception e) {
}
}
}

private static class WorkerTask implements Runnable {
// 所有任務完成信號
private CountDownLatch doneSignal;
// 表示任務序號
private int i;

public WorkerTask(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}

@Override
public void run() {
try {
System.out.println("Worker[" + i + "]" + " 任務完成");
doneSignal.countDown();
} catch (Exception e) {
}
}
}
}

2.CylicBarrier demo
package com.landon.mavs.example.concurrent;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
*
* CyclicBarrier用法
*
* <pre>
* 1.同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)
* 2.粗淺的理解即有一道屏障,目的是等待一組線程完成操作.某一線程完成操作后,則等待在屏障下(await).直至所有線程均到了屏障下,
* 則可執行指定的屏障操作.待執行完執行的屏障操作后,所有的線程則結束await,即越過屏障,繼續執行后續操作.
* 3.該 barrier 在釋放等待線程后可以重用,所以稱它為循環 的 barrier
* </pre>
*
* @author landon
*
*/
public class CyclicBarrierExample {
public static void main(String[] args) throws Exception {
Master master = new Master();
master.start();
}
}

// 計算1²到10²和.分發到每個worker線程,最后合并
class Master {
// 用來保存計算結果
private static List<Integer> result = new ArrayList<>();

// public CyclicBarrier(int parties, Runnable barrierAction)
// 創建一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處于等待狀態時啟動,并在啟動 barrier 時執行給定的屏障操作
// 該操作由最后一個進入 barrier 的線程執行
private CyclicBarrier barrier = new CyclicBarrier(10, new Runnable() {

@Override
public void run() {
int sum = 0;
for (int tmp : result) {
sum += tmp;
}

// final result:285_Worker-9
// 從輸出看Worker-9執行了屏障操作.而Worker-9在線程的索引為0.即await的返回值.
// 執行完該操作后,所有的線程越過屏障,執行后續操作.
System.out.println("final result:" + sum + "_"
+ Thread.currentThread().getName());
}
});

public void start() throws Exception {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(new Worker(i, barrier), "Worker-" + i);
thread.start();
}
}
public static synchronized void addASum(int sum) {
result.add(sum);
}
}

class Worker implements Runnable {
private int i;
private CyclicBarrier barrier;

public Worker(int i, CyclicBarrier barrier) {
this.i = i;
this.barrier = barrier;
}

@Override
public void run() {
int sum = i * i;
Master.addASum(sum);

try {

// 模擬一下耗時
Thread.sleep(i * 100);

// public int getNumberWaiting()
// 返回當前在屏障處等待的參與者數目
System.out.println(Thread.currentThread().getName()
+ "_curNumberWaitting:" + barrier.getNumberWaiting());

// public int await() throws InterruptedException,
// BrokenBarrierException
// 在所有參與者都已經在此 barrier 上調用 await 方法之前,將一直等待
int curIndex = barrier.await();

System.out.println(Thread.currentThread().getName() + " end wait:"
+ curIndex);
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
}
}
}




















































































































































2.CylicBarrier demo



































































































