linugb118--java space

          Java

          關(guān)于 java.util.concurrent 您不知道的 5 件事

          Concurrent Collections 是 Java™ 5 的巨大附加產(chǎn)品,但是在關(guān)于注釋和泛型的爭執(zhí)中很多 Java 開發(fā)人員忽視了它們。此外(或者更老實地說),許多開發(fā)人員避免使用這個數(shù)據(jù)包,因為他們認為它一定很復(fù)雜,就像它所要解決的問題一樣。

          事實上,java.util.concurrent 包含許多類,能夠有效解決普通的并發(fā)問題,無需復(fù)雜工序。閱讀本文,了解 java.util.concurrent 類,比如 CopyOnWriteArrayList  BlockingQueue 如何幫助您解決多線程編程的棘手問題。

          1. TimeUnit

          盡管本質(zhì)上 不是 Collections 類,但 java.util.concurrent.TimeUnit 枚舉讓代碼更易讀懂。使用 TimeUnit 將使用您的方法或 API 的開發(fā)人員從毫秒的 “暴政” 中解放出來。

          TimeUnit 包括所有時間單位,從 MILLISECONDS  MICROSECONDS  DAYS  HOURS,這就意味著它能夠處理一個開發(fā)人員所需的幾乎所有的時間范圍類型。同時,因為在列舉上聲明了轉(zhuǎn)換方法,在時間加快時,將 HOURS 轉(zhuǎn)換回 MILLISECONDS 甚至變得更容易。

          2. CopyOnWriteArrayList

          創(chuàng)建數(shù)組的全新副本是過于昂貴的操作,無論是從時間上,還是從內(nèi)存開銷上,因此在通常使用中很少考慮;開發(fā)人員往往求助于使用同步的 ArrayList。然而,這也是一個成本較高的選擇,因為每當您跨集合內(nèi)容進行迭代時,您就不得不同步所有操作,包括讀和寫,以此保證一致性。

          這又讓成本結(jié)構(gòu)回到這樣一個場景:需多讀者都在讀取 ArrayList,但是幾乎沒人會去修改它。

          CopyOnWriteArrayList 是個巧妙的小寶貝,能解決這一問題。它的 Javadoc 將 CopyOnWriteArrayList 定義為一個 “ArrayList 的線程安全變體,在這個變體中所有易變操作(添加,設(shè)置等)可以通過復(fù)制全新的數(shù)組來實現(xiàn)”。

          集合從內(nèi)部將它的內(nèi)容復(fù)制到一個沒有修改的新數(shù)組,這樣讀者訪問數(shù)組內(nèi)容時就不會產(chǎn)生同步成本(因為他們從來不是在易變數(shù)據(jù)上操作)。

          本質(zhì)上講,CopyOnWriteArrayList 很適合處理 ArrayList 經(jīng)常讓我們失敗的這種場景:讀取頻繁,但很少有寫操作的集合,例如 JavaBean 事件的 Listeners。

          3. BlockingQueue

          BlockingQueue 接口表示它是一個 Queue,意思是它的項以先入先出(FIFO)順序存儲。在特定順序插入的項以相同的順序檢索 — 但是需要附加保證,從空隊列檢索一個項的任何嘗試都會阻塞調(diào)用線程,直到這個項準備好被檢索。同理,想要將一個項插入到滿隊列的嘗試也會導(dǎo)致阻塞調(diào)用線程,直到隊列的存儲空間可用。

          BlockingQueue 干凈利落地解決了如何將一個線程收集的項“傳遞”給另一線程用于處理的問題,無需考慮同步問題。Java Tutorial 的 Guarded Blocks 試用版就是一個很好的例子。它構(gòu)建一個單插槽綁定的緩存,當新的項可用,而且插槽也準備好接受新的項時,使用手動同步和 wait()/notifyAll() 在線程之間發(fā)信。(詳見 Guarded Blocks 實現(xiàn)。)

          盡管 Guarded Blocks 教程中的代碼有效,但是它耗時久,混亂,而且也并非完全直觀。退回到 Java 平臺較早的時候,沒錯,Java 開發(fā)人員不得不糾纏于這種代碼;但現(xiàn)在是 2010 年 — 情況難道沒有改善?

          清單 1 顯示了 Guarded Blocks 代碼的重寫版,其中我使用了一個 ArrayBlockingQueue,而不是手寫的 Drop


          清單 1. BlockingQueue
                      import java.util.*;
                      import java.util.concurrent.*;
                      class Producer
                      implements Runnable
                      {
                      private BlockingQueue<String> drop;
                      List<String> messages = Arrays.asList(
                      "Mares eat oats",
                      "Does eat oats",
                      "Little lambs eat ivy",
                      "Wouldn't you eat ivy too?");
                      public Producer(BlockingQueue<String> d) { this.drop = d; }
                      public void run()
                      {
                      try
                      {
                      for (String s : messages)
                      drop.put(s);
                      drop.put("DONE");
                      }
                      catch (InterruptedException intEx)
                      {
                      System.out.println("Interrupted! " +
                      "Last one out, turn out the lights!");
                      }
                      }
                      }
                      class Consumer
                      implements Runnable
                      {
                      private BlockingQueue<String> drop;
                      public Consumer(BlockingQueue<String> d) { this.drop = d; }
                      public void run()
                      {
                      try
                      {
                      String msg = null;
                      while (!((msg = drop.take()).equals("DONE")))
                      System.out.println(msg);
                      }
                      catch (InterruptedException intEx)
                      {
                      System.out.println("Interrupted! " +
                      "Last one out, turn out the lights!");
                      }
                      }
                      }
                      public class ABQApp
                      {
                      public static void main(String[] args)
                      {
                      BlockingQueue<String> drop = new ArrayBlockingQueue(1, true);
                      (new Thread(new Producer(drop))).start();
                      (new Thread(new Consumer(drop))).start();
                      }
                      }

          ArrayBlockingQueue 還體現(xiàn)了“公平” — 意思是它為讀取器和編寫器提供線程先入先出訪問。這種替代方法是一個更有效,但又冒窮盡部分線程風(fēng)險的政策。(即,允許一些讀取器在其他讀取器鎖定時運行效率更高,但是您可能會有讀取器線程的流持續(xù)不斷的風(fēng)險,導(dǎo)致編寫器無法進行工作。)

          注意 Bug!

          順便說一句,如果您注意到 Guarded Blocks 包含一個重大 bug,那么您是對的 — 如果開發(fā)人員在 main() 中的Drop 實例上同步,會出現(xiàn)什么情況呢?

          BlockingQueue 還支持接收時間參數(shù)的方法,時間參數(shù)表明線程在返回信號故障以插入或者檢索有關(guān)項之前需要阻塞的時間。這么做會避免非綁定的等待,這對一個生產(chǎn)系統(tǒng)是致命的,因為一個非綁定的等待會很容易導(dǎo)致需要重啟的系統(tǒng)掛起。

          4. ConcurrentMap

          Map 有一個微妙的并發(fā) bug,這個 bug 將許多不知情的 Java 開發(fā)人員引入歧途。ConcurrentMap 是最容易的解決方案。

          當一個 Map 被從多個線程訪問時,通常使用 containsKey() 或者 get() 來查看給定鍵是否在存儲鍵/值對之前出現(xiàn)。但是即使有一個同步的 Map,線程還是可以在這個過程中潛入,然后奪取對 Map 的控制權(quán)。問題是,在對 put() 的調(diào)用中,鎖在 get() 開始時獲取,然后在可以再次獲取鎖之前釋放。它的結(jié)果是個競爭條件:這是兩個線程之間的競爭,結(jié)果也會因誰先運行而不同。

          如果兩個線程幾乎同時調(diào)用一個方法,兩者都會進行測試,調(diào)用 put,在處理中丟失第一線程的值。幸運的是,ConcurrentMap 接口支持許多附加方法,它們設(shè)計用于在一個鎖下進行兩個任務(wù):putIfAbsent(),例如,首先進行測試,然后僅當鍵沒有存儲在 Map 中時進行 put。

          5. SynchronousQueues

          根據(jù) Javadoc,SynchronousQueue 是個有趣的東西:

          這是一個阻塞隊列,其中,每個插入操作必須等待另一個線程的對應(yīng)移除操作,反之亦然。一個同步隊列不具有任何內(nèi)部容量,甚至不具有 1 的容量。

          本質(zhì)上講,SynchronousQueue 是之前提過的 BlockingQueue 的又一實現(xiàn)。它給我們提供了在線程之間交換單一元素的極輕量級方法,使用 ArrayBlockingQueue 使用的阻塞語義。在清單 2 中,我重寫了 清單 1 的代碼,使用 SynchronousQueue 替代ArrayBlockingQueue


          清單 2. SynchronousQueue
                      import java.util.*;
                      import java.util.concurrent.*;
                      class Producer
                      implements Runnable
                      {
                      private BlockingQueue<String> drop;
                      List<String> messages = Arrays.asList(
                      "Mares eat oats",
                      "Does eat oats",
                      "Little lambs eat ivy",
                      "Wouldn't you eat ivy too?");
                      public Producer(BlockingQueue<String> d) { this.drop = d; }
                      public void run()
                      {
                      try
                      {
                      for (String s : messages)
                      drop.put(s);
                      drop.put("DONE");
                      }
                      catch (InterruptedException intEx)
                      {
                      System.out.println("Interrupted! " +
                      "Last one out, turn out the lights!");
                      }
                      }
                      }
                      class Consumer
                      implements Runnable
                      {
                      private BlockingQueue<String> drop;
                      public Consumer(BlockingQueue<String> d) { this.drop = d; }
                      public void run()
                      {
                      try
                      {
                      String msg = null;
                      while (!((msg = drop.take()).equals("DONE")))
                      System.out.println(msg);
                      }
                      catch (InterruptedException intEx)
                      {
                      System.out.println("Interrupted! " +
                      "Last one out, turn out the lights!");
                      }
                      }
                      }
                      public class SynQApp
                      {
                      public static void main(String[] args)
                      {
                      BlockingQueue<String> drop = new SynchronousQueue<String>();
                      (new Thread(new Producer(drop))).start();
                      (new Thread(new Consumer(drop))).start();
                      }
                      }

          實現(xiàn)代碼看起來幾乎相同,但是應(yīng)用程序有額外獲益:SynchronousQueue 允許在隊列進行一個插入,只要有一個線程等著使用它。

          在實踐中,SynchronousQueue 類似于 Ada 和 CSP 等語言中可用的 “會合通道”。這些通道有時在其他環(huán)境中也稱為 “連接”,這樣的環(huán)境包括 .NET (見 參考資料)。

          結(jié)束語

          當 Java 運行時知識庫提供便利、預(yù)置的并發(fā)性時,為什么還要苦苦掙扎,試圖將并發(fā)性導(dǎo)入到您的 Collections 類?本系列的下一篇文章將會進一步探討 java.util.concurrent 名稱空間的內(nèi)容。

          posted on 2010-07-23 15:23 linugb118 閱讀(234) 評論(0)  編輯  收藏


          只有注冊用戶登錄后才能發(fā)表評論。


          網(wǎng)站導(dǎo)航:
           

          My Links

          Blog Stats

          常用鏈接

          留言簿(1)

          隨筆檔案

          搜索

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 北辰区| 茶陵县| 南部县| 上饶市| 威宁| 霍山县| 竹山县| 红河县| 普安县| 镇雄县| 巨野县| 泰安市| 鸡西市| 新平| 康马县| 孝感市| 彭泽县| 霍州市| 德格县| 额济纳旗| 教育| 尉氏县| 达日县| 阜新市| 夹江县| 达尔| 平度市| 安塞县| 弋阳县| 工布江达县| 潢川县| 灵山县| 韩城市| 陕西省| 淳安县| 淮北市| 凤阳县| 沿河| 西安市| 绥宁县| 东阿县|