你所不知道的五件事情--java.util.concurrent(第一部分)
--使用并發集合類進行多線程編程
這是Ted Neward在IBM developerWorks中5 things系列文章中的一篇,講述了關于Java并發集合API的一些應用竅門,值得大家學習。(2010.05.24最后更新)--使用并發集合類進行多線程編程
摘要:編寫既要性能良好又要防止應用崩潰的多線程代碼確實很難--這也正是我們需要java.util.concurrent的原因。Ted Neward向你展示了像CopyOnWriteArrayList,BlockingQueue和ConcurrentMap這樣的并發集合類是如何為了并發編程需要而改進標準集合類的。
并發集合API是Java 5的一大新特性,但由于對Annotation和泛型的熱捧,許多Java開發者忽視了這些API。另外(可能更真實的是),因為許多開發者猜想并發集合 API肯定很復雜,就像去嘗試解決一些問題那樣,所以開發者們會回避java.util.concurrent包。
事實上,java.util.concurrent的很多類并不需要你費很大力就能高效地解決通常的并發問題。繼續看下去,你就能學到 java.util.concurrent中的類,如CopyOnWriteArrayList和BlockingQueue,是怎樣幫助你解決多線程編程可怕的挑戰。
1. TimeUnit
java.util.concurrent.TimeUnit本身并不是集合框架類,這個枚舉使得代碼非常易讀。使用TimeUnit能夠將開發者從與毫秒相關的困苦中解脫出來,轉而他們自己的方法或API。
TimeUnit能與所有的時間單元協作,范圍從毫秒和微秒到天和小時,這就意味著它能處理開發者可能用到的幾乎所有時間類型。還要感謝這個枚舉類型聲明的時間轉換方法,當時間加快時,它甚至能細致到把小時轉換回毫秒。
2. CopyOnWriteArrayList
制作數組的干凈復本是一項成本極高的操作,在時間和內存這兩方面均有開銷,以至于在通常的應用中不能考慮該方法;開發者常常求助于使用同步的 ArrayList來替代前述方法。但這也是一個比較有代價的選項,因為當每次你遍歷訪問該集合中的內容時,你不得不同步所有的方法,包括讀和寫,以確保內存一致性。
在有大量用戶在讀取ArrayList而只有很少用戶對其進行修改的這一場景中,上述方法將使成本結構變得緩慢。
CopyOnWriteArrayList就是解決這一問題的一個極好的寶貝工具。它的Javadoc描述到,ArrayList通過創建數組的干凈復本來實現可變操作(添加,修改,等等),而CopyOnWriteArrayList則是ArrayList的一個"線程安全"的變體。
對于任何修改操作,該集合類會在內部將其內容復制到一個新數組中,所以當讀用戶訪問數組的內容時不會招致任何同步開銷(因為它們沒有對可變數據進行操作)。
本質上,創建CopyOnWriteArrayList的想法,是出于應對當ArrayList無法滿足我們要求時的場景:經常讀,而很少寫的集合對象,例如針對JavaBean事件的Listener。
3. BlockingQueue
BlockingQueue接口表明它是一個Queue,這就意味著它的元素是按先進先出(FIFO)的次序進行存儲的。以特定次序插入的元素會以相同的次序被取出--但根據插入保證,任何從空隊列中取出元素的嘗試都會堵塞調用線程直到該元素可被取出時為止。同樣地,任何向一個已滿隊列中插入元素的嘗試將會堵塞調用線程直到該隊列的存儲空間有空余時為止。
在不需要顯式地關注同步問題時,如何將由一個線程聚集的元素"交給"另一個線程進行處理呢,BlockingQueue很靈巧地解決了這個問題。Java Tutorial中Guarded Blocks一節是很好的例子。它使用手工同步和wait()/notifyAll()方法創建了一個單點(single-slot)受限緩沖,當一個新的元素可被消費且當該點已經準備好被一個新的元素填充時,該方法就會在線程之間發出信號。(詳情請見Guarded Blocks)
盡管教程Guarded Blocks中的代碼可以正常工作,但它比較長,有些凌亂,而且完全不直觀。誠然,在Java平臺的早期時代,Java開發者們不得不;但現在已經是 2010年了--問題已經得到改進?
清單1展示的程序重寫了Guarded Blocks中的代碼,其中我使用ArrayBlockingQueue替代了手工編寫的Drop。
清單1. BlockingQueue
import java.util.*;
import java.util.concurrent.*;
class Producer
implements Runnable
{
private BlockingQueue<String> drop;
List<String> messages = Arrays.asList(
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"Wouldn't you eat ivy too?");
public Producer(BlockingQueue<String> d) { this.drop = d; }
public void run()
{
try
{
for (String s : messages)
drop.put(s);
drop.put("DONE");
}
catch (InterruptedException intEx)
{
System.out.println("Interrupted! " +
"Last one out, turn out the lights!");
}
}
}
class Consumer
implements Runnable
{
private BlockingQueue<String> drop;
public Consumer(BlockingQueue<String> d) { this.drop = d; }
public void run()
{
try
{
String msg = null;
while (!((msg = drop.take()).equals("DONE")))
System.out.println(msg);
}
catch (InterruptedException intEx)
{
System.out.println("Interrupted! " +
"Last one out, turn out the lights!");
}
}
}
public class ABQApp
{
public static void main(String[] args)
{
BlockingQueue<String> drop = new ArrayBlockingQueue(1, true);
(new Thread(new Producer(drop))).start();
(new Thread(new Consumer(drop))).start();
}
}
import java.util.concurrent.*;
class Producer
implements Runnable
{
private BlockingQueue<String> drop;
List<String> messages = Arrays.asList(
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"Wouldn't you eat ivy too?");
public Producer(BlockingQueue<String> d) { this.drop = d; }
public void run()
{
try
{
for (String s : messages)
drop.put(s);
drop.put("DONE");
}
catch (InterruptedException intEx)
{
System.out.println("Interrupted! " +
"Last one out, turn out the lights!");
}
}
}
class Consumer
implements Runnable
{
private BlockingQueue<String> drop;
public Consumer(BlockingQueue<String> d) { this.drop = d; }
public void run()
{
try
{
String msg = null;
while (!((msg = drop.take()).equals("DONE")))
System.out.println(msg);
}
catch (InterruptedException intEx)
{
System.out.println("Interrupted! " +
"Last one out, turn out the lights!");
}
}
}
public class ABQApp
{
public static void main(String[] args)
{
BlockingQueue<String> drop = new ArrayBlockingQueue(1, true);
(new Thread(new Producer(drop))).start();
(new Thread(new Consumer(drop))).start();
}
}
ArrayBlockingQueue也崇尚"公平"--即意味著,它能給予讀和寫線程先進先出的訪問次序。該方法可能是一種更高效的策略,但它也加大了造成線程饑餓的風險。(就是說,當其它讀線程持有鎖時,該策略可更高效地允許讀線程進行執行,但這也就會產生讀線程的常量流使寫線程總是無法執行的風險)
BlockingQueue也支持在方法中使用時間參數,當插入或取出元素出了問題時,方法需要返回以發出操作失敗的信號,而該時間參數指定了在返回前應該阻塞多長時間。
4. ConcurrentMap
Map有一些細微的并發Bug,會使許多粗心的Java開發者誤入歧途。ConcurrentMap則是一個簡單的決定方案。
當有多個線程在訪問一個Map時,通常在儲存一個鍵/值對之前通常會使用方法containsKey()或get()去確定給出的鍵是否存在。即使用同步的Map,某個線程仍可在處理的過程中潛入其中,然后獲得對Map的控制權。問題在于,在get()方法的開始處獲得了鎖,然后在調用方法put()去重新獲得該鎖之前會先釋放它。這就導致了競爭條件:兩個線程之間的競爭,根據哪個線程先執行,其結果將不盡相同。
如果兩個線程在同一時刻調用一個方法,一個測試鍵是否存在,另一個則置入新的鍵/值對,那么在此過程中,第一個線程的值將會丟失。幸運地是,ConcurrentMap接口支持一組額外的方法,設計這些方法是為了在一個鎖中做兩件事情:例如,putIfAbsent()首先進行測試,之后只有當該鍵還未存儲到Map中時,才執行置入操作。
5. SynchronousQueues
根據Javadoc的描述,SynchronousQueue是一個很有趣的創造物:
一個阻塞隊列在每次的插入操作中必須等等另一線程執行對應的刪除線程,反之亦然。同步隊列并沒有任何內部的存儲空間,一個都沒有。
本質上,SynchronousQueue是之前提及的BlockingQueue的另一種實現。使用ArrayBlockingQueue利用的阻塞語義,SynchronousQueue給予我們一種極輕量級的途徑在兩個線程之間交換單個元素。在清單2中,我用SynchronousQueue替代 ArrayBlockingQueue重寫了清單1的代碼:
清單2 SynchronousQueue
import java.util.*;
import java.util.concurrent.*;
class Producer
implements Runnable
{
private BlockingQueue<String> drop;
List<String> messages = Arrays.asList(
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"Wouldn't you eat ivy too?");
public Producer(BlockingQueue<String> d) { this.drop = d; }
public void run()
{
try
{
for (String s : messages)
drop.put(s);
drop.put("DONE");
}
catch (InterruptedException intEx)
{
System.out.println("Interrupted! " +
"Last one out, turn out the lights!");
}
}
}
class Consumer
implements Runnable
{
private BlockingQueue<String> drop;
public Consumer(BlockingQueue<String> d) { this.drop = d; }
public void run()
{
try
{
String msg = null;
while (!((msg = drop.take()).equals("DONE")))
System.out.println(msg);
}
catch (InterruptedException intEx)
{
System.out.println("Interrupted! " +
"Last one out, turn out the lights!");
}
}
}
public class SynQApp
{
public static void main(String[] args)
{
BlockingQueue<String> drop = new SynchronousQueue<String>();
(new Thread(new Producer(drop))).start();
(new Thread(new Consumer(drop))).start();
}
}
import java.util.concurrent.*;
class Producer
implements Runnable
{
private BlockingQueue<String> drop;
List<String> messages = Arrays.asList(
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"Wouldn't you eat ivy too?");
public Producer(BlockingQueue<String> d) { this.drop = d; }
public void run()
{
try
{
for (String s : messages)
drop.put(s);
drop.put("DONE");
}
catch (InterruptedException intEx)
{
System.out.println("Interrupted! " +
"Last one out, turn out the lights!");
}
}
}
class Consumer
implements Runnable
{
private BlockingQueue<String> drop;
public Consumer(BlockingQueue<String> d) { this.drop = d; }
public void run()
{
try
{
String msg = null;
while (!((msg = drop.take()).equals("DONE")))
System.out.println(msg);
}
catch (InterruptedException intEx)
{
System.out.println("Interrupted! " +
"Last one out, turn out the lights!");
}
}
}
public class SynQApp
{
public static void main(String[] args)
{
BlockingQueue<String> drop = new SynchronousQueue<String>();
(new Thread(new Producer(drop))).start();
(new Thread(new Consumer(drop))).start();
}
}
上述實現看起來幾乎相同,但該應用程序已新加了一個好處,在這個實現中,只有當有線程正在等待消費某個元素時,SynchronousQueue才會允許將該元素插入到隊列中。
就實踐方式來看,SynchronousQueue類似于Ada或CSP等語言中的"交會通道(Rendezvous Channel)"。在其它環境中,有時候被稱為"連接"。
結論
當Java運行時類庫預先已經提供了方便使用的等價物時,為什么還要費力地向集合框架中引入并發呢?本系列的下一篇文章將探索 java.util.concurrent命名空間的更多內容。
請關注你所不知道的五件事情--java.util.concurrent(第二部分)