作者:汪泓(hongwang_001@163.com)
1.1. 進程、線程與線程池
所謂進程是一種在自身定址空間中執行的相對獨立的程序,是現代操作系統的基石?,F在的多任務操作系統,會周期性地將CPU的時間劃分給每一個進程,使操作系統得以同時執行一個以上的程序。
線程則是進程中的一個“單一連續控制的流程”,一個進程中可以擁有多個并行的線程。但線程不能單獨存在,它依附于進程,只能從進程中派生而來。如果一個進程派生出了兩個線程,那這兩個線程共享此進程的全局變量和代碼段,但每個線程各擁有各自的堆棧,因此它們擁有各自的局部變量。
在了解了線程的概念后,下面我們就可以進入正題,現在先來看一下線程池究竟是怎么一回事?其實線程池的原理很簡單,類似于操作系統中的緩沖區的概念,它的處理流程如下:先啟動若干數量的線程,并讓這些線程都處于睡眠狀態,當客戶端有一個新請求時,就會喚醒線程池中的某一個睡眠的線程,讓它來處理客戶端的這個請求,當處理完這個請求后,線程又恢復到睡眠狀態。這種方法的引入,會減少頻繁創建與銷毀線程所帶來的系統負擔,從而留出更多的CPU時間和內存來處理實際的應用邏輯。
1.2. Java的線程概述
在 Java 編程的早期階段,位于 Oswego 市的紐約州立大學(SUNY) 的一位教授Doug Lea決定創建一個簡單的庫,以幫助開發人員構建可以更好地處理多線程情況的應用程序。這并不是說用現有的庫就不能實現,但是就像有了標準網絡庫一樣,用經過調試的、可信任的庫更容易自己處理多線程。在 Addision-Wesley 的一本相關書籍《Concurrent Programming in Java: Design Principles and Patterns》的幫助下,這個庫變得越來越流行了。最終,作者 Doug Lea 決定設法讓它成為 Java 平臺的標準部分 —— JSR-166。這個庫最后變成了 Tiger 版本的 java.util.concurrent 包。以下我們將針對J2SE(TM)5.0中引入的關于線程方面的新內容進行詳細的介紹。
1.3. Collection部分的擴容
1.3.1. Queue 接口
java.util 包為Collection提供了一個新的基本接口:java.util.Queue。雖然肯定可以在相對應的兩端進行添加和刪除而將 java.util.List 作為隊列對待,但是這個新的 Queue 接口提供了支持添加、刪除和檢查集合的更多方法,如下所示:
public boolean offer(Object element)
public Object remove()
public Object poll()
public Object element()
public Object peek()
對于隊列中大小限制,比如想在一個滿的隊列中加入一個新項,這時新的 offer 方法就可以起到相應的作用了。它不是對調用 add() 方法拋出一個 unchecked 異常,而只是得到由 offer() 返回的 false。remove() 和 poll() 方法都是從隊列中刪除第一個元素(head)。remove() 的行為與原有的 Collection 接口相似,但是新的 poll() 在用空集合調用時不是拋出異常,只是返回 null。因此新的方法更適合更容易出現在有其他異常條件的情況之中。后兩個方法 element() 和 peek() 用于在隊列的頭部查詢元素。與 remove() 方法類似,在隊列為空時,element() 拋出一個異常,而 peek() 返回 null。
J2SE(TM)5.0 中,Queue有兩種實現方式:通過實現新增的 BlockingQueue 接口以及直接實現Queue接口。下面是用LinkedList作為Queue 使用的一種方法
1.3.1.1. Queue 的實現
Queue queue = new LinkedList();
queue.offer("1");
queue.offer("2");
queue.offer("3");
queue.offer("4");
System.out.println("Head of queue is: " + queue.poll());
再復雜一點的是新的java.util.AbstractQueue 類。這個類的工作方式類似于 java.util.AbstractList 和 java.util.AbstractSet 類。在創建自定義集合時,不用自己實現整個接口,只是繼承抽象實現并填入細節。使用 AbstractQueue 時,必須為方法 offer()、 poll() 和 peek() 提供實現。像 add() 和 addAll() 這樣的方法修改為使用 offer(),而 clear() 和 remove() 使用 poll()。 最后,element() 使用 peek()。當然可以在子類中提供這些方法的優化實現,但是不是必須這么做。而且,不必創建自己的子類,可以使用幾個內置的(什么)實現, 其中兩個是不阻塞隊列: PriorityQueue 和 ConcurrentLinkedQueue。
新的 java.util.concurrent 包在 Collection Framework 中可用的具體集合類中加入了 BlockingQueue 接口和五個阻塞隊列類。BlockingQueue 接口的 Javadoc 給出了阻塞隊列的基本用法,如下圖所示。生產者中的 put() 操作會在沒有空間可用時阻塞,而消費者的 take() 操作會在隊列中沒有任何東西時阻塞。
1.3.1.2. BlockingQueue的使用
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; }
public void run() {
try {
while(true) { queue.put(produce()); }
} catch (InterruptedException ex) { ... handle ...}
}
Object produce() { ... }
}
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
while(true) { consume(queue.take()); }
} catch (InterruptedException ex) { ... handle ...}
}
void consume(Object x) { ... }
}
class Setup {
void main() {
BlockingQueue q = new SomeQueueImplementation();
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}
五個隊列所提供的各有不同:
1、 ArrayBlockingQueue:一個由數組支持的有界隊列。
2、 LinkedBlockingQueue:一個由鏈接節點支持的可選有界隊列。
3、 PriorityBlockingQueue:一個由優先級堆支持的無界優先級隊列。
4、 DelayQueue:一個由優先級堆支持的、基于時間的調度隊列。
5、 SynchronousQueue:一個利用 BlockingQueue 接口的簡單聚集(rendezvous)機制。
前兩個類 ArrayBlockingQueue 和 LinkedBlockingQueue 幾乎相同,只是在后備存儲器方面有所不同,LinkedBlockingQueue 并不總是有容量界限。無大小界限的 LinkedBlockingQueue 類在添加元素時永遠不會有阻塞隊列的等待。新的 DelayQueue 實現可能是其中最有意思的一個了。加入到隊列中的元素必須實現新的 Delayed 接口,而且只有一個方法 —— long getDelay(java.util.concurrent.TimeUnit unit)。因為隊列的大小沒有界限,使得添加可以立即返回,但是在延遲時間過去之前,不能從隊列中取出元素。如果多個元素完成了延遲,那么最早失效/失效時間最長的元素將第一個取出,實際上這個實現并不那么復雜。以下程序就是DelayQueue 的一個具體實現:
1.3.1.3. DelayQueue 實現
class Setup {
void main() {
BlockingQueue q = new SomeQueueImplementation();
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
} return ((NanoDelay)other).trigger == trigger;
}
public boolean equals(NanoDelay other) {
return ((NanoDelay)other).trigger == trigger;
}
public long getDelay(TimeUnit unit) {
long n = trigger - System.nanoTime();
return unit.convert(n, TimeUnit.NANOSECONDS);
}
public long getTriggerTime() {
return trigger;
}
public String toString() {
return String.valueOf(trigger);
}
}
public static void main(String args[]) throws InterruptedException {
Random random = new Random();
DelayQueue queue = new DelayQueue();
for (int i=0; i < 5; i++) {
queue.add(new NanoDelay(random.nextInt(1000)));
}
long last = 0;
for (int i=0; i < 5; i++) {
NanoDelay delay = (NanoDelay)(queue.take());
long tt = delay.getTriggerTime();
System.out.println("Trigger time: " + tt);
if (i != 0) {
System.out.println("Delta: " + (tt - last));
}
last = tt;
}
}
}
這個例子首先是一個內部類 NanoDelay,它實質上將暫停給定的任意納秒(nanosecond)數,這里利用了 System 的新 nanoTime() 方法。然后 main() 方法只是將 NanoDelay 對象放到隊列中并再次將它們取出來。如果希望隊列項做一些其他事情,就需要在 Delayed 對象的實現中加入方法,并在從隊列中取出后調用這個新方法。(請隨意擴展 NanoDelay 以試驗加入其他方法做一些有趣的事情。)顯示從隊列中取出元素的兩次調用之間的時間差。如果時間差是負數,可以視為一個錯誤,因為永遠不會在延遲時間結束后,在一個更早的觸發時間從隊列中取得項。SynchronousQueue 類是最簡單的。它沒有內部容量。它就像線程之間的手遞手機制。在隊列中加入一個元素的生產者會等待另一個線程的消費者。當這個消費者出現時,這個元素就直接在消費者和生產者之間傳遞,永遠不會加入到阻塞隊列中。
1.3.2. List、Set、Map接口
新的 java.util.concurrent.ConcurrentMap 接口和 ConcurrentHashMap具體類擴展了先前的Map接口,而ConcurrentHashMap是對ConcurrentMap的直接的具體實現。新的接口增加了一組線程安全相關的基本操作:putIfAbsent,remove,replace。 putIfAbsent() 方法用于在 map 中進行添加。這個方法以要添加到 ConcurrentMap 實現中的鍵的值為參數,就像普通的 put() 方法,但是只有在 map 不包含這個鍵時,才能將鍵加入到 map 中。如果 map 已經包含這個鍵,那么這個鍵的現有值就會保留。像 putIfAbsent() 方法一樣,重載后的 remove() 方法有兩個參數 —— 鍵和值。在調用時,只有當鍵映射到指定的值時才從 map 中刪除這個鍵。如果不匹配,那么就不刪除這個鍵,并返回 false。如果值匹配鍵的當前映射內容,那么就刪除這個鍵。
對于新的 CopyOnWriteArrayList 和 CopyOnWriteArraySet 類,所有可變的(mutable)操作都首先取得后臺數組的副本,對副本進行更改,然后替換副本。這種做法保證了在遍歷自身更改的集合時,永遠不會拋出 ConcurrentModificationException。遍歷集合會用原來的集合完成,而在以后的操作中使用更新后的集合。這些新的集合,CopyOnWriteArrayList 和 CopyOnWriteArraySet,最適合于讀操作通常大大超過寫操作的情況。
1.4. 線程池
就線程池的實際實現方式而言,術語“線程池”有些使人誤解,因為線程池“明顯的”實現在大多數情形下并不一定產生我們希望的結果。術語“線程池”先于 Java 平臺出現,因此它可能是較少面向對象方法的產物。然而,該術語仍繼續廣泛應用著。
我們通常想要的是同一組固定的工作線程相結合的工作隊列,它使用 wait() 和 notify() 來通知等待線程新的工作已經到達了。該工作隊列通常被實現成具有相關監視器對象的某種鏈表。以下代碼實現了具有線程池的工作隊列。
public class WorkQueue
{
private final int nThreads;
private final PoolWorker[] threads;
private final LinkedList queue;
public WorkQueue(int nThreads)
{
this.nThreads = nThreads;
queue = new LinkedList();
threads = new PoolWorker[nThreads];
for (int i=0; i<nThreads; i++) {
threads[i] = new PoolWorker();
threads[i].start();
}
}
public void execute(Runnable r) {
synchronized(queue) {
queue.addLast(r);
queue.notify();
}
}
private class PoolWorker extends Thread {
public void run() {
Runnable r;
while (true) {
synchronized(queue) {
while (queue.isEmpty()) {
try
{
queue.wait();
}
catch (InterruptedException ignored)
{
}
}
r = (Runnable) queue.removeFirst();
}
// If we don't catch RuntimeException,
// the pool could leak threads
try {
r.run();
}
catch (RuntimeException e) {
// You might want to log something here
}
}
}
}
}
雖然線程池是構建多線程應用程序的強大機制,但使用它并不是沒有風險的。用線程池構建的應用程序容易遭受任何其它多線程應用程序容易遭受的所有并發風險,諸如同步錯誤和死鎖,它還容易遭受特定于線程池的少數其它風險,諸如與池有關的死鎖、資源不足和線程泄漏。
在J2SE(TM)5.0 中,Doug Lea 編寫了一個優秀的并發實用程序開放源碼庫 util.concurrent,它包括互斥、信號量、諸如在并發訪問下執行得很好的隊列和散列表之類集合類以及幾個工作隊列實現。該包中的 PooledExecutor 類是一種有效的、廣泛使用的以工作隊列為基礎的線程池的正確實現。Util.concurrent 定義一個 Executor 接口,以異步地執行 Runnable,另外還定義了 Executor 的幾個實現,它們具有不同的調度特征。將一個任務排入 executor 的隊列非常簡單:
Executor executor = new QueuedExecutor();
...
Runnable runnable = ... ;
executor.execute(runnable);
PooledExecutor 是一個復雜的線程池實現,它不但提供工作線程(worker thread)池中任務的調度,而且還可靈活地調整池的大小,同時還提供了線程生命周期管理,這個實現可以限制工作隊列中任務的數目,以防止隊列中的任務耗盡所有可用內存,另外還提供了多種可用的關閉和飽和度策略(阻塞、廢棄、拋出、廢棄最老的、在調用者中運行等)。所有的 Executor 實現為您管理線程的創建和銷毀,包括當關閉 executor 時,關閉所有線程,
有時您希望異步地啟動一個進程,同時希望在以后需要這個進程時,可以使用該進程的結果。FutureResult 實用程序類使這變得很容易。FutureResult 表示可能要花一段時間執行的任務,并且可以在另一個線程中執行此任務,FutureResult 對象可用作執行進程的句柄。通過它,您可以查明該任務是否已經完成,可以等待任務完成,并檢索其結果??梢詫?/SPAN> FutureResult 與 Executor 組合起來;可以創建一個 FutureResult 并將其排入 executor 的隊列,同時保留對 FutureResult 的引用。下面實例演示了一個一同使用 FutureResult 和 Executor 的簡單示例,它異步地啟動圖像著色,并繼續進行其它處理:
1.4.1. FutureResult 實例
Executor executor = ...
ImageRenderer renderer = ...
FutureResult futureImage = new FutureResult();
Runnable command = futureImage.setter(new Callable() {
public Object call() { return renderer.render(rawImage); }
});
// start the rendering process
executor.execute(command);
// do other things while executing
drawBorders();
drawCaption();
// retrieve the future result, blocking if necessary
drawImage((Image)(futureImage.get())); // use future
還可以使用 FutureResult 來提高按需裝入高速緩存的并發性。通過將 FutureResult 放置在高速緩存內,而不是放置計算本身的結果,可以減少持有高速緩存上寫鎖的時間。雖然這種做法不能加快第一個線程把某一項放入高速緩存,但它將減少第一個線程阻塞其它線程訪問高速緩存的時間。它還使其它線程更早地使用結果,因為它們可以從高速緩存中檢索 FutureTask。以下是使用用于高速緩存的 FutureResult 示例:
1.4.2. 使用 FutureResult 來改善高速緩存
public class FileCache {
private Map cache = new HashMap();
private Executor executor = new PooledExecutor();
public void get(final String name) {
FutureResult result;
synchronized(cache) {
result = cache.get(name);
if (result == null) {
result = new FutureResult();
executor.execute(result.setter(new Callable() {
public Object call() { return loadFile(name); }
}));
cache.put(result);
}
}
return result.get();
}
}
這種方法使第一個線程快速地進入和退出同步塊,使其它線程與第一個線程一樣快地得到第一個線程計算的結果,不可能出現兩個線程都試圖計算同一個對象。
1.5. 結束語
線程在各個應用領域里占據著舉足輕重的地位,而且它在實際應用中的復雜程度非只言片語能夠講清楚的,這里僅僅介紹了在J2SE(TM)5.0中新加入的成分,如讀者需要更深入的了解或更全面地學習線程的話,請參閱相關專業書籍。