java.util.concurrent
包含許多線程安全、測試良好、高性能的并發構建塊。不客氣地說,創建 java.util.concurrent
的目的就是要實現 Collection 框架對數據結構所執行的并發操作。通過提供一組可靠的、高性能并發構建塊,開發人員可以提高并發類的線程安全、可伸縮性、性能、可讀性和可靠性。
如果一些類名看起來相似,可能是因為 java.util.concurrent
中的許多概念源自 Doug Lea 的 util.concurrent
庫(請參閱 參考資料)。
JDK 5.0 中的并發改進可以分為三組:
- JVM 級別更改。大多數現代處理器對并發對某一硬件級別提供支持,通常以 compare-and-swap (CAS)指令形式。CAS 是一種低級別的、細粒度的技術,它允許多個線程更新一個內存位置,同時能夠檢測其他線程的沖突并進行恢復。它是許多高性能并發算法的基礎。在 JDK 5.0 之前,Java 語言中用于協調線程之間的訪問的惟一原語是同步,同步是更重量級和粗粒度的。公開 CAS 可以開發高度可伸縮的并發 Java 類。這些更改主要由 JDK 庫類使用,而不是由開發人員使用。
- 低級實用程序類 -- 鎖定和原子類。使用 CAS 作為并發原語,
ReentrantLock
類提供與synchronized
原語相同的鎖定和內存語義,然而這樣可以更好地控制鎖定(如計時的鎖定等待、鎖定輪詢和可中斷的鎖定等待)和提供更好的可伸縮性(競爭時的高性能)。大多數開發人員將不再直接使用ReentrantLock
類,而是使用在ReentrantLock
類上構建的高級類。
- 高級實用程序類。這些類實現并發構建塊,每個計算機科學文本中都會講述這些類 -- 信號、互斥、閂鎖、屏障、交換程序、線程池和線程安全集合類等。大部分開發人員都可以在應用程序中用這些類,來替換許多(如果不是全部)同步、
wait()
和notify()
的使用,從而提高性能、可讀性和正確性。
本教程將重點介紹 java.util.concurrent
包提供的高級實用程序類 -- 線程安全集合、線程池和同步實用程序。這些是初學者和專家都可以使用的"現成"類。
在第一小節中,我們將回顧并發的基本知識,盡管它不應取代對線程和線程安全的了解。那些一點都不熟悉線程的讀者應該先參考一些關于線程的介紹,如"Introduction to Java Threads"教程(請參閱參考資料)。
接下來的幾個小節將研究 java.util.concurrent
中的高級實用程序類 -- 線程安全集合、線程池、信號和同步工具。
最后一小節將介紹 java.util.concurrent
中的低級并發構建塊,并提供一些性能測評來顯示新 java.util.concurrent
類的可伸縮性的改進。
什么是線程?
所有重要的操作系統都支持進程的概念 -- 獨立運行的程序,在某種程度上相互隔離。
線程有時稱為 輕量級進程。與進程一樣,它們擁有通過程序運行的獨立的并發路徑,并且每個線程都有自己的程序計數器,稱為堆棧和本地變量。然而,線程存在于進程中,它們與同一進程內的其他線程共享內存、文件句柄以及每進程狀態。
今天,幾乎每個操作系統都支持線程,允許執行多個可獨立調度的線程,以便共存于一個進程中。因為一個進程中的線程是在同一個地址空間中執行的,所以多個線程可以同時訪問相同對象,并且它們從同一堆棧中分配對象。雖然這使線程更易于與其他線程共享信息,但也意味著您必須確保線程之間不相互干涉。
正確使用線程時,線程能帶來諸多好處,其中包括更好的資源利用、簡化開發、高吞吐量、更易響應的用戶界面以及能執行異步處理。
Java 語言包括用于協調線程行為的原語,從而可以在不違反設計原型或者不破壞數據結構的前提下安全地訪問和修改共享變量。
線程有哪些功能? |
在 Java 程序中存在很多理由使用線程,并且不管開發人員知道線程與否,幾乎每個 Java 應用程序都使用線程。許多 J2SE 和 J2EE 工具可以創建線程,如 RMI、Servlet、Enterprise JavaBeans 組件和 Swing GUI 工具包。
使用線程的理由包括:
- 更易響應的用戶界面。 事件驅動的 GUI 工具包(如 AWT 或 Swing)使用單獨的事件線程來處理 GUI 事件。從事件線程中調用通過 GUI 對象注冊的事件監聽器。然而,如果事件監聽器將執行冗長的任務(如文檔拼寫檢查),那么 UI 將出現凍結,因為事件線程直到冗長任務完畢之后才能處理其他事件。通過在單獨線程中執行冗長操作,當執行冗長后臺任務時,UI 能繼續響應。
- 使用多處理器。 多處理器(MP)系統變得越來越便宜,并且分布越來越廣泛。因為調度的基本單位通常是線程,所以不管有多少處理器可用,一個線程的應用程序一次只能在一個處理器上運行。在設計良好的程序中,通過更好地利用可用的計算機資源,多線程能夠提高吞吐量和性能。
- 簡化建模。 有效使用線程能夠使程序編寫變得更簡單,并易于維護。通過合理使用線程,個別類可以避免一些調度的詳細、交叉存取操作、異步 IO 和資源等待以及其他復雜問題。相反,它們能專注于域的要求,簡化開發并改進可靠性。
- 異步或后臺處理。 服務器應用程序可以同時服務于許多遠程客戶機。如果應用程序從 socket 中讀取數據,并且沒有數據可以讀取,那么對
read()
的調用將被阻塞,直到有數據可讀。在單線程應用程序中,這意味著當某一個線程被阻塞時,不僅處理相應請求要延遲,而且處理所有請求也將延遲。然而,如果每個 socket 都有自己的 IO 線程,那么當一個線程被阻塞時,對其他并發請求行為沒有影響。
線程安全
如果將這些類用于多線程環境中,雖然確保這些類的線程安全比較困難,但線程安全卻是必需的。java.util.concurrent
規范進程的一個目標就是提供一組線程安全的、高性能的并發構建塊,從而使開發人員能夠減輕一些編寫線程安全類的負擔。
線程安全類非常難以明確定義,大多數定義似乎都是完全循環的??焖?Google 搜索會顯示下列線程安全代碼定義的例子,但這些定義(或者更確切地說是描述)通常沒什么幫助:
- . . . can be called from multiple programming threads without unwanted interaction between the threads.
- . . . may be called by more than on thread at a time without requiring any other action on the caller's part.
通過類似這樣的定義,不奇怪我們為什么對線程安全如此迷惑。這些定義幾乎就是在說"如果可以從多個線程安全調用類,那么該類就是線程安全的"。這當然是線程安全的解釋,但對我們區別線程安全類和不安全類沒有什么幫助。我們使用"安全"是為了說明什么?
要成為線程安全的類,首先它必須在單線程環境中正確運行。如果正確實現了類,那么說明它符合規范,對該類的對象的任何順序的操作(公共字段的讀寫、公共方法的調用)都不應該使對象處于無效狀態;觀察將處于無效狀態的對象;或違反類的任何變量、前置條件或后置條件。
而且,要成為線程安全的類,在從多個線程訪問時,它必須繼續正確運行,而不管運行時環境執行那些線程的調度和交叉,且無需對部分調用代碼執行任何其他同步。結果是對線程安全對象的操作將用于按固定的整體一致順序出現所有線程。
如果沒有線程之間的某種明確協調,比如鎖定,運行時可以隨意在需要時在多線程中交叉操作執行。
在 JDK 5.0 之前,確保線程安全的主要機制是 synchronized
原語。訪問共享變量(那些可以由多個線程訪問的變量)的線程必須使用同步來協調對共享變量的讀寫訪問。java.util.concurrent
包提供了一些備用并發原語,以及一組不需要任何其他同步的線程安全實用程序類。
令人厭煩的并發
即使您的程序從沒有明確創建線程,也可能會有許多工具或框架代表您創建了線程,這時要求從這些線程調用的類是線程安全的。這樣會對開發人員帶來較大的設計和實現負擔,因為開發線程安全類比開發非線程安全類有更多要注意的事項,且需要更多的分析。
AWT 和 Swing
這些 GUI 工具包創建了稱為時間線程的后臺線程,將從該線程調用通過 GUI 組件注冊的監聽器。因此,實現這些監聽器的類必須是線程安全的。
TimerTask
JDK 1.3 中引入的 TimerTask
工具允許稍后執行任務或計劃定期執行任務。在 Timer
線程中執行 TimerTask
事件,這意味著作為 TimerTask
執行的任務必須是線程安全的。
Servlet 和 JavaServer Page 技術
Servlet 容器可以創建多個線程,在多個線程中同時調用給定 servlet,從而進行多個請求。因此 servlet 類必須是線程安全的。
RMI
遠程方法調用(remote method invocation,RMI)工具允許調用其他 JVM 中運行的操作。實現遠程對象最普遍的方法是擴展 UnicastRemoteObject
。例示 UnicastRemoteObject
時,它是通過 RMI 調度器注冊的,該調度器可能創建一個或多個線程,將在這些線程中執行遠程方法。因此,遠程類必須是線程安全的。
正如所看到的,即使應用程序沒有明確創建線程,也會發生許多可能會從其他線程調用類的情況。幸運的是,java.util.concurrent
中的類可以大大簡化編寫線程安全類的任務。
例子 -- 非線程安全 servlet
下列 servlet 看起來像無害的留言板 servlet,它保存每個來訪者的姓名。然而,該 servlet 不是線程安全的,而這個 servlet 應該是線程安全的。問題在于它使用 HashSet
存儲來訪者的姓名,HashSet
不是線程安全的類。
當我們說這個 servlet 不是線程安全的時,是說它所造成的破壞不僅僅是丟失留言板輸入。在最壞的情況下,留言板數據結構都可能被破壞并且無法恢復。
public class UnsafeGuestbookServlet extends HttpServlet {
private Set visitorSet = new HashSet();
protected void doGet(HttpServletRequest httpServletRequest,
HttpServletResponse httpServletResponse) throws ServletException, IOException {
String visitorName = httpServletRequest.getParameter("NAME");
if (visitorName != null)
visitorSet.add(visitorName);
}
}
通過將 visitorSet
的定義更改為下列代碼,可以使該類變為線程安全的:
private Set visitorSet = Collections.synchronizedSet(new HashSet());
如上所示的例子顯示線程的內置支持是一把雙刃劍 -- 雖然它使構建多線程應用程序變得很容易,但它同時要求開發人員更加注意并發問題,甚至在使用留言板 servlet 這樣普通的東西時也是如此。
線程安全集合
JDK 1.2 中引入的 Collection 框架是一種表示對象集合的高度靈活的框架,它使用基本接口 List
、Set
和 Map
。通過 JDK 提供每個集合的多次實現(HashMap
、Hashtable
、TreeMap
、WeakHashMap
、HashSet
、TreeSet
、Vector
、ArrayList
、LinkedList
等等)。其中一些集合已經是線程安全的(Hashtable
和 Vector
),通過同步的封裝工廠(Collections.synchronizedMap()
、synchronizedList()
和 synchronizedSet()
),其余的集合均可表現為線程安全的。
java.util.concurrent
包添加了多個新的線程安全集合類(ConcurrentHashMap
、CopyOnWriteArrayList
和 CopyOnWriteArraySet
)。這些類的目的是提供高性能、高度可伸縮性、線程安全的基本集合類型版本。
java.util
中的線程集合仍有一些缺點。例如,在迭代鎖定時,通常需要將該鎖定保留在集合中,否則,會有拋出 ConcurrentModificationException
的危險。(這個特性有時稱為條件線程安全;有關的更多說明,請參閱 參考資料。)此外,如果從多個線程頻繁地訪問集合,則常常不能很好地執行這些類。java.util.concurrent
中的新集合類允許通過在語義中的少量更改來獲得更高的并發。
JDK 5.0 還提供了兩個新集合接口 -- Queue
和 BlockingQueue
。Queue
接口與 List
類似,但它只允許從后面插入,從前面刪除。通過消除 List
的隨機訪問要求,可以創建比現有 ArrayList
和 LinkedList
實現性能更好的 Queue
實現。因為 List
的許多應用程序實際上不需要隨機訪問,所以Queue
通常可以替代 List
,來獲得更好的性能。
弱一致的迭代器
java.util
包中的集合類都返回 fail-fast 迭代器,這意味著它們假設線程在集合內容中進行迭代時,集合不會更改它的內容。如果 fail-fast 迭代器檢測到在迭代過程中進行了更改操作,那么它會拋出 ConcurrentModificationException
,這是不可控異常。
在迭代過程中不更改集合的要求通常會對許多并發應用程序造成不便。相反,比較好的是它允許并發修改并確保迭代器只要進行合理操作,就可以提供集合的一致視圖,如 java.util.concurrent
集合類中的迭代器所做的那樣。
java.util.concurrent
集合返回的迭代器稱為弱一致的(weakly consistent)迭代器。對于這些類,如果元素自從迭代開始已經刪除,且尚未由 next()
方法返回,那么它將不返回到調用者。如果元素自迭代開始已經添加,那么它可能返回調用者,也可能不返回。在一次迭代中,無論如何更改底層集合,元素不會被返回兩次。
CopyOnWriteArrayList 和 CopyOnWriteArraySet
可以用兩種方法創建線程安全支持數據的 List
-- Vector
或封裝 ArrayList
和 Collections.synchronizedList()
。java.util.concurrent
包添加了名稱繁瑣的 CopyOnWriteArrayList
。為什么我們想要新的線程安全的 List
類?為什么 Vector
還不夠?
最簡單的答案是與迭代和并發修改之間的交互有關。使用 Vector
或使用同步的 List
封裝器,返回的迭代器是 fail-fast 的,這意味著如果在迭代過程中任何其他線程修改 List,迭代可能失敗。
Vector
的非常普遍的應用程序是存儲通過組件注冊的監聽器的列表。當發生適合的事件時,該組件將在監聽器的列表中迭代,調用每個監聽器。為了防止 ConcurrentModificationException
,迭代線程必須復制列表或鎖定列表,以便進行整體迭代,而這兩種情況都需要大量的性能成本。
CopyOnWriteArrayList
類通過每次添加或刪除元素時創建支持數組的新副本,避免了這個問題,但是進行中的迭代保持對創建迭代器時的當前副本進行操作。雖然復制也會有一些成本,但是在許多情況下,迭代要比修改多得多,在這些情況下,寫入時復制要比其他備用方法具有更好的性能和并發性。
如果應用程序需要 Set
語義,而不是 List
,那么還有一個 Set
版本 -- CopyOnWriteArraySet
。
ConcurrentHashMap
正如已經存在線程安全的 List
的實現,您可以用多種方法創建線程安全的、基于 hash 的 Map
-- Hashtable
,并使用 Collections.synchronizedMap()
封裝 HashMap
。JDK 5.0 添加了 ConcurrentHashMap
實現,該實現提供了相同的基本線程安全的 Map
功能,但它大大提高了并發性。
Hashtable
和 synchronizedMap
所采取的獲得同步的簡單方法(同步 Hashtable
中或者同步的 Map
封裝器對象中的每個方法)有兩個主要的不足。首先,這種方法對于可伸縮性是一種障礙,因為一次只能有一個線程可以訪問 hash 表。同時,這樣仍不足以提供真正的線程安全性,許多公用的混合操作仍然需要額外的同步。雖然諸如 get()
和 put()
之類的簡單操作可以在不需要額外同步的情況下安全地完成,但還是有一些公用的操作序列,例如迭代或者 put-if-absent(空則放入),需要外部的同步,以避免數據爭用。
Hashtable
和 Collections.synchronizedMap
通過同步每個方法獲得線程安全。這意味著當一個線程執行一個 Map
方法時,無論其他線程要對 Map
進行什么樣操作,都不能執行,直到第一個線程結束才可以。
對比來說,ConcurrentHashMap
允許多個讀取幾乎總是并發執行,讀和寫操作通常并發執行,多個同時寫入經常并發執行。結果是當多個線程需要訪問同一 Map
時,可以獲得更高的并發性。
在大多數情況下,ConcurrentHashMap
是 Hashtable
或 Collections.synchronizedMap(new HashMap())
的簡單替換。然而,其中有一個顯著不同,即 ConcurrentHashMap
實例中的同步不鎖定映射進行獨占使用。實際上,沒有辦法鎖定 ConcurrentHashMap
進行獨占使用,它被設計用于進行并發訪問。為了使集合不被鎖定進行獨占使用,還提供了公用的混合操作的其他(原子)方法,如 put-if-absent。ConcurrentHashMap
返回的迭代器是弱一致的,意味著它們將不拋出 ConcurrentModificationException
,將進行"合理操作"來反映迭代過程中其他線程對 Map
的修改。
隊列 |
原始集合框架包含三個接口:List
、Map
和 Set
。List
描述了元素的有序集合,支持完全隨即訪問 -- 可以在任何位置添加、提取或刪除元素。
LinkedList
類經常用于存儲工作元素(等待執行的任務)的列表或隊列。然而,List
提供的靈活性比該公用應用程序所需要的多得多,這個應用程序通常在后面插入元素,從前面刪除元素。但是要支持完整 List 接口則意味著 LinkedList
對于這項任務不像原來那樣有效。Queue
接口比 List
簡單得多,僅包含 put()
和 take()
方法,并允許比 LinkedList
更有效的實現。
Queue
接口還允許實現來確定存儲元素的順序。ConcurrentLinkedQueue
類實現先進先出(first-in-first-out,FIFO)隊列,而 PriorityQueue
類實現優先級隊列(也稱為堆),它對于構建調度器非常有用,調度器必須按優先級或預期的執行時間執行任務。
interface Queue<E> extends Collection<E> {
boolean offer(E x);
E poll();
E remove() throws NoSuchElementException;
E peek();
E element() throws NoSuchElementException;
}
實現 Queue
的類是:
LinkedList
已經進行了改進來實現Queue
。PriorityQueue
非線程安全的優先級對列(堆)實現,根據自然順序或比較器返回元素。ConcurrentLinkedQueue
快速、線程安全的、無阻塞 FIFO 隊列。
任務管理之線程創建 |
線程最普遍的一個應用程序是創建一個或多個線程,以執行特定類型的任務。Timer
類創建線程來執行 TimerTask
對象,Swing 創建線程來處理 UI 事件。在這兩種情況中,在單獨線程中執行的任務都假定是短期的,這些線程是為了處理大量短期任務而存在的。
在其中每種情況中,這些線程一般都有非常簡單的結構:
while (true) {
if (no tasks)
wait for a task;
execute the task;
}
通過例示從 Thread
獲得的對象并調用 Thread.start()
方法來創建線程。可以用兩種方法創建線程:通過擴展 Thread
和覆蓋 run()
方法,或者通過實現 Runnable
接口和使用 Thread(Runnable)
構造函數:
class WorkerThread extends Thread {
public void run() { /* do work */ }
}
Thread t = new WorkerThread();
t.start();
或者:
Thread t = new Thread(new Runnable() {
public void run() { /* do work */ }
}
t.start();
重新使用線程
因為多個原因,類似 Swing GUI 的框架為事件任務創建單一線程,而不是為每項任務創建新的線程。首先是因為創建線程會有間接成本,所以創建線程來執行
簡單任務將是一種資源浪費。通過重新使用事件線程來處理多個事件,啟動和拆卸成本(隨平臺而變)會分攤在多個事件上。
Swing 為事件使用單一后臺線程的另一個原因是確保事件不會互相干涉,因為直到前一事件結束,下一事件才開始處理。該方法簡化了事件處理程序的編寫。
使用多個線程,將要做更多的工作來確保一次僅一個線程地執行線程相關的代碼。
如何不對任務進行管理 |
大多數服務器應用程序(如 Web 服務器、POP 服務器、數據庫服務器或文件服務器)代表遠程客戶機處理請求,這些客戶機通常使用 socket 連接到服務器。對于每個請求,通常要進行少量處理(獲得該文件的代碼塊,并將其發送回 socket),但是可能會有大量(且不受限制)的客戶機請求服務。
用于構建服務器應用程序的簡單化模型會為每個請求創建新的線程。下列代碼段實現簡單的 Web 服務器,它接受端口 80 的 socket 連接,并創建新的線程來處理請求。不幸的是,該代碼不是實現 Web 服務器的好方法,因為在重負載條件下它將失敗,停止整臺服務器。
class UnreliableWebServer {
public static void main(String[] args) {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable r = new Runnable() {
public void run() {
handleRequest(connection);
}
};
// Don't do this!
new Thread(r).start();
}
}
}
當服務器被請求吞沒時,UnreliableWebServer
類不能很好地處理這種情況。每次有請求時,就會創建新的類。根據操作系統和可用內存,可以創建的線程數是有限的。
不幸的是,您通常不知道限制是多少 -- 只有當應用程序因為OutOfMemoryError
而崩潰時才發現。
如果足夠快地在這臺服務器上拋出請求的話,最終其中一個線程創建將失敗,生成的Error
會關閉整個應用程序。當一次僅能有效支持很少線程時,沒有必要創建上千個
線程,無論如何,這樣使用資源可能會損害性能。創建線程會使用相當一部分內存,其中包括有兩個堆棧(Java 和 C),以及每線程數據結構。如果創建過多線程,其中
每個線程都將占用一些 CPU 時間,結果將使用許多內存來支持大量線程,每個線程都運行得很慢。這樣就無法很好地使用計算資源。
使用線程池解決問題
為任務創建新的線程并不一定不好,但是如果創建任務的頻率高,而平均任務持續時間低,我們可以看到每項任務創建一個新的線程將產生性能(如果負載不可預知,還
有穩定性)問題。
如果不是每項任務創建一個新的線程,則服務器應用程序必須采取一些方法來限制一次可以處理的請求數。這意味著每次需要啟動新的任務時,它不能僅調用下列代碼。
new Thread(runnable).start()
管理一大組小任務的標準機制是組合工作隊列和線程池。工作隊列就是要處理的任務的隊列,前面描述的Queue
類完全適合。線程池是線程的集合,每個線程都提取公用
工作隊列。當一個工作線程完成任務處理后,它會返回隊列,查看是否有其他任務需要處理。如果有,它會轉移到下一個任務,并開始處理。
線程池為線程生命周期間接成本問題和資源崩潰問題提供了解決方案。通過對多個任務重新使用線程,創建線程的間接成本將分布到多個任務中。作為一種額外好處,因為
請求到達時,線程已經存在,從而可以消除由創建線程引起的延遲。因此,可以立即處理請求,使應用程序更易響應。而且,通過正確調整線程池中的線程數,可以強制超
出特定限制的任何請求等待,直到有線程可以處理它,它們等待時所消耗的資源要少于使用額外線程所消耗的資源,這樣可以防止資源崩潰。
Executor 框架java.util.concurrent
包中包含靈活的線程池實現,但是更重要的是,它包含用于管理實現Runnable
的任務的執行的整個框架。該框架稱為 Executor 框架。Executor
接口相當簡單。它描述將運行Runnable
的對象:
public interface Executor {
void execute(Runnable command);
}
任務運行于哪個線程不是由該接口指定的,這取決于使用的Executor
的實現。它可以運行于后臺線程,如 Swing 事件線程,或者運行于線程池,或者調用線程,或者
新的線程,它甚至可以運行于其他 JVM!通過同步的Executor
接口提交任務,從任務執行策略中刪除任務提交。Executor
接口獨自關注任務提交 -- 這是Executor
實現的選擇,確定執行策略。這使在部署時調整執行策略(隊列限制、池大小、優先級排列等等)更加容易,更改的代碼最少。java.util.concurrent
中的大多數Executor
實現還實現ExecutorService
接口,這是對Executor
的擴展,它還管理執行服務的生命周期。這使它們更易
于
管理,并向生命可能比單獨Executor
的生命更長的應用程序提供服務。
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout,
TimeUnit unit);
// other convenience methods for submitting tasks
}
Executor
java.util.concurrent
包包含多個Executor
實現,每個實現都實現不同的執行策略。什么是執行策略?執行策略定義何時在哪個線程中運行任務,
執行任務可能消耗的資源級別(線程、內存等等),以及如果執行程序超載該怎么辦。
執行程序通常通過工廠方法例示,而不是通過構造函數。Executors
類包含用于構造許多不同類型的Executor
實現的靜態工廠方法:
Executors.newCachedThreadPool()
創建不限制大小的線程池,但是當以前創建的線程可以使用時將重新使用那些線程。如果沒有現有線程可用,- 將創建新的線程并將其添加到池中。使用不到 60 秒的線程將終止并從緩存中刪除。
Executors.newFixedThreadPool(int n)
創建線程池,其重新使用在不受限制的隊列之外運行的固定線程組。在關閉前,所有線程都會因為執行- 過程中的失敗而終止,如果需要執行后續任務,將會有新的線程來代替這些線程。
Executors.newSingleThreadExecutor()
創建 Executor,其使用在不受限制的隊列之外運行的單一工作線程,與 Swing 事件線程非常相似。- 保證順序執行任務,在任何給定時間,不會有多個任務處于活動狀態。
更可靠的 Web 服務器 -- 使用 Executor
前面 如何不對任務進行管理 中的代碼顯示了如何不用編寫可靠服務器應用程序。幸運的是,修復這個示例非常簡單,只需將 Thread.start()
調用替換
為向 Executor
提交任務即可:
class ReliableWebServer { Executor pool = Executors.newFixedThreadPool(7); public static void main(String[] args) { ServerSocket socket = new ServerSocket(80); while (true) { final Socket connection = socket.accept(); Runnable r = new Runnable() { public void run() { handleRequest(connection); } }; pool.execute(r); } } }
注意,本例與前例之間的區別僅在于Executor
的創建以及如何提交執行的任務。
定制 ThreadPoolExecutor
Executors
中的newFixedThreadPool
和newCachedThreadPool
工廠方法返回的Executor
是類ThreadPoolExecutor
的實例,是高度可定制的。
通過使用包含ThreadFactory
變量的工廠方法或構造函數的版本,可以定義池線程的創建。ThreadFactory
是工廠對象,其構造執行程序要使用的新線程。
使用定制的線程工廠,創建的線程可以包含有用的線程名稱,并且這些線程是守護線程,屬于特定線程組或具有特定優先級。
下面是線程工廠的例子,它創建守護線程,而不是創建用戶線程:
public class DaemonThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
return thread;
}
}
有時,Executor
不能執行任務,因為它已經關閉或者因為Executor
使用受限制隊列存儲等待任務,而該隊列已滿。在這種情況下,需要咨詢執行程序的RejectedExecutionHandler
來確定如何處理任務 -- 拋出異常(默認情況),放棄任務,在調用者的線程中執行任務,或放棄隊列中最早的任務以為
新任務騰出空間。ThreadPoolExecutor.setRejectedExecutionHandler
可以設置拒絕的執行處理程序。
還可以擴展ThreadPoolExecutor
,并覆蓋方法beforeExecute
和afterExecute
,以添加裝置,添加記錄,添加計時,重新初始化線程本地變量,
或進行其他執行定制。
需要特別考慮的問題
使用Executor
框架會從執行策略中刪除任務提交,一般情況下,人們希望這樣,那是因為它允許我們靈活地調整執行策略,不必更改許多位置的代碼。然而,當提交代
碼暗含假設特定執行策略時,存在多種情況,在這些情況下,
重要的是選擇的 Executor 實現一致的執行策略。
這類情況中的其中的一種就是一些任務同時等待其他任務完成。在這種情況下,當線程池沒有足夠的線程時,如果所有當前執行的任務都在等待另一項任務,而該任務因為
線程池已滿不能執行,那么線程池可能會死鎖。
另一種相似的情況是一組線程必須作為共同操作組一起工作。在這種情況下,需要確保線程池能夠容納所有線程。
如果應用程序對特定執行程序進行了特定假設,那么應該在Executor
定義和初始化的附近對這些進行說明,從而使善意的更改不會破壞應用程序的正確功能。
調整線程池 |
創建Executor
時,人們普遍會問的一個問題是"線程池應該有多大?"。當然,答案取決于硬件和將執行的任務類型(它們是受計算限制或是受 IO 的限制?)。
如果線程池太小,資源可能不能被充分利用,在一些任務還在工作隊列中等待執行時,可能會有處理器處于閑置狀態。
另一方面,如果線程池太大,則將有許多有效線程,因為大量線程或有效任務使用內存,或者因為每項任務要比使用少量線程有更多上下文切換,性能可能會受損。
所以假設為了使處理器得到充分使用,線程池應該有多大?如果知道系統有多少處理器和任務的計算時間和等待時間的近似比率,Amdahl 法則提供很好的近似公式。
用 WT 表示每項任務的平均等待時間,ST 表示每項任務的平均服務時間(計算時間)。則 WT/ST 是每項任務等待所用時間的百分比。對于 N 處理器系統,池中可以
近似有 N*(1+WT/ST) 個線程。
好的消息是您不必精確估計 WT/ST。"合適的"池大小的范圍相當大;只需要避免"過大"和"過小"的極端情況即可。
Future 接口
Future
接口允許表示已經完成的任務、正在執行過程中的任務或者尚未開始執行的任務。通過Future
接口,可以嘗試取消尚未完成的任務,查詢任務已經完成還是
取消了,以及提取(或等待)任務的結果值。FutureTask
類實現了Future
,并包含一些構造函數,允許將Runnable
或Callable
(會產生結果的Runnable
)和Future
接口封裝。因為FutureTask
也實現Runnable
,所以可以只將FutureTask
提供給Executor
。一些提交方法(如ExecutorService.submit()
)除了提交任務之外,還將返回Future
接口。Future.get()
方法檢索任務計算的結果(或如果任務完成,但有異常,則拋出ExecutionException
)。如果任務尚未完成,那么Future.get()
將被阻塞,
直到任務完成;如果任務已經完成,那么它將立即返回結果。
使用 Future 構建緩存
該示例代碼與java.util.concurrent
中的多個類關聯,突出顯示了Future
的功能。它實現緩存,使用Future
描述緩存值,該值可能已經計算,或者可能在其他線程中"正在構造"。
它利用ConcurrentHashMap
中的原子putIfAbsent()
方法,確保僅有一個線程試圖計算給定關鍵字的值。如果其他線程隨后請求同一關鍵字的值,它僅能等待(通過Future.get()
的幫助)第一個線程完成。因此兩個線程不會計算相同的值。
public class Cache<K, V> {
ConcurrentMap<K, FutureTask<V>> map = new ConcurrentHashMap();
Executor executor = Executors.newFixedThreadPool(8);
public V get(final K key) {
FutureTask<V> f = map.get(key);
if (f == null) {
Callable<V> c = new Callable<V>() {
public V call() {
// return value associated with key
}
};
f = new FutureTask<V>(c);
FutureTask old = map.putIfAbsent(key, f);
if (old == null)
executor.execute(f);
else
f = old;
}
return f.get();
}
}
CompletionService
CompletionService
將執行服務與類似Queue
的接口組合,從任務執行中刪除任務結果的處理。CompletionService
接口包含用來提交將要執行的任務的submit()
方法和用來詢問下一完成任務的take()
/poll()
方法。CompletionService
允許應用程序結構化,使用 Producer/Consumer 模式,其中生產者創建任務并提交,消費者請求完成任務的結果并處理這些結果。CompletionService
接口由ExecutorCompletionService
類實現,該類使用Executor
處理任務并從CompletionService
導出 submit/poll/take 方法。
下列代碼使用Executor
和CompletionService
來啟動許多"solver"任務,并使用第一個生成非空結果的任務的結果,然后取消其余任務:
void solve(Executor e, Collection<Callable<Result>> solvers)
throws InterruptedException {
CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
int n = solvers.size();
List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
Result result = null;
try {
for (Callable<Result> s : solvers)
futures.add(ecs.submit(s));
for (int i = 0; i < n; ++i) {
try {
Result r = ecs.take().get();
if (r != null) {
result = r;
break;
}
} catch(ExecutionException ignore) {}
}
}
finally {
for (Future<Result> f : futures)
f.cancel(true);
}
if (result != null)
use(result);
}
中其他類別的有用的類也是同步工具。這組類相互協作,控制一個或多個線程的執行流。
java.util.concurrent
Semaphore
、CyclicBarrier
、CountdownLatch
和Exchanger
類都是同步工具的例子。每個類都有線程可以調用的方法,
方法是否被阻塞取決于正在使用的特定同步工具的狀態和規則。
Semaphore
Semaphore 類實現標準 Dijkstra 計數信號。計數信號可以認為具有一定數量的許可權,該許可權可以獲得或釋放。如果有剩余的許可權,
acquire()
方法將成功,
否則該方法將被阻塞,直到有可用的許可權(通過其他線程釋放許可權)。線程一次可以獲得多個許可權。
計數信號可以用于限制有權對資源進行并發訪問的線程數。該方法對于實現資源池或限制 Web 爬蟲(Web crawler)中的輸出 socket 連接非常有用。
注意信號不跟蹤哪個線程擁有多少許可權;這由應用程序來決定,以確保何時線程釋放許可權,該信號表示其他線程擁有許可權或者正在釋放許可權,以及其他線程知道
它的許可權已釋放。
互斥
計數信號的一種特殊情況是互斥,或者互斥信號?;コ饩褪蔷哂袉我辉S可權的計數信號,意味著在給定時間僅一個線程可以具有許可權(也稱為二進制信號)?;コ饪梢?br/>用于管理對共享資源的獨占訪問。
雖然互斥許多地方與鎖定一樣,但互斥還有一個鎖定通常沒有的其他功能,就是互斥可以由具有許可權的線程之外的其他線程來釋放。這在死鎖恢復時會非常有用。
CyclicBarrier 類可以幫助同步,它允許一組線程等待整個線程組到達公共屏障點。
CyclicBarrier
是使用整型變量構造的,其確定組中的線程數。當一個線程到達屏障時(通過調用CyclicBarrier.await()
),它會被阻塞,直到所有線程都到達屏障,然后在該點允許所有線程繼續執行。該操作與許多家庭逛商業街相似 -- 每個家庭成員都自己走,并商定 1:00 在電影院集合。當您到電影院但不是所有人都到了時,您會坐下來等其他人到達。然后所有人一起離開。
認為屏障是循環的是因為它可以重新使用;一旦所有線程都已經在屏障處集合并釋放,則可以將該屏障重新初始化到它的初始狀態。
還可以指定在屏障處等待時的超時;如果在該時間內其余線程還沒有到達屏障,則認為屏障被打破,所有正在等待的線程會收到BrokenBarrierException
。
下列代碼將創建CyclicBarrier
并啟動一組線程,每個線程將計算問題的一部分,等待所有其他線程結束之后,再檢查解決方案是否達成一致。如果不一致,那么每個工作線程將開始另一個迭代。該例將使用CyclicBarrier
變量,它允許注冊Runnable
,在所有線程到達屏障但還沒有釋放任何線程時執行Runnable
。
class Solver { // Code sketch
void solve(final Problem p, int nThreads) {
final CyclicBarrier barrier =
new CyclicBarrier(nThreads,
new Runnable() {
public void run() { p.checkConvergence(); }}
);
for (int i = 0; i < nThreads; ++i) {
final int id = i;
Runnable worker = new Runnable() {
final Segment segment = p.createSegment(id);
public void run() {
try {
while (!p.converged()) {
segment.update();
barrier.await();
}
}
catch(Exception e) { return; }
}
};
new Thread(worker).start();
}
}
CountdownLatch |
CountdownLatch
類與CyclicBarrier
相似,因為它的角色是對已經在它們中間分攤了問題的一組線程進行協調。它也是使用整型變量構造的,指明計數的初始值,但是與CyclicBarrier
不同的是,CountdownLatch
不能重新使用。
其中,CyclicBarrier
是到達屏障的所有線程的大門,只有當所有線程都已經到達屏障或屏障被打破時,才允許這些線程通過,CountdownLatch
將到達和等待功能分離。任何線程都可以通過調用countDown()
減少當前計數,這種不會阻塞線程,而只是減少計數。await() 方法的行為與CyclicBarrier.await()
稍微有所不同,調用await()
任何線程都會被阻塞,直到閂鎖計數減少為零,在該點等待的所有線程才被釋放,對await()
的后續調用將立即返回。
當問題已經分解為許多部分,每個線程都被分配一部分計算時,CountdownLatch
非常有用。在工作線程結束時,它們將減少計數,協調線程可以在閂鎖處等待當前這一批計算結束,然后繼續移至下一批計算。
相反地,具有計數 1 的CountdownLatch
類可以用作"啟動大門",來立即啟動一組線程;工作線程可以在閂鎖處等待,協調線程減少計數,從而立即釋放所有工作線程。下例使用兩個CountdownLatche
。一個作為啟動大門,一個在所有工作線程結束時釋放線程:
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();
doSomethingElse(); // don't let them run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
}
Exchanger
類方便了兩個共同操作線程之間的雙向交換;這樣,就像具有計數為 2 的CyclicBarrier
,并且兩個線程在都到達屏障時可以"交換"一些狀態。(Exchanger
模式有時也稱為聚集。)
Exchanger
通常用于一個線程填充緩沖(通過讀取 socket),而另一個線程清空緩沖(通過處理從 socket 收到的命令)的情況。當兩個線程在屏障處集合時,它們交換緩沖。下列代碼說明了這項技術:
class FillAndEmpty {
Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
DataBuffer initialEmptyBuffer = new DataBuffer();
DataBuffer initialFullBuffer = new DataBuffer();
class FillingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialEmptyBuffer;
try {
while (currentBuffer != null) {
addToBuffer(currentBuffer);
if (currentBuffer.full())
currentBuffer = exchanger.exchange(currentBuffer);
}
} catch (InterruptedException ex) { ... handle ... }
}
}
class EmptyingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialFullBuffer;
try {
while (currentBuffer != null) {
takeFromBuffer(currentBuffer);
if (currentBuffer.empty())
currentBuffer = exchanger.exchange(currentBuffer);
}
} catch (InterruptedException ex) { ... handle ...}
}
}
void start() {
new Thread(new FillingLoop()).start();
new Thread(new EmptyingLoop()).start();
}
}
鎖定和原子之Lock
Java 語言內置了鎖定工具 -- synchronized
關鍵字。當線程獲得監視器時(內置鎖定),其他線程如果試圖獲得相同鎖定,那么它們將被阻塞,直到第一個線程釋放該鎖定。同步還確保隨后獲得相同鎖定的線程可以看到之前的線程在具有該鎖定時所修改的變量的值,從而確保如果類正確地同步了共享狀態的訪問權,那么線程將不會看到變量的"失效"值,這是緩存或編譯器優化的結果。
雖然同步沒有什么問題,但它有一些限制,在一些高級應用程序中會造成不便。Lock
接口將內置監視器鎖定的鎖定行為普遍化,允許多個鎖定實現,同時提供一些內置鎖定缺少的功能,如計時的等待、可中斷的等待、鎖定輪詢、每個鎖定有多個條件等待集合以及無阻塞結構的鎖定。
interface Lock {
void lock();
void lockInterruptibly() throws IE;
boolean tryLock();
boolean tryLock(long time,
TimeUnit unit) throws IE;
void unlock();
Condition newCondition() throws
UnsupportedOperationException;
}
ReentrantLock
ReentrantLock
是具有與隱式監視器鎖定(使用 synchronized 方法和語句訪問)相同的基本行為和語義的 Lock 的實現,但它具有擴展的能力。
作為額外收獲,在競爭條件下,ReentrantLock
的實現要比現在的 synchronized 實現更具有可伸縮性。(有可能在 JVM 的將來版本中改進 synchronized 的競爭性能。)
這意味著當許多線程都競爭相同鎖定時,使用ReentrantLock
的吞吐量通常要比synchronized
好。換句話說,當許多線程試圖訪問ReentrantLock
保護的共享資源時,
JVM 將花費較少的時間來調度線程,而用更多個時間執行線程。
雖然ReentrantLock
類有許多優點,但是與同步相比,它有一個主要缺點 -- 它可能忘記釋放鎖定。建議當獲得和釋放ReentrantLock
時使用下列結構:
Lock lock = new ReentrantLock();
...
lock.lock();
try {
// perform operations protected by lock
}
catch(Exception ex) {
// restore invariants
}
finally {
lock.unlock();
}
因為鎖定失誤(忘記釋放鎖定)的風險,所以對于基本鎖定,強烈建議您繼續使用synchronized
,除非真的需要ReentrantLock
額外的靈活性和可伸縮性。
ReentrantLock
是用于高級應用程序的高級工具 -- 有時需要,但有時用原來的方法就很好。
Condition
就像Lock
接口是同步的具體化,Condition
接口是Object
中wait()
和notify()
方法的具體化。Lock
中的一個方法是newCondition()
,
它要求鎖定向該鎖定返回新的Condition
對象限制。await()
、signal()
和signalAll()
方法類似于wait()
、notify()
和notifyAll()
,
但增加了靈活性,每個Lock
都可以創建多個條件變量。這簡化了一些并發算法的實現。
ReadWriteLockReentrantLock
實現的鎖定規則非常簡單 -- 每當一個線程具有鎖定時,其他線程必須等待,直到該鎖定可用。有時,當對數據結構的讀取通常多于修改時,
可以使用更復雜的稱為讀寫鎖定的鎖定結構,它允許有多個并發讀者,同時還允許一個寫入者獨占鎖定。該方法在一般情況下(只讀)提供了更大的并發性,同時
在必要時仍提供獨占訪問的安全性。ReadWriteLock
接口和ReentrantReadWriteLock
類提供這種功能 -- 多讀者、單寫入者鎖定規則,可以用這種功能
來保護共享的易變資源。
原子變量
即使大多數用戶將很少直接使用它們,原子變量類(AtomicInteger
、AtomicLong
、AtomicReference
等等)也有充分理由是最顯著的新并發類。這些類公開對
JVM 的低級別改進,允許進行具有高度可伸縮性的原子讀-修改-寫操作。大多數現代 CPU 都有原子讀-修改-寫的原語,比如比較并交換(CAS)或加載鏈接/條件存儲
(LL/SC)。原子變量類使用硬件提供的最快的并發結構來實現。
許多并發算法都是根據對計數器或數據結構的比較并交換操作來定義的。通過暴露高性能的、高度可伸縮的 CAS 操作(以原子變量的形式),用 Java 語言實現高性能、
無等待、無鎖定的并發算法已經變得可行。
幾乎java.util.concurrent
中的所有類都是在ReentrantLock
之上構建的,ReentrantLock
則是在原子變量類的基礎上構建的。所以,雖然僅少數并發專家
使用原子變量類,但java.util.concurrent
類的很多可伸縮性改進都是由它們提供的。
原子變量主要用于為原子地更新"熱"字段提供有效的、細粒度的方式,"熱"字段是指由多個線程頻繁訪問和更新的字段。另外,原子變量還是計數器或生成序號的自然
機制。
性能與可伸縮性
雖然java.util.concurrent
努力的首要目標是使編寫正確、線程安全的類更加容易,但它還有一個次要目標,就是提供可伸縮性??缮炜s性與性能完全不同,實際上,
可伸縮性有時要以性能為代價來獲得。
性能是"可以快速執行此任務的程度"的評測??缮炜s性描述應用程序的吞吐量如何表現為它的工作量和可用計算資源增加。可伸縮的程序可以按比例使用更多的處理器、
內存或 I/O 帶寬來處理更多個工作量。當我們在并發環境中談論可伸縮性時,我們是在問當許多線程同時訪問給定類時,這個類的執行情況。java.util.concurrent
中的低級別類ReentrantLock
和原子變量類的可伸縮性要比內置監視器(同步)鎖定高得多。因此,使用ReentrantLock
或原子變量
類來協調共享訪問的類也可能更具有可伸縮性。
Hashtable 與 ConcurrentHashMap
作為可伸縮性的例子,ConcurrentHashMap
實現設計的可伸縮性要比其線程安全的上一代Hashtable
的可伸縮性強得多。Hashtable
一次只允許一個線程訪問Map
;ConcurrentHashMap
允許多個讀者并發執行,讀者與寫入者并發執行,以及一些寫入者并發執行。因此,如果許多線程頻繁訪問共享映射,使用ConcurrentHashMap
的總的吞吐量要比使用Hashtable
的好。
下表大致說明了Hashtable
和ConcurrentHashMap
之間的可伸縮性差別。在每次運行時,N 個線程并發執行緊密循環,它們從Hashtable
或ConcurrentHashMap
中檢索隨即關鍵字,60% 的失敗檢索將執行put()
操作,2% 的成功檢索執行remove()
操作。測試在運行 Linux 的雙處理器
Xeon 系統中執行。數據顯示 10,000,000 個迭代的運行時間,對于ConcurrentHashMap
,標準化為一個線程的情況??梢钥吹街钡皆S多線程,ConcurrentHashMap
的性能仍保持可伸縮性,而Hashtable
的性能在出現鎖定競爭時幾乎立即下降。
與通常的服務器應用程序相比,這個測試中的線程數看起來很少。然而,因為每個線程未進行其他操作,僅是重復地選擇使用該表,所以這樣可以模擬在執行
一些實際工作的情況下使用該表的大量線程的競爭。
線程 | ConcurrentHashMap | Hashtable |
---|---|---|
1 | 1.0 | 1.51 |
2 | 1.44 | 17.09 |
4 | 1.83 | 29.9 |
8 | 4.06 | 54.06 |
16 | 7.5 | 119.44 |
32 | 15.32 | 237.2 |
Lock 與 synchronized 與原子
下列基準說明了使用java.util.concurrent
可能改進可伸縮性的例子。該基準將模擬旋轉骰子,使用線性同余隨機數生成器。有三個可用的隨機數生成器的實現:一個使用同步來管理生成器的狀態(單一變量),一個使用ReentrantLock
,另一個則使用AtomicLong
。下圖顯示了在 8-way Ultrasparc3 系統上,逐漸增加線程數量時這三個版本的相對吞吐量。(該圖對原子變量方法的可伸縮性描述比較保守。)
圖 1. 使用同步、Lock 和 AtomicLong 的相對吞吐量
公平與不公平
java.util.concurrent
中許多類中的另外一個定制元素是"公平"的問題。公平鎖定或公平信號是指在其中根據先進先出(FIFO)的原則給與線程鎖定或信號。ReentrantLock
、Semaphore
和ReentrantReadWriteLock
的構造函數都可以使用變量確定鎖定是否公平,或者是否允許闖入(線程獲得鎖定,即使它們等待的時間不是最長)。
雖然闖入鎖定的想法可能有些可笑,但實際上不公平、闖入的鎖定非常普遍,且通常很受歡迎。使用同步訪問的內置鎖定不是公平鎖定(且沒有辦法使它們公平)。相反,它們提供較弱的生病保證,要求所有線程最終都將獲得鎖定。
大多數應用程序選擇(且應該選擇)闖入鎖定而不是公平鎖定的原因是性能。在大多數情況下,完全的公平不是程序正確性的要求,真正公平的成本相當高。下表向前面的面板中的表中添加了第四個數據集,并由一個公平鎖定管理對 PRNG 狀態的訪問。注意闖入鎖定與公平鎖定之間吞吐量的巨大差別。
圖 2. 使用同步、Lock、公平鎖定和 AtomicLong 的相對吞吐量
結束語
java.util.concurrent
包中包含大量有用的構建快,可以用它們來改進并發類的性能、可伸縮性、線程安全和可維護性。
通過這些構建快,應該可以不再需要在您的代碼中大量使用同步、wait/notify 和Thread.start()
,而用更高級別、標準化的、
高性能并發實用程序來替換它們。
Exchanger |
CyclicBarrier |
Synchronizer |
凡是有該標志的文章,都是該blog博主Caoer(草兒)原創,凡是索引、收藏
、轉載請注明來處和原文作者。非常感謝。