關(guān)于 java.util.concurrent 您不知道的 5 件事
Concurrent Collections 是 Java™ 5 的巨大附加產(chǎn)品,但是在關(guān)于注釋和泛型的爭(zhēng)執(zhí)中很多 Java 開發(fā)人員忽視了它們。此外(或者更老實(shí)地說),許多開發(fā)人員避免使用這個(gè)數(shù)據(jù)包,因?yàn)樗麄冋J(rèn)為它一定很復(fù)雜,就像它所要解決的問題一樣。
事實(shí)上,java.util.concurrent
包含許多類,能夠有效解決普通的并發(fā)問題,無需復(fù)雜工序。閱讀本文,了解 java.util.concurrent
類,比如 CopyOnWriteArrayList
和 BlockingQueue
如何幫助您解決多線程編程的棘手問題。
盡管本質(zhì)上 不是 Collections 類,但 java.util.concurrent.TimeUnit
枚舉讓代碼更易讀懂。使用 TimeUnit
將使用您的方法或 API 的開發(fā)人員從毫秒的 “暴政” 中解放出來。
TimeUnit
包括所有時(shí)間單位,從 MILLISECONDS
和 MICROSECONDS
到 DAYS
和 HOURS
,這就意味著它能夠處理一個(gè)開發(fā)人員所需的幾乎所有的時(shí)間范圍類型。同時(shí),因?yàn)樵诹信e上聲明了轉(zhuǎn)換方法,在時(shí)間加快時(shí),將 HOURS
轉(zhuǎn)換回 MILLISECONDS
甚至變得更容易。
創(chuàng)建數(shù)組的全新副本是過于昂貴的操作,無論是從時(shí)間上,還是從內(nèi)存開銷上,因此在通常使用中很少考慮;開發(fā)人員往往求助于使用同步的 ArrayList
。然而,這也是一個(gè)成本較高的選擇,因?yàn)槊慨?dāng)您跨集合內(nèi)容進(jìn)行迭代時(shí),您就不得不同步所有操作,包括讀和寫,以此保證一致性。
這又讓成本結(jié)構(gòu)回到這樣一個(gè)場(chǎng)景:需多讀者都在讀取 ArrayList
,但是幾乎沒人會(huì)去修改它。
CopyOnWriteArrayList
是個(gè)巧妙的小寶貝,能解決這一問題。它的 Javadoc 將 CopyOnWriteArrayList
定義為一個(gè) “ArrayList
的線程安全變體,在這個(gè)變體中所有易變操作(添加,設(shè)置等)可以通過復(fù)制全新的數(shù)組來實(shí)現(xiàn)”。
集合從內(nèi)部將它的內(nèi)容復(fù)制到一個(gè)沒有修改的新數(shù)組,這樣讀者訪問數(shù)組內(nèi)容時(shí)就不會(huì)產(chǎn)生同步成本(因?yàn)樗麄儚膩聿皇窃谝鬃償?shù)據(jù)上操作)。
本質(zhì)上講,CopyOnWriteArrayList
很適合處理 ArrayList
經(jīng)常讓我們失敗的這種場(chǎng)景:讀取頻繁,但很少有寫操作的集合,例如 JavaBean 事件的 Listener
s。
BlockingQueue
接口表示它是一個(gè) Queue
,意思是它的項(xiàng)以先入先出(FIFO)順序存儲(chǔ)。在特定順序插入的項(xiàng)以相同的順序檢索 — 但是需要附加保證,從空隊(duì)列檢索一個(gè)項(xiàng)的任何嘗試都會(huì)阻塞調(diào)用線程,直到這個(gè)項(xiàng)準(zhǔn)備好被檢索。同理,想要將一個(gè)項(xiàng)插入到滿隊(duì)列的嘗試也會(huì)導(dǎo)致阻塞調(diào)用線程,直到隊(duì)列的存儲(chǔ)空間可用。
BlockingQueue
干凈利落地解決了如何將一個(gè)線程收集的項(xiàng)“傳遞”給另一線程用于處理的問題,無需考慮同步問題。Java Tutorial 的 Guarded Blocks 試用版就是一個(gè)很好的例子。它構(gòu)建一個(gè)單插槽綁定的緩存,當(dāng)新的項(xiàng)可用,而且插槽也準(zhǔn)備好接受新的項(xiàng)時(shí),使用手動(dòng)同步和 wait()
/notifyAll()
在線程之間發(fā)信。(詳見 Guarded Blocks 實(shí)現(xiàn)。)
盡管 Guarded Blocks 教程中的代碼有效,但是它耗時(shí)久,混亂,而且也并非完全直觀。退回到 Java 平臺(tái)較早的時(shí)候,沒錯(cuò),Java 開發(fā)人員不得不糾纏于這種代碼;但現(xiàn)在是 2010 年 — 情況難道沒有改善?
清單 1 顯示了 Guarded Blocks 代碼的重寫版,其中我使用了一個(gè) ArrayBlockingQueue
,而不是手寫的 Drop
。
清單 1. BlockingQueue
import java.util.*; import java.util.concurrent.*; class Producer implements Runnable { private BlockingQueue<String> drop; List<String> messages = Arrays.asList( "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "Wouldn't you eat ivy too?"); public Producer(BlockingQueue<String> d) { this.drop = d; } public void run() { try { for (String s : messages) drop.put(s); drop.put("DONE"); } catch (InterruptedException intEx) { System.out.println("Interrupted! " + "Last one out, turn out the lights!"); } } } class Consumer implements Runnable { private BlockingQueue<String> drop; public Consumer(BlockingQueue<String> d) { this.drop = d; } public void run() { try { String msg = null; while (!((msg = drop.take()).equals("DONE"))) System.out.println(msg); } catch (InterruptedException intEx) { System.out.println("Interrupted! " + "Last one out, turn out the lights!"); } } } public class ABQApp { public static void main(String[] args) { BlockingQueue<String> drop = new ArrayBlockingQueue(1, true); (new Thread(new Producer(drop))).start(); (new Thread(new Consumer(drop))).start(); } } |
ArrayBlockingQueue
還體現(xiàn)了“公平” — 意思是它為讀取器和編寫器提供線程先入先出訪問。這種替代方法是一個(gè)更有效,但又冒窮盡部分線程風(fēng)險(xiǎn)的政策。(即,允許一些讀取器在其他讀取器鎖定時(shí)運(yùn)行效率更高,但是您可能會(huì)有讀取器線程的流持續(xù)不斷的風(fēng)險(xiǎn),導(dǎo)致編寫器無法進(jìn)行工作。)
BlockingQueue
還支持接收時(shí)間參數(shù)的方法,時(shí)間參數(shù)表明線程在返回信號(hào)故障以插入或者檢索有關(guān)項(xiàng)之前需要阻塞的時(shí)間。這么做會(huì)避免非綁定的等待,這對(duì)一個(gè)生產(chǎn)系統(tǒng)是致命的,因?yàn)橐粋€(gè)非綁定的等待會(huì)很容易導(dǎo)致需要重啟的系統(tǒng)掛起。
Map
有一個(gè)微妙的并發(fā) bug,這個(gè) bug 將許多不知情的 Java 開發(fā)人員引入歧途。ConcurrentMap
是最容易的解決方案。
當(dāng)一個(gè) Map
被從多個(gè)線程訪問時(shí),通常使用 containsKey()
或者 get()
來查看給定鍵是否在存儲(chǔ)鍵/值對(duì)之前出現(xiàn)。但是即使有一個(gè)同步的 Map
,線程還是可以在這個(gè)過程中潛入,然后奪取對(duì) Map
的控制權(quán)。問題是,在對(duì) put()
的調(diào)用中,鎖在 get()
開始時(shí)獲取,然后在可以再次獲取鎖之前釋放。它的結(jié)果是個(gè)競(jìng)爭(zhēng)條件:這是兩個(gè)線程之間的競(jìng)爭(zhēng),結(jié)果也會(huì)因誰先運(yùn)行而不同。
如果兩個(gè)線程幾乎同時(shí)調(diào)用一個(gè)方法,兩者都會(huì)進(jìn)行測(cè)試,調(diào)用 put,在處理中丟失第一線程的值。幸運(yùn)的是,ConcurrentMap
接口支持許多附加方法,它們?cè)O(shè)計(jì)用于在一個(gè)鎖下進(jìn)行兩個(gè)任務(wù):putIfAbsent()
,例如,首先進(jìn)行測(cè)試,然后僅當(dāng)鍵沒有存儲(chǔ)在 Map
中時(shí)進(jìn)行 put。
根據(jù) Javadoc,SynchronousQueue
是個(gè)有趣的東西:
這是一個(gè)阻塞隊(duì)列,其中,每個(gè)插入操作必須等待另一個(gè)線程的對(duì)應(yīng)移除操作,反之亦然。一個(gè)同步隊(duì)列不具有任何內(nèi)部容量,甚至不具有 1 的容量。
本質(zhì)上講,SynchronousQueue
是之前提過的 BlockingQueue
的又一實(shí)現(xiàn)。它給我們提供了在線程之間交換單一元素的極輕量級(jí)方法,使用 ArrayBlockingQueue
使用的阻塞語義。在清單 2 中,我重寫了 清單 1 的代碼,使用 SynchronousQueue
替代ArrayBlockingQueue
:
清單 2. SynchronousQueue
import java.util.*; import java.util.concurrent.*; class Producer implements Runnable { private BlockingQueue<String> drop; List<String> messages = Arrays.asList( "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "Wouldn't you eat ivy too?"); public Producer(BlockingQueue<String> d) { this.drop = d; } public void run() { try { for (String s : messages) drop.put(s); drop.put("DONE"); } catch (InterruptedException intEx) { System.out.println("Interrupted! " + "Last one out, turn out the lights!"); } } } class Consumer implements Runnable { private BlockingQueue<String> drop; public Consumer(BlockingQueue<String> d) { this.drop = d; } public void run() { try { String msg = null; while (!((msg = drop.take()).equals("DONE"))) System.out.println(msg); } catch (InterruptedException intEx) { System.out.println("Interrupted! " + "Last one out, turn out the lights!"); } } } public class SynQApp { public static void main(String[] args) { BlockingQueue<String> drop = new SynchronousQueue<String>(); (new Thread(new Producer(drop))).start(); (new Thread(new Consumer(drop))).start(); } } |
實(shí)現(xiàn)代碼看起來幾乎相同,但是應(yīng)用程序有額外獲益:SynchronousQueue
允許在隊(duì)列進(jìn)行一個(gè)插入,只要有一個(gè)線程等著使用它。
在實(shí)踐中,SynchronousQueue
類似于 Ada 和 CSP 等語言中可用的 “會(huì)合通道”。這些通道有時(shí)在其他環(huán)境中也稱為 “連接”,這樣的環(huán)境包括 .NET (見 參考資料)。
當(dāng) Java 運(yùn)行時(shí)知識(shí)庫提供便利、預(yù)置的并發(fā)性時(shí),為什么還要苦苦掙扎,試圖將并發(fā)性導(dǎo)入到您的 Collections 類?本系列的下一篇文章將會(huì)進(jìn)一步探討 java.util.concurrent
名稱空間的內(nèi)容。
posted on 2010-07-23 15:23 linugb118 閱讀(234) 評(píng)論(0) 編輯 收藏