新的起點 新的開始

          快樂生活 !

          (轉)Java 5.0多線程編程(4)

          CountDownLatch:

          ?? CountDownLatch 是個計數器,它有一個初始數,等待這個計數器的線程必須等到計數器倒數到零時才可繼續。比如說一個 Server 啟動時需要初始化 4 個部件, Server 可以同時啟動 4 個線程去初始化這 4 個部件,然后調用 CountDownLatch(4).await() 阻斷進入等待,每個線程完成任務后會調用一次 CountDownLatch.countDown() 來倒計數 , 4 個線程都結束時 CountDownLatch 的計數就會降低為 0 ,此時 Server 就會被喚醒繼續下一步操作。 CountDownLatch 的方法主要有:

          • await() :使調用此方法的線程阻斷進入等待
          • countDown(): 倒計數,將計數值減 1
          • getCount(): 得到當前的計數值

          ?? CountDownLatch 的例子:一個 server 調了三個 ComponentThread 分別去啟動三個組件,然后 server 等到組件都啟動了再繼續。

          public class Server {

          ????? public static void main(String[] args) throws InterruptedException{

          ??????????? System.out.println("Server is starting.");

          ?????????? ?// 初始化一個初始值為 3 CountDownLatch

          ??????????? CountDownLatch latch = new CountDownLatch(3);

          ??????????? // 3 個線程分別去啟動 3 個組件

          ??????????? ExecutorService service = Executors.newCachedThreadPool();

          ??????????? service.submit(new ComponentThread(latch, 1));

          ????????? ??service.submit(new ComponentThread(latch, 2));

          ??????????? service.submit(new ComponentThread(latch, 3));

          ??????????? service.shutdown();

          ??????????? // 進入等待狀態

          ??????????? latch.await();

          ??????????? // 當所需的三個組件都完成時, Server 就可繼續了

          ??????????? System.out.println("Server is up!");

          ????? }

          }

          ?

          public class ComponentThread implements Runnable{

          ????? CountDownLatch latch;

          ????? int ID;

          ????? /** Creates a new instance of ComponentThread */

          ????? public ComponentThread(CountDownLatch latch, int ID) {

          ??????????? this.latch = latch;

          ??????????? this.ID = ID;

          ????? }

          ????? public void run() {

          ??????????? System.out.println("Component "+ID + " initialized!");

          ??????????? // 將計數減一

          ??????????? latch.countDown();

          ????? }????

          }

          運行結果:

          Server is starting.

          Component 1 initialized!

          Component 3 initialized!

          Component 2 initialized!

          Server is up!

          CyclicBarrier:

          ?? CyclicBarrier 類似于 CountDownLatch 也是個計數器,不同的是 CyclicBarrier 數的是調用了 CyclicBarrier.await() 進入等待的線程數,當線程數達到了 CyclicBarrier 初始時規定的數目時,所有進入等待狀態的線程被喚醒并繼續。 CyclicBarrier 就象它名字的意思一樣,可看成是個障礙,所有的線程必須到齊后才能一起通過這個障礙。 CyclicBarrier 初始時還可帶一個 Runnable 的參數,此 Runnable 任務在 CyclicBarrier 的數目達到后,所有其它線程被喚醒前被執行。

          CyclicBarrier 提供以下幾個方法:

          • await() :進入等待
          • getParties() :返回此 barrier 需要的線程數
          • reset() :將此 barrier 重置

          ?? 以下是使用 CyclicBarrier 的一個例子:兩個線程分別在一個數組里放一個數,當這兩個線程都結束后,主線程算出數組里的數的和(這個例子比較無聊,我沒有想到更合適的例子)

          public class MainThread {

          public static void main(String[] args)

          ????? throws InterruptedException, BrokenBarrierException, TimeoutException{

          ??????????? final int[] array = new int[2];

          ??????????? CyclicBarrier barrier = new CyclicBarrier(2,

          ????????????????? new Runnable() {// 在所有線程都到達 Barrier 時執行

          ????????????????? public void run() {

          ??????????????????????? System.out.println("Total is:"+(array[0]+array[1]));

          ????????????????? }

          ??????????? });???????????

          ??????????? // 啟動線程

          ??????????? new Thread(new ComponentThread(barrier, array, 0)).start();

          ??????????? new Thread(new ComponentThread(barrier, array, 1)).start();???

          ????? }?????

          }

          ?

          public class ComponentThread implements Runnable{

          ????? CyclicBarrier barrier;

          ????? int ID;

          ????? int[] array;

          ????? public ComponentThread(CyclicBarrier barrier, int[] array, int ID) {

          ??????????? this.barrier = barrier;

          ??????????? this.ID = ID;

          ??????????? this.array = array;

          ????? }

          ????? public void run() {

          ??????????? try {

          ????????????????? array[ID] = new Random().nextInt();

          ????????????????? System.out.println(ID+ " generates:"+array[ID]);

          ????????????????? // 該線程完成了任務等在 Barrier

          ????????????????? barrier.await();

          ??????????? } catch (BrokenBarrierException ex) {

          ????????????????? ex.printStackTrace();

          ??????????? } catch (InterruptedException ex) {

          ????????????????? ex.printStackTrace();

          ??????????? }

          ????? }

          }

          Exchanger:

          ?? 顧名思義 Exchanger 讓兩個線程可以互換信息。用一個例子來解釋比較容易。例子中服務生線程往空的杯子里倒水,顧客線程從裝滿水的杯子里喝水,然后通過 Exchanger 雙方互換杯子,服務生接著往空杯子里倒水,顧客接著喝水,然后交換,如此周而復始。

          class FillAndEmpty {

          ????? // 初始化一個 Exchanger ,并規定可交換的信息類型是 DataCup

          ????? Exchanger exchanger = new Exchanger();

          ???? ?Cup initialEmptyCup = ...; // 初始化一個空的杯子

          ????? Cup initialFullCup = ...; // 初始化一個裝滿水的杯子

          ????? // 服務生線程

          ????? class Waiter implements Runnable {

          ??????????? public void run() {

          ????????????????? Cup currentCup = initialEmptyCup;

          ????????????????? try {

          ?????? ?????????????????// 往空的杯子里加水

          ??????????????????????? currentCup.addWater();

          ??????????????????????? // 杯子滿后和顧客的空杯子交換

          ??????????????????????? currentCup = exchanger.exchange(currentCup);

          ????????????????? } catch (InterruptedException ex) { ... handle ... }

          ? ?????????? }

          ????? }

          ????? // 顧客線程

          ????? class Customer implements Runnable {

          ??????????? public void run() {

          ????????????????? DataCup currentCup = initialFullCup;

          ????????????????? try {

          ??????????????????????? // 把杯子里的水喝掉

          ??????????????????????? currentCup.drinkFromCup();

          ??????????????????????? // 將空杯子和服務生的滿杯子交換

          ??????????????????????? currentCup = exchanger.exchange(currentCup);

          ????????????????? } catch (InterruptedException ex) { ... handle ...}

          ??????????? }

          ????? }

          ?????

          ????? void start() {

          ??????? ????new Thread(new Waiter()).start();

          ??????????? new Thread(new Customer()).start();

          ????? }

          }

          6: BlockingQueue接口

          ?? BlockingQueue 是一種特殊的 Queue ,若 BlockingQueue 是空的,從 BlockingQueue 取東西的操作將會被阻斷進入等待狀態直到 BlocingkQueue 進了新貨才會被喚醒。同樣,如果 BlockingQueue 是滿的任何試圖往里存東西的操作也會被阻斷進入等待狀態,直到 BlockingQueue 里有新的空間才會被喚醒繼續操作。 BlockingQueue 提供的方法主要有:

          • add(anObject): anObject 加到 BlockingQueue 里,如果 BlockingQueue 可以容納返回 true ,否則拋出 IllegalStateException 異常。
          • offer(anObject) :把 anObject 加到 BlockingQueue 里,如果 BlockingQueue 可以容納返回 true ,否則返回 false
          • put(anObject) :把 anObject 加到 BlockingQueue 里,如果 BlockingQueue 沒有空間,調用此方法的線程被阻斷直到 BlockingQueue 里有新的空間再繼續。
          • poll(time) :取出 BlockingQueue 里排在首位的對象,若不能立即取出可等 time 參數規定的時間。取不到時返回 null
          • take() :取出 BlockingQueue 里排在首位的對象,若 BlockingQueue 為空,阻斷進入等待狀態直到 BlockingQueue 有新的對象被加入為止。

          根據不同的需要 BlockingQueue 4 種具體實現:

          • ArrayBlockingQueue :規定大小的 BlockingQueue ,其構造函數必須帶一個 int 參數來指明其大小。其所含的對象是以 FIFO (先入先出)順序排序的。
          • LinkedBlockingQueue :大小不定的 BlockingQueue ,若其構造函數帶一個規定大小的參數,生成的 BlockingQueue 有大小限制,若不帶大小參數,所生成的 BlockingQueue 的大小由 Integer.MAX_VALUE 來決定。其所含的對象是以 FIFO (先入先出)順序排序的。 LinkedBlockingQueue ArrayBlockingQueue 比較起來,它們背后所用的數據結構不一樣,導致 LinkedBlockingQueue 的數據吞吐量要大于 ArrayBlockingQueue ,但在線程數量很大時其性能的可預見性低于 ArrayBlockingQueue
          • PriorityBlockingQueue :類似于 LinkedBlockingQueue ,但其所含對象的排序不是 FIFO ,而是依據對象的自然排序順序或者是構造函數所帶的 Comparator 決定的順序。
          • SynchronousQueue :特殊的 BlockingQueue ,對其的操作必須是放和取交替完成的。

          下面是用 BlockingQueue 來實現 Producer Consumer 的例子:

          public class BlockingQueueTest {

          ????? static BlockingQueue basket;

          ????? public BlockingQueueTest() {

          ??????????? // 定義了一個大小為 2 BlockingQueue ,也可根據需要用其他的具體類

          ??????????? basket = new ArrayBlockingQueue(2);

          ????? }

          ????? class Producor implements Runnable {

          ??????????? public void run() {

          ????????????????? while(true){

          ??????????????????????? try {

          ????????????????????????????? // 放入一個對象,若 basket 滿了,等到 basket 有位置

          ?????????????????????? ???????basket.put("An apple");

          ??????????????????????? } catch (InterruptedException ex) {

          ????????????????????????????? ex.printStackTrace();

          ??????????????????????? }

          ????????????????? }

          ??????????? }

          ????? }

          ????? class Consumer implements Runnable {

          ? ??????????public void run() {

          ????????????????? while(true){

          ??????????????????????? try {

          ????????????????????????????? // 取出一個對象,若 basket 為空,等到 basket 有東西為止

          ????????????????????????????? String result = basket.take();

          ??????????????????????? } catch (InterruptedException ex) {

          ????????????????????????????? ex.printStackTrace();

          ??????????????????????? }

          ????????????????? }

          ????? ??????}???????????

          ????? }

          ????? public void execute(){

          ??????????? for(int i=0; i<10; i++){

          ????????????????? new Thread(new Producor()).start();

          ????????????????? new Thread(new Consumer()).start();

          ??????????? }???????????

          ????? }

          ????? public static void main(String[] args){

          ??????????? BlockingQueueTest test = new BlockingQueueTest();

          ??????????? test.execute();

          ????? }?????

          }

          7Atomics 原子級變量

          ?? 原子量級的變量,主要的類有 AtomicBoolean, AtomicInteger, AotmicIntegerArray, AtomicLong, AtomicLongArray, AtomicReference …… 。這些原子量級的變量主要提供兩個方法:

          • compareAndSet(expectedValue, newValue): 比較當前的值是否等于 expectedValue , 若等于把當前值改成 newValue ,并返回 true 。若不等,返回 false
          • getAndSet(newValue): 把當前值改為 newValue ,并返回改變前的值。

          ?? 這些原子級變量利用了現代處理器( CPU )的硬件支持可把兩步操作合為一步的功能,避免了不必要的鎖定,提高了程序的運行效率。

          8Concurrent Collections 共點聚集

          ?? 在 Java 的聚集框架里可以調用 Collections.synchronizeCollection(aCollection) 將普通聚集改變成同步聚集,使之可用于多線程的環境下。 但同步聚集在一個時刻只允許一個線程訪問它,其它想同時訪問它的線程會被阻斷,導致程序運行效率不高。 Java 5.0 里提供了幾個共點聚集類,它們把以前需要幾步才能完成的操作合成一個原子量級的操作,這樣就可讓多個線程同時對聚集進行操作,避免了鎖定,從而提高了程序的運行效率。 Java 5.0 目前提供的共點聚集類有: ConcurrentHashMap, ConcurrentLinkedQueue, CopyOnWriteArrayList CopyOnWriteArraySet.


          posted on 2007-03-26 14:32 advincenting 閱讀(538) 評論(0)  編輯  收藏


          只有注冊用戶登錄后才能發表評論。


          網站導航:
           

          公告

          Locations of visitors to this pageBlogJava
        1. 首頁
        2. 新隨筆
        3. 聯系
        4. 聚合
        5. 管理
        6. <2007年3月>
          25262728123
          45678910
          11121314151617
          18192021222324
          25262728293031
          1234567

          統計

          常用鏈接

          留言簿(13)

          隨筆分類(71)

          隨筆檔案(179)

          文章檔案(13)

          新聞分類

          IT人的英語學習網站

          JAVA站點

          優秀個人博客鏈接

          官網學習站點

          生活工作站點

          最新隨筆

          搜索

          積分與排名

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 石狮市| 枞阳县| 东乡族自治县| 桦甸市| 藁城市| 南部县| 枣阳市| 罗田县| 黄龙县| 拉萨市| 河池市| 海伦市| 青海省| 天等县| 庆安县| 鄂州市| 塘沽区| 偃师市| 吐鲁番市| 万载县| 从江县| 海淀区| 济南市| 册亨县| 新河县| 灯塔市| 新丰县| 工布江达县| 淮安市| 嘉兴市| 莲花县| 东明县| 宿迁市| 阿鲁科尔沁旗| 德州市| 抚远县| 鄂伦春自治旗| 东港市| 布尔津县| 潞西市| 福州市|