(轉)Java 5.0多線程編程(4)
CountDownLatch:
?? CountDownLatch
是個計數器,它有一個初始數,等待這個計數器的線程必須等到計數器倒數到零時才可繼續。比如說一個
Server
啟動時需要初始化
4
個部件,
Server
可以同時啟動
4
個線程去初始化這
4
個部件,然后調用
CountDownLatch(4).await()
阻斷進入等待,每個線程完成任務后會調用一次
CountDownLatch.countDown()
來倒計數
,
當
4
個線程都結束時
CountDownLatch
的計數就會降低為
0
,此時
Server
就會被喚醒繼續下一步操作。
CountDownLatch
的方法主要有:
-
await()
:使調用此方法的線程阻斷進入等待
-
countDown():
倒計數,將計數值減
1
-
getCount():
得到當前的計數值
?? CountDownLatch
的例子:一個
server
調了三個
ComponentThread
分別去啟動三個組件,然后
server
等到組件都啟動了再繼續。
public class Server {
????? public static void main(String[] args) throws InterruptedException{
??????????? System.out.println("Server is starting.");
??????????
?//
初始化一個初始值為
3
的
CountDownLatch
??????????? CountDownLatch latch = new CountDownLatch(3);
??????????? //
起
3
個線程分別去啟動
3
個組件
??????????? ExecutorService service = Executors.newCachedThreadPool();
??????????? service.submit(new ComponentThread(latch, 1));
?????????
??service.submit(new ComponentThread(latch, 2));
??????????? service.submit(new ComponentThread(latch, 3));
??????????? service.shutdown();
??????????? //
進入等待狀態
??????????? latch.await();
??????????? //
當所需的三個組件都完成時,
Server
就可繼續了
??????????? System.out.println("Server is up!");
????? }
}
?
public class ComponentThread implements Runnable{
????? CountDownLatch latch;
????? int ID;
????? /** Creates a new instance of ComponentThread */
????? public ComponentThread(CountDownLatch latch, int ID) {
??????????? this.latch = latch;
??????????? this.ID = ID;
????? }
????? public void run() {
??????????? System.out.println("Component "+ID + " initialized!");
??????????? //
將計數減一
??????????? latch.countDown();
????? }????
} |
運行結果:
Server is starting.
Component 1 initialized!
Component 3 initialized!
Component 2 initialized!
Server is up!
|
CyclicBarrier:
?? CyclicBarrier
類似于
CountDownLatch
也是個計數器,不同的是
CyclicBarrier
數的是調用了
CyclicBarrier.await()
進入等待的線程數,當線程數達到了
CyclicBarrier
初始時規定的數目時,所有進入等待狀態的線程被喚醒并繼續。
CyclicBarrier
就象它名字的意思一樣,可看成是個障礙,所有的線程必須到齊后才能一起通過這個障礙。
CyclicBarrier
初始時還可帶一個
Runnable
的參數,此
Runnable
任務在
CyclicBarrier
的數目達到后,所有其它線程被喚醒前被執行。
CyclicBarrier
提供以下幾個方法:
-
await()
:進入等待
-
getParties()
:返回此
barrier
需要的線程數
-
reset()
:將此
barrier
重置
?? 以下是使用
CyclicBarrier
的一個例子:兩個線程分別在一個數組里放一個數,當這兩個線程都結束后,主線程算出數組里的數的和(這個例子比較無聊,我沒有想到更合適的例子)
public class MainThread {
public static void main(String[] args)
????? throws InterruptedException, BrokenBarrierException, TimeoutException{
??????????? final int[] array = new int[2];
??????????? CyclicBarrier barrier = new CyclicBarrier(2,
????????????????? new Runnable() {//
在所有線程都到達
Barrier
時執行
????????????????? public void run() {
??????????????????????? System.out.println("Total is:"+(array[0]+array[1]));
????????????????? }
??????????? });???????????
??????????? //
啟動線程
??????????? new Thread(new ComponentThread(barrier, array, 0)).start();
??????????? new Thread(new ComponentThread(barrier, array, 1)).start();???
????? }?????
}
?
public class ComponentThread implements Runnable{
????? CyclicBarrier barrier;
????? int ID;
????? int[] array;
????? public ComponentThread(CyclicBarrier barrier, int[] array, int ID) {
??????????? this.barrier = barrier;
??????????? this.ID = ID;
??????????? this.array = array;
????? }
????? public void run() {
??????????? try {
????????????????? array[ID] = new Random().nextInt();
????????????????? System.out.println(ID+ " generates:"+array[ID]);
????????????????? //
該線程完成了任務等在
Barrier
處
????????????????? barrier.await();
??????????? } catch (BrokenBarrierException ex) {
????????????????? ex.printStackTrace();
??????????? } catch (InterruptedException ex) {
????????????????? ex.printStackTrace();
??????????? }
????? }
} |
Exchanger:
?? 顧名思義
Exchanger
讓兩個線程可以互換信息。用一個例子來解釋比較容易。例子中服務生線程往空的杯子里倒水,顧客線程從裝滿水的杯子里喝水,然后通過
Exchanger
雙方互換杯子,服務生接著往空杯子里倒水,顧客接著喝水,然后交換,如此周而復始。
class FillAndEmpty {
????? //
初始化一個
Exchanger
,并規定可交換的信息類型是
DataCup
????? Exchanger
????
?Cup initialEmptyCup = ...; //
初始化一個空的杯子
????? Cup initialFullCup = ...; //
初始化一個裝滿水的杯子
????? //
服務生線程
????? class Waiter implements Runnable {
??????????? public void run() {
????????????????? Cup currentCup = initialEmptyCup;
????????????????? try {
??????
?????????????????//
往空的杯子里加水
??????????????????????? currentCup.addWater();
??????????????????????? //
杯子滿后和顧客的空杯子交換
??????????????????????? currentCup = exchanger.exchange(currentCup);
????????????????? } catch (InterruptedException ex) { ... handle ... }
?
?????????? }
????? }
????? //
顧客線程
????? class Customer implements Runnable {
??????????? public void run() {
????????????????? DataCup currentCup = initialFullCup;
????????????????? try {
??????????????????????? //
把杯子里的水喝掉
??????????????????????? currentCup.drinkFromCup();
??????????????????????? //
將空杯子和服務生的滿杯子交換
??????????????????????? currentCup = exchanger.exchange(currentCup);
????????????????? } catch (InterruptedException ex) { ... handle ...}
??????????? }
????? }
?????
????? void start() {
???????
????new Thread(new Waiter()).start();
??????????? new Thread(new Customer()).start();
????? }
} |
6: BlockingQueue接口
?? BlockingQueue
是一種特殊的
Queue
,若
BlockingQueue
是空的,從
BlockingQueue
取東西的操作將會被阻斷進入等待狀態直到
BlocingkQueue
進了新貨才會被喚醒。同樣,如果
BlockingQueue
是滿的任何試圖往里存東西的操作也會被阻斷進入等待狀態,直到
BlockingQueue
里有新的空間才會被喚醒繼續操作。
BlockingQueue
提供的方法主要有:
-
add(anObject):
把
anObject
加到
BlockingQueue
里,如果
BlockingQueue
可以容納返回
true
,否則拋出
IllegalStateException
異常。
-
offer(anObject)
:把
anObject
加到
BlockingQueue
里,如果
BlockingQueue
可以容納返回
true
,否則返回
false
。
-
put(anObject)
:把
anObject
加到
BlockingQueue
里,如果
BlockingQueue
沒有空間,調用此方法的線程被阻斷直到
BlockingQueue
里有新的空間再繼續。
-
poll(time)
:取出
BlockingQueue
里排在首位的對象,若不能立即取出可等
time
參數規定的時間。取不到時返回
null
。
-
take()
:取出
BlockingQueue
里排在首位的對象,若
BlockingQueue
為空,阻斷進入等待狀態直到
BlockingQueue
有新的對象被加入為止。
根據不同的需要
BlockingQueue
有
4
種具體實現:
-
ArrayBlockingQueue
:規定大小的
BlockingQueue
,其構造函數必須帶一個
int
參數來指明其大小。其所含的對象是以
FIFO
(先入先出)順序排序的。
-
LinkedBlockingQueue
:大小不定的
BlockingQueue
,若其構造函數帶一個規定大小的參數,生成的
BlockingQueue
有大小限制,若不帶大小參數,所生成的
BlockingQueue
的大小由
Integer.MAX_VALUE
來決定。其所含的對象是以
FIFO
(先入先出)順序排序的。
LinkedBlockingQueue
和
ArrayBlockingQueue
比較起來,它們背后所用的數據結構不一樣,導致
LinkedBlockingQueue
的數據吞吐量要大于
ArrayBlockingQueue
,但在線程數量很大時其性能的可預見性低于
ArrayBlockingQueue
。
-
PriorityBlockingQueue
:類似于
LinkedBlockingQueue
,但其所含對象的排序不是
FIFO
,而是依據對象的自然排序順序或者是構造函數所帶的
Comparator
決定的順序。
-
SynchronousQueue
:特殊的
BlockingQueue
,對其的操作必須是放和取交替完成的。
下面是用
BlockingQueue
來實現
Producer
和
Consumer
的例子:
public class BlockingQueueTest {
????? static BlockingQueue
????? public BlockingQueueTest() {
??????????? //
定義了一個大小為
2
的
BlockingQueue
,也可根據需要用其他的具體類
??????????? basket = new ArrayBlockingQueue
????? }
????? class Producor implements Runnable {
??????????? public void run() {
????????????????? while(true){
??????????????????????? try {
????????????????????????????? //
放入一個對象,若
basket
滿了,等到
basket
有位置
??????????????????????
???????basket.put("An apple");
??????????????????????? } catch (InterruptedException ex) {
????????????????????????????? ex.printStackTrace();
??????????????????????? }
????????????????? }
??????????? }
????? }
????? class Consumer implements Runnable {
?
??????????public void run() {
????????????????? while(true){
??????????????????????? try {
????????????????????????????? //
取出一個對象,若
basket
為空,等到
basket
有東西為止
????????????????????????????? String result = basket.take();
??????????????????????? } catch (InterruptedException ex) {
????????????????????????????? ex.printStackTrace();
??????????????????????? }
????????????????? }
?????
??????}???????????
????? }
????? public void execute(){
??????????? for(int i=0; i<10; i++){
????????????????? new Thread(new Producor()).start();
????????????????? new Thread(new Consumer()).start();
??????????? }???????????
????? }
????? public static void main(String[] args){
??????????? BlockingQueueTest test = new BlockingQueueTest();
??????????? test.execute();
????? }?????
} |
7:Atomics 原子級變量
?? 原子量級的變量,主要的類有
AtomicBoolean, AtomicInteger, AotmicIntegerArray, AtomicLong, AtomicLongArray, AtomicReference ……
。這些原子量級的變量主要提供兩個方法:
-
compareAndSet(expectedValue, newValue):
比較當前的值是否等于
expectedValue
,
若等于把當前值改成
newValue
,并返回
true
。若不等,返回
false
。
-
getAndSet(newValue):
把當前值改為
newValue
,并返回改變前的值。
?? 這些原子級變量利用了現代處理器(
CPU
)的硬件支持可把兩步操作合為一步的功能,避免了不必要的鎖定,提高了程序的運行效率。
8:Concurrent Collections 共點聚集
?? 在
Java
的聚集框架里可以調用
Collections.synchronizeCollection(aCollection)
將普通聚集改變成同步聚集,使之可用于多線程的環境下。
但同步聚集在一個時刻只允許一個線程訪問它,其它想同時訪問它的線程會被阻斷,導致程序運行效率不高。
Java 5.0
里提供了幾個共點聚集類,它們把以前需要幾步才能完成的操作合成一個原子量級的操作,這樣就可讓多個線程同時對聚集進行操作,避免了鎖定,從而提高了程序的運行效率。
Java 5.0
目前提供的共點聚集類有:
ConcurrentHashMap, ConcurrentLinkedQueue, CopyOnWriteArrayList
和
CopyOnWriteArraySet.
posted on 2007-03-26 14:32 advincenting 閱讀(538) 評論(0) 編輯 收藏