新的起點(diǎn) 新的開始

          快樂生活 !

          (轉(zhuǎn))Java 5.0多線程編程(4)

          CountDownLatch:

          ?? CountDownLatch 是個(gè)計(jì)數(shù)器,它有一個(gè)初始數(shù),等待這個(gè)計(jì)數(shù)器的線程必須等到計(jì)數(shù)器倒數(shù)到零時(shí)才可繼續(xù)。比如說一個(gè) Server 啟動(dòng)時(shí)需要初始化 4 個(gè)部件, Server 可以同時(shí)啟動(dòng) 4 個(gè)線程去初始化這 4 個(gè)部件,然后調(diào)用 CountDownLatch(4).await() 阻斷進(jìn)入等待,每個(gè)線程完成任務(wù)后會(huì)調(diào)用一次 CountDownLatch.countDown() 來倒計(jì)數(shù) , 當(dāng) 4 個(gè)線程都結(jié)束時(shí) CountDownLatch 的計(jì)數(shù)就會(huì)降低為 0 ,此時(shí) Server 就會(huì)被喚醒繼續(xù)下一步操作。 CountDownLatch 的方法主要有:

          • await() :使調(diào)用此方法的線程阻斷進(jìn)入等待
          • countDown(): 倒計(jì)數(shù),將計(jì)數(shù)值減 1
          • getCount(): 得到當(dāng)前的計(jì)數(shù)值

          ?? CountDownLatch 的例子:一個(gè) server 調(diào)了三個(gè) ComponentThread 分別去啟動(dòng)三個(gè)組件,然后 server 等到組件都啟動(dòng)了再繼續(xù)。

          public class Server {

          ????? public static void main(String[] args) throws InterruptedException{

          ??????????? System.out.println("Server is starting.");

          ?????????? ?// 初始化一個(gè)初始值為 3 CountDownLatch

          ??????????? CountDownLatch latch = new CountDownLatch(3);

          ??????????? // 3 個(gè)線程分別去啟動(dòng) 3 個(gè)組件

          ??????????? ExecutorService service = Executors.newCachedThreadPool();

          ??????????? service.submit(new ComponentThread(latch, 1));

          ????????? ??service.submit(new ComponentThread(latch, 2));

          ??????????? service.submit(new ComponentThread(latch, 3));

          ??????????? service.shutdown();

          ??????????? // 進(jìn)入等待狀態(tài)

          ??????????? latch.await();

          ??????????? // 當(dāng)所需的三個(gè)組件都完成時(shí), Server 就可繼續(xù)了

          ??????????? System.out.println("Server is up!");

          ????? }

          }

          ?

          public class ComponentThread implements Runnable{

          ????? CountDownLatch latch;

          ????? int ID;

          ????? /** Creates a new instance of ComponentThread */

          ????? public ComponentThread(CountDownLatch latch, int ID) {

          ??????????? this.latch = latch;

          ??????????? this.ID = ID;

          ????? }

          ????? public void run() {

          ??????????? System.out.println("Component "+ID + " initialized!");

          ??????????? // 將計(jì)數(shù)減一

          ??????????? latch.countDown();

          ????? }????

          }

          運(yùn)行結(jié)果:

          Server is starting.

          Component 1 initialized!

          Component 3 initialized!

          Component 2 initialized!

          Server is up!

          CyclicBarrier:

          ?? CyclicBarrier 類似于 CountDownLatch 也是個(gè)計(jì)數(shù)器,不同的是 CyclicBarrier 數(shù)的是調(diào)用了 CyclicBarrier.await() 進(jìn)入等待的線程數(shù),當(dāng)線程數(shù)達(dá)到了 CyclicBarrier 初始時(shí)規(guī)定的數(shù)目時(shí),所有進(jìn)入等待狀態(tài)的線程被喚醒并繼續(xù)。 CyclicBarrier 就象它名字的意思一樣,可看成是個(gè)障礙,所有的線程必須到齊后才能一起通過這個(gè)障礙。 CyclicBarrier 初始時(shí)還可帶一個(gè) Runnable 的參數(shù),此 Runnable 任務(wù)在 CyclicBarrier 的數(shù)目達(dá)到后,所有其它線程被喚醒前被執(zhí)行。

          CyclicBarrier 提供以下幾個(gè)方法:

          • await() :進(jìn)入等待
          • getParties() :返回此 barrier 需要的線程數(shù)
          • reset() :將此 barrier 重置

          ?? 以下是使用 CyclicBarrier 的一個(gè)例子:兩個(gè)線程分別在一個(gè)數(shù)組里放一個(gè)數(shù),當(dāng)這兩個(gè)線程都結(jié)束后,主線程算出數(shù)組里的數(shù)的和(這個(gè)例子比較無聊,我沒有想到更合適的例子)

          public class MainThread {

          public static void main(String[] args)

          ????? throws InterruptedException, BrokenBarrierException, TimeoutException{

          ??????????? final int[] array = new int[2];

          ??????????? CyclicBarrier barrier = new CyclicBarrier(2,

          ????????????????? new Runnable() {// 在所有線程都到達(dá) Barrier 時(shí)執(zhí)行

          ????????????????? public void run() {

          ??????????????????????? System.out.println("Total is:"+(array[0]+array[1]));

          ????????????????? }

          ??????????? });???????????

          ??????????? // 啟動(dòng)線程

          ??????????? new Thread(new ComponentThread(barrier, array, 0)).start();

          ??????????? new Thread(new ComponentThread(barrier, array, 1)).start();???

          ????? }?????

          }

          ?

          public class ComponentThread implements Runnable{

          ????? CyclicBarrier barrier;

          ????? int ID;

          ????? int[] array;

          ????? public ComponentThread(CyclicBarrier barrier, int[] array, int ID) {

          ??????????? this.barrier = barrier;

          ??????????? this.ID = ID;

          ??????????? this.array = array;

          ????? }

          ????? public void run() {

          ??????????? try {

          ????????????????? array[ID] = new Random().nextInt();

          ????????????????? System.out.println(ID+ " generates:"+array[ID]);

          ????????????????? // 該線程完成了任務(wù)等在 Barrier

          ????????????????? barrier.await();

          ??????????? } catch (BrokenBarrierException ex) {

          ????????????????? ex.printStackTrace();

          ??????????? } catch (InterruptedException ex) {

          ????????????????? ex.printStackTrace();

          ??????????? }

          ????? }

          }

          Exchanger:

          ?? 顧名思義 Exchanger 讓兩個(gè)線程可以互換信息。用一個(gè)例子來解釋比較容易。例子中服務(wù)生線程往空的杯子里倒水,顧客線程從裝滿水的杯子里喝水,然后通過 Exchanger 雙方互換杯子,服務(wù)生接著往空杯子里倒水,顧客接著喝水,然后交換,如此周而復(fù)始。

          class FillAndEmpty {

          ????? // 初始化一個(gè) Exchanger ,并規(guī)定可交換的信息類型是 DataCup

          ????? Exchanger exchanger = new Exchanger();

          ???? ?Cup initialEmptyCup = ...; // 初始化一個(gè)空的杯子

          ????? Cup initialFullCup = ...; // 初始化一個(gè)裝滿水的杯子

          ????? // 服務(wù)生線程

          ????? class Waiter implements Runnable {

          ??????????? public void run() {

          ????????????????? Cup currentCup = initialEmptyCup;

          ????????????????? try {

          ?????? ?????????????????// 往空的杯子里加水

          ??????????????????????? currentCup.addWater();

          ??????????????????????? // 杯子滿后和顧客的空杯子交換

          ??????????????????????? currentCup = exchanger.exchange(currentCup);

          ????????????????? } catch (InterruptedException ex) { ... handle ... }

          ? ?????????? }

          ????? }

          ????? // 顧客線程

          ????? class Customer implements Runnable {

          ??????????? public void run() {

          ????????????????? DataCup currentCup = initialFullCup;

          ????????????????? try {

          ??????????????????????? // 把杯子里的水喝掉

          ??????????????????????? currentCup.drinkFromCup();

          ??????????????????????? // 將空杯子和服務(wù)生的滿杯子交換

          ??????????????????????? currentCup = exchanger.exchange(currentCup);

          ????????????????? } catch (InterruptedException ex) { ... handle ...}

          ??????????? }

          ????? }

          ?????

          ????? void start() {

          ??????? ????new Thread(new Waiter()).start();

          ??????????? new Thread(new Customer()).start();

          ????? }

          }

          6: BlockingQueue接口

          ?? BlockingQueue 是一種特殊的 Queue ,若 BlockingQueue 是空的,從 BlockingQueue 取東西的操作將會(huì)被阻斷進(jìn)入等待狀態(tài)直到 BlocingkQueue 進(jìn)了新貨才會(huì)被喚醒。同樣,如果 BlockingQueue 是滿的任何試圖往里存東西的操作也會(huì)被阻斷進(jìn)入等待狀態(tài),直到 BlockingQueue 里有新的空間才會(huì)被喚醒繼續(xù)操作。 BlockingQueue 提供的方法主要有:

          • add(anObject): anObject 加到 BlockingQueue 里,如果 BlockingQueue 可以容納返回 true ,否則拋出 IllegalStateException 異常。
          • offer(anObject) :把 anObject 加到 BlockingQueue 里,如果 BlockingQueue 可以容納返回 true ,否則返回 false
          • put(anObject) :把 anObject 加到 BlockingQueue 里,如果 BlockingQueue 沒有空間,調(diào)用此方法的線程被阻斷直到 BlockingQueue 里有新的空間再繼續(xù)。
          • poll(time) :取出 BlockingQueue 里排在首位的對象,若不能立即取出可等 time 參數(shù)規(guī)定的時(shí)間。取不到時(shí)返回 null
          • take() :取出 BlockingQueue 里排在首位的對象,若 BlockingQueue 為空,阻斷進(jìn)入等待狀態(tài)直到 BlockingQueue 有新的對象被加入為止。

          根據(jù)不同的需要 BlockingQueue 4 種具體實(shí)現(xiàn):

          • ArrayBlockingQueue :規(guī)定大小的 BlockingQueue ,其構(gòu)造函數(shù)必須帶一個(gè) int 參數(shù)來指明其大小。其所含的對象是以 FIFO (先入先出)順序排序的。
          • LinkedBlockingQueue :大小不定的 BlockingQueue ,若其構(gòu)造函數(shù)帶一個(gè)規(guī)定大小的參數(shù),生成的 BlockingQueue 有大小限制,若不帶大小參數(shù),所生成的 BlockingQueue 的大小由 Integer.MAX_VALUE 來決定。其所含的對象是以 FIFO (先入先出)順序排序的。 LinkedBlockingQueue ArrayBlockingQueue 比較起來,它們背后所用的數(shù)據(jù)結(jié)構(gòu)不一樣,導(dǎo)致 LinkedBlockingQueue 的數(shù)據(jù)吞吐量要大于 ArrayBlockingQueue ,但在線程數(shù)量很大時(shí)其性能的可預(yù)見性低于 ArrayBlockingQueue
          • PriorityBlockingQueue :類似于 LinkedBlockingQueue ,但其所含對象的排序不是 FIFO ,而是依據(jù)對象的自然排序順序或者是構(gòu)造函數(shù)所帶的 Comparator 決定的順序。
          • SynchronousQueue :特殊的 BlockingQueue ,對其的操作必須是放和取交替完成的。

          下面是用 BlockingQueue 來實(shí)現(xiàn) Producer Consumer 的例子:

          public class BlockingQueueTest {

          ????? static BlockingQueue basket;

          ????? public BlockingQueueTest() {

          ??????????? // 定義了一個(gè)大小為 2 BlockingQueue ,也可根據(jù)需要用其他的具體類

          ??????????? basket = new ArrayBlockingQueue(2);

          ????? }

          ????? class Producor implements Runnable {

          ??????????? public void run() {

          ????????????????? while(true){

          ??????????????????????? try {

          ????????????????????????????? // 放入一個(gè)對象,若 basket 滿了,等到 basket 有位置

          ?????????????????????? ???????basket.put("An apple");

          ??????????????????????? } catch (InterruptedException ex) {

          ????????????????????????????? ex.printStackTrace();

          ??????????????????????? }

          ????????????????? }

          ??????????? }

          ????? }

          ????? class Consumer implements Runnable {

          ? ??????????public void run() {

          ????????????????? while(true){

          ??????????????????????? try {

          ????????????????????????????? // 取出一個(gè)對象,若 basket 為空,等到 basket 有東西為止

          ????????????????????????????? String result = basket.take();

          ??????????????????????? } catch (InterruptedException ex) {

          ????????????????????????????? ex.printStackTrace();

          ??????????????????????? }

          ????????????????? }

          ????? ??????}???????????

          ????? }

          ????? public void execute(){

          ??????????? for(int i=0; i<10; i++){

          ????????????????? new Thread(new Producor()).start();

          ????????????????? new Thread(new Consumer()).start();

          ??????????? }???????????

          ????? }

          ????? public static void main(String[] args){

          ??????????? BlockingQueueTest test = new BlockingQueueTest();

          ??????????? test.execute();

          ????? }?????

          }

          7Atomics 原子級變量

          ?? 原子量級的變量,主要的類有 AtomicBoolean, AtomicInteger, AotmicIntegerArray, AtomicLong, AtomicLongArray, AtomicReference …… 。這些原子量級的變量主要提供兩個(gè)方法:

          • compareAndSet(expectedValue, newValue): 比較當(dāng)前的值是否等于 expectedValue , 若等于把當(dāng)前值改成 newValue ,并返回 true 。若不等,返回 false
          • getAndSet(newValue): 把當(dāng)前值改為 newValue ,并返回改變前的值。

          ?? 這些原子級變量利用了現(xiàn)代處理器( CPU )的硬件支持可把兩步操作合為一步的功能,避免了不必要的鎖定,提高了程序的運(yùn)行效率。

          8Concurrent Collections 共點(diǎn)聚集

          ?? 在 Java 的聚集框架里可以調(diào)用 Collections.synchronizeCollection(aCollection) 將普通聚集改變成同步聚集,使之可用于多線程的環(huán)境下。 但同步聚集在一個(gè)時(shí)刻只允許一個(gè)線程訪問它,其它想同時(shí)訪問它的線程會(huì)被阻斷,導(dǎo)致程序運(yùn)行效率不高。 Java 5.0 里提供了幾個(gè)共點(diǎn)聚集類,它們把以前需要幾步才能完成的操作合成一個(gè)原子量級的操作,這樣就可讓多個(gè)線程同時(shí)對聚集進(jìn)行操作,避免了鎖定,從而提高了程序的運(yùn)行效率。 Java 5.0 目前提供的共點(diǎn)聚集類有: ConcurrentHashMap, ConcurrentLinkedQueue, CopyOnWriteArrayList CopyOnWriteArraySet.


          posted on 2007-03-26 14:32 advincenting 閱讀(539) 評論(0)  編輯  收藏


          只有注冊用戶登錄后才能發(fā)表評論。


          網(wǎng)站導(dǎo)航:
           

          公告

          Locations of visitors to this pageBlogJava
        1. 首頁
        2. 新隨筆
        3. 聯(lián)系
        4. 聚合
        5. 管理
        6. <2007年3月>
          25262728123
          45678910
          11121314151617
          18192021222324
          25262728293031
          1234567

          統(tǒng)計(jì)

          常用鏈接

          留言簿(13)

          隨筆分類(71)

          隨筆檔案(179)

          文章檔案(13)

          新聞分類

          IT人的英語學(xué)習(xí)網(wǎng)站

          JAVA站點(diǎn)

          優(yōu)秀個(gè)人博客鏈接

          官網(wǎng)學(xué)習(xí)站點(diǎn)

          生活工作站點(diǎn)

          最新隨筆

          搜索

          積分與排名

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 临沧市| 九寨沟县| 姜堰市| 阿拉善左旗| 高阳县| 宁陵县| 苍梧县| 莱芜市| 谷城县| 钦州市| 会东县| 丹寨县| 苍梧县| 平阴县| 伊宁市| 老河口市| 修水县| 堆龙德庆县| 通道| 四会市| 铁力市| 东城区| 松阳县| 灵川县| 富宁县| 龙游县| 云梦县| 盐津县| 加查县| 麻阳| 乌海市| 新建县| 阳新县| 黄石市| 东平县| 丹东市| 吉木乃县| 崇信县| 乐山市| 吉安市| 泸溪县|