java多線(xiàn)程設(shè)計(jì)模式詳解之二
Posted on 2007-11-09 01:40 dybjsun 閱讀(213) 評(píng)論(0) 編輯 收藏 所屬分類(lèi): 多線(xiàn)程主題
wait()/notify()
通常,多線(xiàn)程之間需要協(xié)調(diào)工作。例如,瀏覽器的一個(gè)顯示圖片的線(xiàn)程displayThread想要執(zhí)行顯示圖片的任務(wù),必須等待下載線(xiàn)程downloadThread將該圖片下載完畢。如果圖片還沒(méi)有下載完,displayThread可以暫停,當(dāng)downloadThread完成了任務(wù)后,再通知displayThread“圖片準(zhǔn)備完畢,可以顯示了”,這時(shí),displayThread繼續(xù)執(zhí)行。
以上邏輯簡(jiǎn)單的說(shuō)就是:如果條件不滿(mǎn)足,則等待。當(dāng)條件滿(mǎn)足了,就執(zhí)行下面的程序,這個(gè)機(jī)制的實(shí)現(xiàn)依賴(lài)于wait/notify。等待機(jī)制與鎖機(jī)制是密切關(guān)聯(lián)的。例如:
synchronized(obj) {
while(!condition) {
obj.wait();
}
obj.doSomething();
}
當(dāng)線(xiàn)程A獲得了obj鎖后,發(fā)現(xiàn)條件condition不滿(mǎn)足,無(wú)法繼續(xù)下一處理,于是線(xiàn)程A就wait()。
在另一線(xiàn)程B中,如果B更改了某些條件,使得線(xiàn)程A的condition條件滿(mǎn)足了,就可以喚醒線(xiàn)程A:
synchronized(obj) {
condition = true;
obj.notify();
}
需要注意的概念是:
# 調(diào)用obj的wait(), notify()方法前,必須獲得obj鎖,也就是必須寫(xiě)在synchronized(obj) {...} 代碼段內(nèi)。
# 調(diào)用obj.wait()后,線(xiàn)程A就釋放了obj的鎖,否則線(xiàn)程B無(wú)法獲得obj鎖,也就無(wú)法在synchronized(obj) {...} 代碼段內(nèi)喚醒A。
# 當(dāng)obj.wait()方法返回后,線(xiàn)程A需要再次獲得obj鎖,才能繼續(xù)執(zhí)行。
# 如果A1,A2,A3都在obj.wait(),則B調(diào)用obj.notify()只能喚醒A1,A2,A3中的一個(gè)(具體哪一個(gè)由JVM決定)。
# obj.notifyAll()則能全部喚醒A1,A2,A3,但是要繼續(xù)執(zhí)行obj.wait()的下一條語(yǔ)句,必須獲得obj鎖,因此,A1,A2,A3只有一個(gè)有機(jī)會(huì)獲得鎖繼續(xù)執(zhí)行,例如A1,其余的需要等待A1釋放obj鎖之后才能繼續(xù)執(zhí)行。
# 當(dāng)B調(diào)用obj.notify/notifyAll的時(shí)候,B正持有obj鎖,因此,A1,A2,A3雖被喚醒,但是仍無(wú)法獲得obj鎖。直到B退出synchronized塊,釋放obj鎖后,A1,A2,A3中的一個(gè)才有機(jī)會(huì)獲得鎖繼續(xù)執(zhí)行。
wait()/sleep()的區(qū)別
前面講了wait/notify機(jī)制,Thread還有一個(gè)sleep()靜態(tài)方法,它也能使線(xiàn)程暫停一段時(shí)間。sleep與wait的不同點(diǎn)是:sleep并不釋放鎖,并且sleep的暫停和wait暫停是不一樣的。obj.wait會(huì)使線(xiàn)程進(jìn)入obj對(duì)象的等待集合中并等待喚醒。
但是wait()和sleep()都可以通過(guò)interrupt()方法打斷線(xiàn)程的暫停狀態(tài),從而使線(xiàn)程立刻拋出InterruptedException。
如果線(xiàn)程A希望立即結(jié)束線(xiàn)程B,則可以對(duì)線(xiàn)程B對(duì)應(yīng)的Thread實(shí)例調(diào)用interrupt方法。如果此刻線(xiàn)程B正在wait/sleep/join,則線(xiàn)程B會(huì)立刻拋出InterruptedException,在catch() {} 中直接return即可安全地結(jié)束線(xiàn)程。
需要注意的是,InterruptedException是線(xiàn)程自己從內(nèi)部拋出的,并不是interrupt()方法拋出的。對(duì)某一線(xiàn)程調(diào)用interrupt()時(shí),如果該線(xiàn)程正在執(zhí)行普通的代碼,那么該線(xiàn)程根本就不會(huì)拋出InterruptedException。但是,一旦該線(xiàn)程進(jìn)入到wait()/sleep()/join()后,就會(huì)立刻拋出InterruptedException。
GuardedSuspention
GuardedSuspention模式主要思想是:
當(dāng)條件不滿(mǎn)足時(shí),線(xiàn)程等待,直到條件滿(mǎn)足時(shí),等待該條件的線(xiàn)程被喚醒。
我們?cè)O(shè)計(jì)一個(gè)客戶(hù)端線(xiàn)程和一個(gè)服務(wù)器線(xiàn)程,客戶(hù)端線(xiàn)程不斷發(fā)送請(qǐng)求給服務(wù)器線(xiàn)程,服務(wù)器線(xiàn)程不斷處理請(qǐng)求。當(dāng)請(qǐng)求隊(duì)列為空時(shí),服務(wù)器線(xiàn)程就必須等待,直到客戶(hù)端發(fā)送了請(qǐng)求。
先定義一個(gè)請(qǐng)求隊(duì)列:Queue
package com.crackj2ee.thread;
import java.util.*;
public class Queue {
private List queue = new LinkedList();
public synchronized Request getRequest() {
while(queue.size()==0) {
try {
this.wait();
}
catch(InterruptedException ie) {
return null;
}
}
return (Request)queue.remove(0);
}
public synchronized void putRequest(Request request) {
queue.add(request);
this.notifyAll();
}
}
藍(lán)色部分就是服務(wù)器線(xiàn)程的等待條件,而客戶(hù)端線(xiàn)程在放入了一個(gè)request后,就使服務(wù)器線(xiàn)程等待條件滿(mǎn)足,于是喚醒服務(wù)器線(xiàn)程。
客戶(hù)端線(xiàn)程:ClientThread
package com.crackj2ee.thread;
public class ClientThread extends Thread {
private Queue queue;
private String clientName;
public ClientThread(Queue queue, String clientName) {
this.queue = queue;
this.clientName = clientName;
}
public String toString() {
return "[ClientThread-" + clientName + "]";
}
public void run() {
for(int i=0; i<100; i++) {
Request request = new Request("" + (long)(Math.random()*10000));
System.out.println(this + " send request: " + request);
queue.putRequest(request);
try {
Thread.sleep((long)(Math.random() * 10000 + 1000));
}
catch(InterruptedException ie) {
}
}
System.out.println(this + " shutdown.");
}
}
服務(wù)器線(xiàn)程:ServerThread
package com.crackj2ee.thread;
public class ServerThread extends Thread {
private boolean stop = false;
private Queue queue;
public ServerThread(Queue queue) {
this.queue = queue;
}
public void shutdown() {
stop = true;
this.interrupt();
try {
this.join();
}
catch(InterruptedException ie) {}
}
public void run() {
while(!stop) {
Request request = queue.getRequest();
System.out.println("[ServerThread] handle request: " + request);
try {
Thread.sleep(2000);
}
catch(InterruptedException ie) {}
}
System.out.println("[ServerThread] shutdown.");
}
}
服務(wù)器線(xiàn)程在紅色部分可能會(huì)阻塞,也就是說(shuō),Queue.getRequest是一個(gè)阻塞方法。這和java標(biāo)準(zhǔn)庫(kù)的許多IO方法類(lèi)似。
最后,寫(xiě)一個(gè)Main來(lái)啟動(dòng)他們:
package com.crackj2ee.thread;
public class Main {
public static void main(String[] args) {
Queue queue = new Queue();
ServerThread server = new ServerThread(queue);
server.start();
ClientThread[] clients = new ClientThread[5];
for(int i=0; i<clients.length; i++) {
clients[i] = new ClientThread(queue, ""+i);
clients[i].start();
}
try {
Thread.sleep(100000);
}
catch(InterruptedException ie) {}
server.shutdown();
}
}
我們啟動(dòng)了5個(gè)客戶(hù)端線(xiàn)程和一個(gè)服務(wù)器線(xiàn)程,運(yùn)行結(jié)果如下:
[ClientThread-0] send request: Request-4984
[ServerThread] handle request: Request-4984
[ClientThread-1] send request: Request-2020
[ClientThread-2] send request: Request-8980
[ClientThread-3] send request: Request-5044
[ClientThread-4] send request: Request-548
[ClientThread-4] send request: Request-6832
[ServerThread] handle request: Request-2020
[ServerThread] handle request: Request-8980
[ServerThread] handle request: Request-5044
[ServerThread] handle request: Request-548
[ClientThread-4] send request: Request-1681
[ClientThread-0] send request: Request-7859
[ClientThread-3] send request: Request-3926
[ServerThread] handle request: Request-6832
[ClientThread-2] send request: Request-9906
......
可以觀察到ServerThread處理來(lái)自不同客戶(hù)端的請(qǐng)求。
思考
Q: 服務(wù)器線(xiàn)程的wait條件while(queue.size()==0)能否換成if(queue.size()==0)?
A: 在這個(gè)例子中可以,因?yàn)榉?wù)器線(xiàn)程只有一個(gè)。但是,如果服務(wù)器線(xiàn)程有多個(gè)(例如Web應(yīng)用程序有多個(gè)線(xiàn)程處理并發(fā)請(qǐng)求,這非常普遍),就會(huì)造成嚴(yán)重問(wèn)題。
Q: 能否用sleep(1000)代替wait()?
A: 絕對(duì)不可以。sleep()不會(huì)釋放鎖,因此sleep期間別的線(xiàn)程根本沒(méi)有辦法調(diào)用getRequest()和putRequest(),導(dǎo)致所有相關(guān)線(xiàn)程都被阻塞。
Q: (Request)queue.remove(0)可以放到synchronized() {}塊外面嗎?
A: 不可以。因?yàn)閣hile()是測(cè)試queue,remove()是使用queue,兩者是一個(gè)原子操作,不能放在synchronized外面。
總結(jié)
多線(xiàn)程設(shè)計(jì)看似簡(jiǎn)單,實(shí)際上必須非常仔細(xì)地考慮各種鎖定/同步的條件,稍不小心,就可能出錯(cuò)。并且,當(dāng)線(xiàn)程較少時(shí),很可能發(fā)現(xiàn)不了問(wèn)題,一旦問(wèn)題出現(xiàn)又難以調(diào)試。
所幸的是,已有一些被驗(yàn)證過(guò)的模式可以供我們使用,我們會(huì)繼續(xù)介紹一些常用的多線(xiàn)程設(shè)計(jì)模式。
通常,多線(xiàn)程之間需要協(xié)調(diào)工作。例如,瀏覽器的一個(gè)顯示圖片的線(xiàn)程displayThread想要執(zhí)行顯示圖片的任務(wù),必須等待下載線(xiàn)程downloadThread將該圖片下載完畢。如果圖片還沒(méi)有下載完,displayThread可以暫停,當(dāng)downloadThread完成了任務(wù)后,再通知displayThread“圖片準(zhǔn)備完畢,可以顯示了”,這時(shí),displayThread繼續(xù)執(zhí)行。
以上邏輯簡(jiǎn)單的說(shuō)就是:如果條件不滿(mǎn)足,則等待。當(dāng)條件滿(mǎn)足了,就執(zhí)行下面的程序,這個(gè)機(jī)制的實(shí)現(xiàn)依賴(lài)于wait/notify。等待機(jī)制與鎖機(jī)制是密切關(guān)聯(lián)的。例如:
synchronized(obj) {
while(!condition) {
obj.wait();
}
obj.doSomething();
}
當(dāng)線(xiàn)程A獲得了obj鎖后,發(fā)現(xiàn)條件condition不滿(mǎn)足,無(wú)法繼續(xù)下一處理,于是線(xiàn)程A就wait()。
在另一線(xiàn)程B中,如果B更改了某些條件,使得線(xiàn)程A的condition條件滿(mǎn)足了,就可以喚醒線(xiàn)程A:
synchronized(obj) {
condition = true;
obj.notify();
}
需要注意的概念是:
# 調(diào)用obj的wait(), notify()方法前,必須獲得obj鎖,也就是必須寫(xiě)在synchronized(obj) {...} 代碼段內(nèi)。
# 調(diào)用obj.wait()后,線(xiàn)程A就釋放了obj的鎖,否則線(xiàn)程B無(wú)法獲得obj鎖,也就無(wú)法在synchronized(obj) {...} 代碼段內(nèi)喚醒A。
# 當(dāng)obj.wait()方法返回后,線(xiàn)程A需要再次獲得obj鎖,才能繼續(xù)執(zhí)行。
# 如果A1,A2,A3都在obj.wait(),則B調(diào)用obj.notify()只能喚醒A1,A2,A3中的一個(gè)(具體哪一個(gè)由JVM決定)。
# obj.notifyAll()則能全部喚醒A1,A2,A3,但是要繼續(xù)執(zhí)行obj.wait()的下一條語(yǔ)句,必須獲得obj鎖,因此,A1,A2,A3只有一個(gè)有機(jī)會(huì)獲得鎖繼續(xù)執(zhí)行,例如A1,其余的需要等待A1釋放obj鎖之后才能繼續(xù)執(zhí)行。
# 當(dāng)B調(diào)用obj.notify/notifyAll的時(shí)候,B正持有obj鎖,因此,A1,A2,A3雖被喚醒,但是仍無(wú)法獲得obj鎖。直到B退出synchronized塊,釋放obj鎖后,A1,A2,A3中的一個(gè)才有機(jī)會(huì)獲得鎖繼續(xù)執(zhí)行。
wait()/sleep()的區(qū)別
前面講了wait/notify機(jī)制,Thread還有一個(gè)sleep()靜態(tài)方法,它也能使線(xiàn)程暫停一段時(shí)間。sleep與wait的不同點(diǎn)是:sleep并不釋放鎖,并且sleep的暫停和wait暫停是不一樣的。obj.wait會(huì)使線(xiàn)程進(jìn)入obj對(duì)象的等待集合中并等待喚醒。
但是wait()和sleep()都可以通過(guò)interrupt()方法打斷線(xiàn)程的暫停狀態(tài),從而使線(xiàn)程立刻拋出InterruptedException。
如果線(xiàn)程A希望立即結(jié)束線(xiàn)程B,則可以對(duì)線(xiàn)程B對(duì)應(yīng)的Thread實(shí)例調(diào)用interrupt方法。如果此刻線(xiàn)程B正在wait/sleep/join,則線(xiàn)程B會(huì)立刻拋出InterruptedException,在catch() {} 中直接return即可安全地結(jié)束線(xiàn)程。
需要注意的是,InterruptedException是線(xiàn)程自己從內(nèi)部拋出的,并不是interrupt()方法拋出的。對(duì)某一線(xiàn)程調(diào)用interrupt()時(shí),如果該線(xiàn)程正在執(zhí)行普通的代碼,那么該線(xiàn)程根本就不會(huì)拋出InterruptedException。但是,一旦該線(xiàn)程進(jìn)入到wait()/sleep()/join()后,就會(huì)立刻拋出InterruptedException。
GuardedSuspention
GuardedSuspention模式主要思想是:
當(dāng)條件不滿(mǎn)足時(shí),線(xiàn)程等待,直到條件滿(mǎn)足時(shí),等待該條件的線(xiàn)程被喚醒。
我們?cè)O(shè)計(jì)一個(gè)客戶(hù)端線(xiàn)程和一個(gè)服務(wù)器線(xiàn)程,客戶(hù)端線(xiàn)程不斷發(fā)送請(qǐng)求給服務(wù)器線(xiàn)程,服務(wù)器線(xiàn)程不斷處理請(qǐng)求。當(dāng)請(qǐng)求隊(duì)列為空時(shí),服務(wù)器線(xiàn)程就必須等待,直到客戶(hù)端發(fā)送了請(qǐng)求。
先定義一個(gè)請(qǐng)求隊(duì)列:Queue
package com.crackj2ee.thread;
import java.util.*;
public class Queue {
private List queue = new LinkedList();
public synchronized Request getRequest() {
while(queue.size()==0) {
try {
this.wait();
}
catch(InterruptedException ie) {
return null;
}
}
return (Request)queue.remove(0);
}
public synchronized void putRequest(Request request) {
queue.add(request);
this.notifyAll();
}
}
藍(lán)色部分就是服務(wù)器線(xiàn)程的等待條件,而客戶(hù)端線(xiàn)程在放入了一個(gè)request后,就使服務(wù)器線(xiàn)程等待條件滿(mǎn)足,于是喚醒服務(wù)器線(xiàn)程。
客戶(hù)端線(xiàn)程:ClientThread
package com.crackj2ee.thread;
public class ClientThread extends Thread {
private Queue queue;
private String clientName;
public ClientThread(Queue queue, String clientName) {
this.queue = queue;
this.clientName = clientName;
}
public String toString() {
return "[ClientThread-" + clientName + "]";
}
public void run() {
for(int i=0; i<100; i++) {
Request request = new Request("" + (long)(Math.random()*10000));
System.out.println(this + " send request: " + request);
queue.putRequest(request);
try {
Thread.sleep((long)(Math.random() * 10000 + 1000));
}
catch(InterruptedException ie) {
}
}
System.out.println(this + " shutdown.");
}
}
服務(wù)器線(xiàn)程:ServerThread
package com.crackj2ee.thread;
public class ServerThread extends Thread {
private boolean stop = false;
private Queue queue;
public ServerThread(Queue queue) {
this.queue = queue;
}
public void shutdown() {
stop = true;
this.interrupt();
try {
this.join();
}
catch(InterruptedException ie) {}
}
public void run() {
while(!stop) {
Request request = queue.getRequest();
System.out.println("[ServerThread] handle request: " + request);
try {
Thread.sleep(2000);
}
catch(InterruptedException ie) {}
}
System.out.println("[ServerThread] shutdown.");
}
}
服務(wù)器線(xiàn)程在紅色部分可能會(huì)阻塞,也就是說(shuō),Queue.getRequest是一個(gè)阻塞方法。這和java標(biāo)準(zhǔn)庫(kù)的許多IO方法類(lèi)似。
最后,寫(xiě)一個(gè)Main來(lái)啟動(dòng)他們:
package com.crackj2ee.thread;
public class Main {
public static void main(String[] args) {
Queue queue = new Queue();
ServerThread server = new ServerThread(queue);
server.start();
ClientThread[] clients = new ClientThread[5];
for(int i=0; i<clients.length; i++) {
clients[i] = new ClientThread(queue, ""+i);
clients[i].start();
}
try {
Thread.sleep(100000);
}
catch(InterruptedException ie) {}
server.shutdown();
}
}
我們啟動(dòng)了5個(gè)客戶(hù)端線(xiàn)程和一個(gè)服務(wù)器線(xiàn)程,運(yùn)行結(jié)果如下:
[ClientThread-0] send request: Request-4984
[ServerThread] handle request: Request-4984
[ClientThread-1] send request: Request-2020
[ClientThread-2] send request: Request-8980
[ClientThread-3] send request: Request-5044
[ClientThread-4] send request: Request-548
[ClientThread-4] send request: Request-6832
[ServerThread] handle request: Request-2020
[ServerThread] handle request: Request-8980
[ServerThread] handle request: Request-5044
[ServerThread] handle request: Request-548
[ClientThread-4] send request: Request-1681
[ClientThread-0] send request: Request-7859
[ClientThread-3] send request: Request-3926
[ServerThread] handle request: Request-6832
[ClientThread-2] send request: Request-9906
......
可以觀察到ServerThread處理來(lái)自不同客戶(hù)端的請(qǐng)求。
思考
Q: 服務(wù)器線(xiàn)程的wait條件while(queue.size()==0)能否換成if(queue.size()==0)?
A: 在這個(gè)例子中可以,因?yàn)榉?wù)器線(xiàn)程只有一個(gè)。但是,如果服務(wù)器線(xiàn)程有多個(gè)(例如Web應(yīng)用程序有多個(gè)線(xiàn)程處理并發(fā)請(qǐng)求,這非常普遍),就會(huì)造成嚴(yán)重問(wèn)題。
Q: 能否用sleep(1000)代替wait()?
A: 絕對(duì)不可以。sleep()不會(huì)釋放鎖,因此sleep期間別的線(xiàn)程根本沒(méi)有辦法調(diào)用getRequest()和putRequest(),導(dǎo)致所有相關(guān)線(xiàn)程都被阻塞。
Q: (Request)queue.remove(0)可以放到synchronized() {}塊外面嗎?
A: 不可以。因?yàn)閣hile()是測(cè)試queue,remove()是使用queue,兩者是一個(gè)原子操作,不能放在synchronized外面。
總結(jié)
多線(xiàn)程設(shè)計(jì)看似簡(jiǎn)單,實(shí)際上必須非常仔細(xì)地考慮各種鎖定/同步的條件,稍不小心,就可能出錯(cuò)。并且,當(dāng)線(xiàn)程較少時(shí),很可能發(fā)現(xiàn)不了問(wèn)題,一旦問(wèn)題出現(xiàn)又難以調(diào)試。
所幸的是,已有一些被驗(yàn)證過(guò)的模式可以供我們使用,我們會(huì)繼續(xù)介紹一些常用的多線(xiàn)程設(shè)計(jì)模式。