大鳥的學習樂園
          路漫漫其修遠兮,吾將上下而求索
          posts - 26,comments - 27,trackbacks - 0

          BlockingQueue
          支持兩個附加操作的 Queue,這兩個操作是:檢索元素時等待隊列變?yōu)榉强眨约按鎯υ貢r等待空間變得可用。

          BlockingQueue 不接受 null 元素。試圖 add、put 或 offer 一個 null 元素時,某些實現(xiàn)會拋出 NullPointerException。null 被用作指示 poll 操作失敗的警戒值。

          BlockingQueue 可以是限定容量的。它在任意給定時間都可以有一個 remainingCapacity,超出此容量,便無法無阻塞地 put 額外的元素。
          沒有任何內(nèi)部容量約束的 BlockingQueue 總是報告 Integer.MAX_VALUE 的剩余容量。

          BlockingQueue 實現(xiàn)主要用于生產(chǎn)者-使用者隊列,但它另外還支持 Collection 接口。因此,舉例來說,使用 remove(x) 從隊列中移除任意一個元素是有可能的。
          然而,這種操作通常不 會有效執(zhí)行,只能有計劃地偶爾使用,比如在取消排隊信息時。

          BlockingQueue 實現(xiàn)是線程安全的。所有排隊方法都可以使用內(nèi)部鎖定或其他形式的并發(fā)控制來自動達到它們的目的。
          然而,大量的 Collection 操作(addAll、containsAll、retainAll 和 removeAll)沒有 必要自動執(zhí)行,除非在實現(xiàn)中特別說明。
          因此,舉例來說,在只添加了 c 中的一些元素后,addAll(c) 有可能失敗(拋出一個異常)。

          BlockingQueue 實質(zhì)上不 支持使用任何一種“close”或“shutdown”操作來指示不再添加任何項。
          這種功能的需求和使用有依賴于實現(xiàn)的傾向。例如,一種常用的策略是:對于生產(chǎn)者,插入特殊的 end-of-stream 或 poison 對象,并根據(jù)使用者獲取這些對象的時間來對它們進行解釋。

          下面的例子演示了這個阻塞隊列的基本功能。

          import java.util.concurrent.BlockingQueue;
          import java.util.concurrent.ExecutorService;
          import java.util.concurrent.Executors;
          import java.util.concurrent.LinkedBlockingQueue;

          public class MyBlockingQueue extends Thread {
          public static BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);

          private int index;

          public MyBlockingQueue(int i) {
             this.index = i;
          }

          public void run() {
             try {
              queue.put(String.valueOf(this.index));
              System.out.println("{" + this.index + "} in queue!");
             } catch (Exception e) {
              e.printStackTrace();
             }
          }

          public static void main(String args[]) {
             ExecutorService service = Executors.newCachedThreadPool();
             for (int i = 0; i < 10; i++) {
              service.submit(new MyBlockingQueue(i));
             }
             Thread thread = new Thread() {
              public void run() {
               try {
                while (true) {
                 Thread.sleep((int) (Math.random() * 1000));
                 if(MyBlockingQueue.queue.isEmpty())
                  break;
                 String str = MyBlockingQueue.queue.take();
                 System.out.println(str + " has take!");
                }
               } catch (Exception e) {
                e.printStackTrace();
               }
              }
             };
             service.submit(thread);
             service.shutdown();
          }
          }
          ---------------------執(zhí)行結(jié)果-----------------
          {0} in queue!
          {1} in queue!
          {2} in queue!
          {3} in queue!
          0 has take!
          {4} in queue!
          1 has take!
          {6} in queue!
          2 has take!
          {7} in queue!
          3 has take!
          {8} in queue!
          4 has take!
          {5} in queue!
          6 has take!
          {9} in queue!
          7 has take!
          8 has take!
          5 has take!
          9 has take!

          -----------------------------------------


          CompletionService

          將生產(chǎn)新的異步任務與使用已完成任務的結(jié)果分離開來的服務。生產(chǎn)者 submit 執(zhí)行的任務。使用者 take 已完成的任務,
          并按照完成這些任務的順序處理它們的結(jié)果。例如,CompletionService 可以用來管理異步 IO ,執(zhí)行讀操作的任務作為程序或系統(tǒng)的一部分提交,
          然后,當完成讀操作時,會在程序的不同部分執(zhí)行其他操作,執(zhí)行操作的順序可能與所請求的順序不同。

          通常,CompletionService 依賴于一個單獨的 Executor 來實際執(zhí)行任務,在這種情況下,
          CompletionService 只管理一個內(nèi)部完成隊列。ExecutorCompletionService 類提供了此方法的一個實現(xiàn)。


          import java.util.concurrent.Callable;
          import java.util.concurrent.CompletionService;
          import java.util.concurrent.ExecutorCompletionService;
          import java.util.concurrent.ExecutorService;
          import java.util.concurrent.Executors;

          public class MyCompletionService implements Callable<String> {
          private int id;

          public MyCompletionService(int i){
             this.id=i;
          }
          public static void main(String[] args) throws Exception{
             ExecutorService service=Executors.newCachedThreadPool();
             CompletionService<String> completion=new ExecutorCompletionService<String>(service);
             for(int i=0;i<10;i++){
              completion.submit(new MyCompletionService(i));
             }
             for(int i=0;i<10;i++){
              System.out.println(completion.take().get());
             }
             service.shutdown();
          }
          public String call() throws Exception {
             Integer time=(int)(Math.random()*1000);
             try{
              System.out.println(this.id+" start");
              Thread.sleep(time);
              System.out.println(this.id+" end");
             }
             catch(Exception e){
              e.printStackTrace();
             }
             return this.id+":"+time;
          }
          }


          CountDownLatch


          一個同步輔助類,在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個或多個線程一直等待。

          用給定的計數(shù) 初始化 CountDownLatch。由于調(diào)用了 countDown() 方法,所以在當前計數(shù)到達零之前,await 方法會一直受阻塞。
          之后,會釋放所有等待的線程,await 的所有后續(xù)調(diào)用都將立即返回。這種現(xiàn)象只出現(xiàn)一次——計數(shù)無法被重置。如果需要重置計數(shù),請考慮使用 CyclicBarrier。

          CountDownLatch 是一個通用同步工具,它有很多用途。將計數(shù) 1 初始化的 CountDownLatch 用作一個簡單的開/關(guān)鎖存器,
          或入口:在通過調(diào)用 countDown() 的線程打開入口前,所有調(diào)用 await 的線程都一直在入口處等待。
          用 N 初始化的 CountDownLatch 可以使一個線程在 N 個線程完成某項操作之前一直等待,或者使其在某項操作完成 N 次之前一直等待。

          CountDownLatch 的一個有用特性是,它不要求調(diào)用 countDown 方法的線程等到計數(shù)到達零時才繼續(xù),
          而在所有線程都能通過之前,它只是阻止任何線程繼續(xù)通過一個 await。
          一下的例子是別人寫的,非常形象。

          import java.util.concurrent.CountDownLatch;
          import java.util.concurrent.ExecutorService;
          import java.util.concurrent.Executors;

          public class TestCountDownLatch {
          public static void main(String[] args) throws InterruptedException {
             // 開始的倒數(shù)鎖
             final CountDownLatch begin = new CountDownLatch(1);
             // 結(jié)束的倒數(shù)鎖
             final CountDownLatch end = new CountDownLatch(10);
             // 十名選手
             final ExecutorService exec = Executors.newFixedThreadPool(10);
            
             for (int index = 0; index < 10; index++) {
              final int NO = index + 1;
              Runnable run = new Runnable() {
               public void run() {
                try {
                 begin.await();//一直阻塞
                 Thread.sleep((long) (Math.random() * 10000));
                 System.out.println("No." + NO + " arrived");
                } catch (InterruptedException e) {
                } finally {
                 end.countDown();
                }
               }
              };
              exec.submit(run);
             }
             System.out.println("Game Start");
             begin.countDown();
             end.await();
             System.out.println("Game Over");
             exec.shutdown();
          }
          }
          CountDownLatch最重要的方法是countDown()和await(),前者主要是倒數(shù)一次,后者是等待倒數(shù)到0,如果沒有到達0,就只有阻塞等待了。


          CyclicBarrier

          一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。
          在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 很有用。因為該 barrier 在釋放等待線程后可以重用,所以稱它為循環(huán) 的 barrier。

          CyclicBarrier 支持一個可選的 Runnable 命令,在一組線程中的最后一個線程到達之后(但在釋放所有線程之前),
          該命令只在每個屏障點運行一次。若在繼續(xù)所有參與線程之前更新共享狀態(tài),此屏障操作 很有用。

          示例用法:下面是一個在并行分解設(shè)計中使用 barrier 的例子,很經(jīng)典的旅行團例子:
          import java.text.SimpleDateFormat;
          import java.util.Date;
          import java.util.concurrent.BrokenBarrierException;
          import java.util.concurrent.CyclicBarrier;
          import java.util.concurrent.ExecutorService;
          import java.util.concurrent.Executors;
          public class TestCyclicBarrier {
          // 徒步需要的時間: Shenzhen, Guangzhou, Shaoguan, Changsha, Wuhan
          private static int[] timeWalk = { 5, 8, 15, 15, 10 };
          // 自駕游
          private static int[] timeSelf = { 1, 3, 4, 4, 5 };
          // 旅游大巴
          private static int[] timeBus = { 2, 4, 6, 6, 7 };

          static String now() {
              SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
              return sdf.format(new Date()) + ": ";
          }

          static class Tour implements Runnable {
              private int[] times;
              private CyclicBarrier barrier;
              private String tourName;
              public Tour(CyclicBarrier barrier, String tourName, int[] times) {
                this.times = times;
                this.tourName = tourName;
                this.barrier = barrier;
              }
              public void run() {
                try {
                  Thread.sleep(times[0] * 1000);
                  System.out.println(now() + tourName + " Reached Shenzhen");
                  barrier.await();
                  Thread.sleep(times[1] * 1000);
                  System.out.println(now() + tourName + " Reached Guangzhou");
                  barrier.await();
                  Thread.sleep(times[2] * 1000);
                  System.out.println(now() + tourName + " Reached Shaoguan");
                  barrier.await();
                  Thread.sleep(times[3] * 1000);
                  System.out.println(now() + tourName + " Reached Changsha");
                  barrier.await();
                  Thread.sleep(times[4] * 1000);
                  System.out.println(now() + tourName + " Reached Wuhan");
                  barrier.await();
                } catch (InterruptedException e) {
                } catch (BrokenBarrierException e) {
                }
              }
          }

          public static void main(String[] args) {
              // 三個旅行團
              CyclicBarrier barrier = new CyclicBarrier(3);
              ExecutorService exec = Executors.newFixedThreadPool(3);
              exec.submit(new Tour(barrier, "WalkTour", timeWalk));
              exec.submit(new Tour(barrier, "SelfTour", timeSelf));
          //當我們把下面的這段代碼注釋后,會發(fā)現(xiàn),程序阻塞了,無法繼續(xù)運行下去。
              exec.submit(new Tour(barrier, "BusTour", timeBus));
              exec.shutdown();
          }
          }

          CyclicBarrier最重要的屬性就是參與者個數(shù),另外最要方法是await()。當所有線程都調(diào)用了await()后,就表示這些線程都可以繼續(xù)執(zhí)行,否則就會等待。

          Future

          Future 表示異步計算的結(jié)果。它提供了檢查計算是否完成的方法,以等待計算的完成,并檢索計算的結(jié)果。
          計算完成后只能使用 get 方法來檢索結(jié)果,如有必要,計算完成前可以阻塞此方法。取消則由 cancel 方法來執(zhí)行。
          還提供了其他方法,以確定任務是正常完成還是被取消了。一旦計算完成,就不能再取消計算。
          如果為了可取消性而使用 Future但又不提供可用的結(jié)果,則可以聲明 Future<?> 形式類型、并返回 null 作為基礎(chǔ)任務的結(jié)果。

          這個我們在前面CompletionService已經(jīng)看到了,這個Future的功能,而且這個可以在提交線程的時候被指定為一個返回對象的。


          ScheduledExecutorService

          一個 ExecutorService,可安排在給定的延遲后運行或定期執(zhí)行的命令。

          schedule 方法使用各種延遲創(chuàng)建任務,并返回一個可用于取消或檢查執(zhí)行的任務對象。scheduleAtFixedRate 和 scheduleWithFixedDelay 方法創(chuàng)建并執(zhí)行某些在取消前一直定期運行的任務。

          用 Executor.execute(java.lang.Runnable) 和 ExecutorService 的 submit 方法所提交的命令,通過所請求的 0 延遲進行安排。
          schedule 方法中允許出現(xiàn) 0 和負數(shù)延遲(但不是周期),并將這些視為一種立即執(zhí)行的請求。

          所有的 schedule 方法都接受相對 延遲和周期作為參數(shù),而不是絕對的時間或日期。將以 Date 所表示的絕對時間轉(zhuǎn)換成要求的形式很容易。
          例如,要安排在某個以后的日期運行,可以使用:schedule(task, date.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)。
          但是要注意,由于網(wǎng)絡時間同步協(xié)議、時鐘漂移或其他因素的存在,因此相對延遲的期滿日期不必與啟用任務的當前 Date 相符。
          Executors 類為此包中所提供的 ScheduledExecutorService 實現(xiàn)提供了便捷的工廠方法。

          一下的例子也是網(wǎng)上比較流行的。

          import static java.util.concurrent.TimeUnit.SECONDS;
          import java.util.Date;
          import java.util.concurrent.Executors;
          import java.util.concurrent.ScheduledExecutorService;
          import java.util.concurrent.ScheduledFuture;

          public class TestScheduledThread {
          public static void main(String[] args) {
             final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
             final Runnable beeper = new Runnable() {
              int count = 0;

              public void run() {
               System.out.println(new Date() + " beep " + (++count));
              }
             };
             // 1秒鐘后運行,并每隔2秒運行一次
             final ScheduledFuture beeperHandle = scheduler.scheduleAtFixedRate(beeper, 1, 2, SECONDS);
             // 2秒鐘后運行,并每次在上次任務運行完后等待5秒后重新運行
             final ScheduledFuture beeperHandle2 = scheduler.scheduleWithFixedDelay(beeper, 2, 5, SECONDS);
             // 30秒后結(jié)束關(guān)閉任務,并且關(guān)閉Scheduler
             scheduler.schedule(new Runnable() {
              public void run() {
               beeperHandle.cancel(true);
               beeperHandle2.cancel(true);
               scheduler.shutdown();
              }
             }, 30, SECONDS);
          }
          }

          這樣我們就把concurrent包下比較重要的功能都已經(jīng)總結(jié)完了,希望對我們理解能有幫助。

          posted on 2009-09-14 17:07 大鳥 閱讀(289) 評論(0)  編輯  收藏 所屬分類: JAVA
          主站蜘蛛池模板: 德江县| 扶绥县| 洛阳市| 长汀县| 霞浦县| 额尔古纳市| 筠连县| 来安县| 元谋县| 达拉特旗| 天津市| 芜湖市| 凤台县| 连州市| 黑龙江省| 辽阳县| 麻阳| 禄丰县| 招远市| 山西省| 平泉县| 托克托县| 安泽县| 湄潭县| 德州市| 塘沽区| 龙口市| 丰镇市| 长泰县| 富宁县| 肥西县| 昂仁县| 长宁区| 普宁市| 镇雄县| 霍城县| 龙泉市| 高安市| 吉安县| 宝丰县| 博爱县|