java語(yǔ)言已經(jīng)內(nèi)置了多線程支持,所有實(shí)現(xiàn)Runnable接口的類都可被啟動(dòng)一個(gè)新線程,新線程會(huì)執(zhí)行該實(shí)例的run()方法,當(dāng)run()方法執(zhí)行完畢后,線程就結(jié)束了。一旦一個(gè)線程執(zhí)行完畢,這個(gè)實(shí)例就不能再重新啟動(dòng),只能重新生成一個(gè)新實(shí)例,再啟動(dòng)一個(gè)新線程。
Thread類是實(shí)現(xiàn)了Runnable接口的一個(gè)實(shí)例,它代表一個(gè)線程的實(shí)例,并且,啟動(dòng)線程的唯一方法就是通過(guò)Thread類的start()實(shí)例方法:
Thread t = new Thread(); t.start();
start()方法是一個(gè)native方法,它將啟動(dòng)一個(gè)新線程,并執(zhí)行run()方法。Thread類默認(rèn)的run()方法什么也不做就退出了。注意:直接調(diào)用run()方法并不會(huì)啟動(dòng)一個(gè)新線程,它和調(diào)用一個(gè)普通的java方法沒有什么區(qū)別。
因此,有兩個(gè)方法可以實(shí)現(xiàn)自己的線程:
方法1:自己的類extend Thread,并復(fù)寫run()方法,就可以啟動(dòng)新線程并執(zhí)行自己定義的run()方法。例如:
public class MyThread extends Thread { public run() { System.out.println("MyThread.run()"); } }
在合適的地方啟動(dòng)線程:new MyThread().start();
方法2:如果自己的類已經(jīng)extends另一個(gè)類,就無(wú)法直接extends Thread,此時(shí),必須實(shí)現(xiàn)一個(gè)Runnable接口:
public class MyThread extends OtherClass implements Runnable { public run() { System.out.println("MyThread.run()"); } }
為了啟動(dòng)MyThread,需要首先實(shí)例化一個(gè)Thread,并傳入自己的MyThread實(shí)例:
MyThread myt = new MyThread(); Thread t = new Thread(myt); t.start();
事實(shí)上,當(dāng)傳入一個(gè)Runnable target參數(shù)給Thread后,Thread的run()方法就會(huì)調(diào)用target.run(),參考JDK源代碼:
public void run() { if (target != null) { target.run(); } }
線程還有一些Name, ThreadGroup, isDaemon等設(shè)置,由于和線程設(shè)計(jì)模式關(guān)聯(lián)很少,這里就不多說(shuō)了。
由于同一進(jìn)程內(nèi)的多個(gè)線程共享內(nèi)存空間,在Java中,就是共享實(shí)例,當(dāng)多個(gè)線程試圖同時(shí)修改某個(gè)實(shí)例的內(nèi)容時(shí),就會(huì)造成沖突,因此,線程必須實(shí)現(xiàn)共享互斥,使多線程同步。
最簡(jiǎn)單的同步是將一個(gè)方法標(biāo)記為synchronized,對(duì)同一個(gè)實(shí)例來(lái)說(shuō),任一時(shí)刻只能有一個(gè)synchronized方法在執(zhí)行。當(dāng)一個(gè)方法正在執(zhí)行某個(gè)synchronized方法時(shí),其他線程如果想要執(zhí)行這個(gè)實(shí)例的任意一個(gè)synchronized方法,都必須等待當(dāng)前執(zhí)行 synchronized方法的線程退出此方法后,才能依次執(zhí)行。
但是,非synchronized方法不受影響,不管當(dāng)前有沒有執(zhí)行synchronized方法,非synchronized方法都可以被多個(gè)線程同時(shí)執(zhí)行。
此外,必須注意,只有同一實(shí)例的synchronized方法同一時(shí)間只能被一個(gè)線程執(zhí)行,不同實(shí)例的synchronized方法是可以并發(fā)的。例如,class A定義了synchronized方法sync(),則不同實(shí)例a1.sync()和a2.sync()可以同時(shí)由兩個(gè)線程來(lái)執(zhí)行。
多線程同步的實(shí)現(xiàn)最終依賴鎖機(jī)制。我們可以想象某一共享資源是一間屋子,每個(gè)人都是一個(gè)線程。當(dāng)A希望進(jìn)入房間時(shí),他必須獲得門鎖,一旦A獲得門鎖,他進(jìn)去后就立刻將門鎖上,于是B,C,D...就不得不在門外等待,直到A釋放鎖出來(lái)后,B,C,D...中的某一人搶到了該鎖(具體搶法依賴于 JVM的實(shí)現(xiàn),可以先到先得,也可以隨機(jī)挑選),然后進(jìn)屋又將門鎖上。這樣,任一時(shí)刻最多有一人在屋內(nèi)(使用共享資源)。
Java語(yǔ)言規(guī)范內(nèi)置了對(duì)多線程的支持。對(duì)于Java程序來(lái)說(shuō),每一個(gè)對(duì)象實(shí)例都有一把“鎖”,一旦某個(gè)線程獲得了該鎖,別的線程如果希望獲得該鎖,只能等待這個(gè)線程釋放鎖之后。獲得鎖的方法只有一個(gè),就是synchronized關(guān)鍵字。例如:
public class SharedResource { private int count = 0;
public int getCount() { return count; }
public synchronized void setCount(int count) { this.count = count; }
}
同步方法public synchronized void setCount(int count) { this.count = count; } 事實(shí)上相當(dāng)于:
public void setCount(int count) { synchronized(this) { // 在此獲得this鎖 this.count = count; } // 在此釋放this鎖 }
紅色部分表示需要同步的代碼段,該區(qū)域?yàn)?#8220;危險(xiǎn)區(qū)域”,如果兩個(gè)以上的線程同時(shí)執(zhí)行,會(huì)引發(fā)沖突,因此,要更改SharedResource的內(nèi)部狀態(tài),必須先獲得SharedResource實(shí)例的鎖。
退出synchronized塊時(shí),線程擁有的鎖自動(dòng)釋放,于是,別的線程又可以獲取該鎖了。
為了提高性能,不一定要鎖定this,例如,SharedResource有兩個(gè)獨(dú)立變化的變量:
public class SharedResouce { private int a = 0; private int b = 0;
public synchronized void setA(int a) { this.a = a; }
public synchronized void setB(int b) { this.b = b; } }
若同步整個(gè)方法,則setA()的時(shí)候無(wú)法setB(),setB()時(shí)無(wú)法setA()。為了提高性能,可以使用不同對(duì)象的鎖:
public class SharedResouce { private int a = 0; private int b = 0; private Object sync_a = new Object(); private Object sync_b = new Object();
public void setA(int a) { synchronized(sync_a) { this.a = a; } }
public synchronized void setB(int b) { synchronized(sync_b) { this.b = b; } } }
通常,多線程之間需要協(xié)調(diào)工作。例如,瀏覽器的一個(gè)顯示圖片的線程displayThread想要執(zhí)行顯示圖片的任務(wù),必須等待下載線程 downloadThread將該圖片下載完畢。如果圖片還沒有下載完,displayThread可以暫停,當(dāng)downloadThread完成了任務(wù)后,再通知displayThread“圖片準(zhǔn)備完畢,可以顯示了”,這時(shí),displayThread繼續(xù)執(zhí)行。
以上邏輯簡(jiǎn)單的說(shuō)就是:如果條件不滿足,則等待。當(dāng)條件滿足時(shí),等待該條件的線程將被喚醒。在Java中,這個(gè)機(jī)制的實(shí)現(xiàn)依賴于wait/notify。等待機(jī)制與鎖機(jī)制是密切關(guān)聯(lián)的。例如:
synchronized(obj) { while(!condition) { obj.wait(); } obj.doSomething(); }
當(dāng)線程A獲得了obj鎖后,發(fā)現(xiàn)條件condition不滿足,無(wú)法繼續(xù)下一處理,于是線程A就wait()。
在另一線程B中,如果B更改了某些條件,使得線程A的condition條件滿足了,就可以喚醒線程A:
synchronized(obj) { condition = true; obj.notify(); }
需要注意的概念是:
# 調(diào)用obj的wait(), notify()方法前,必須獲得obj鎖,也就是必須寫在synchronized(obj) {...} 代碼段內(nèi)。
# 調(diào)用obj.wait()后,線程A就釋放了obj的鎖,否則線程B無(wú)法獲得obj鎖,也就無(wú)法在synchronized(obj) {...} 代碼段內(nèi)喚醒A。
# 當(dāng)obj.wait()方法返回后,線程A需要再次獲得obj鎖,才能繼續(xù)執(zhí)行。
# 如果A1,A2,A3都在obj.wait(),則B調(diào)用obj.notify()只能喚醒A1,A2,A3中的一個(gè)(具體哪一個(gè)由JVM決定)。
# obj.notifyAll()則能全部喚醒A1,A2,A3,但是要繼續(xù)執(zhí)行obj.wait()的下一條語(yǔ)句,必須獲得obj鎖,因此,A1,A2,A3只有一個(gè)有機(jī)會(huì)獲得鎖繼續(xù)執(zhí)行,例如A1,其余的需要等待A1釋放obj鎖之后才能繼續(xù)執(zhí)行。
# 當(dāng)B調(diào)用obj.notify/notifyAll的時(shí)候,B正持有obj鎖,因此,A1,A2,A3雖被喚醒,但是仍無(wú)法獲得obj鎖。直到B退出synchronized塊,釋放obj鎖后,A1,A2,A3中的一個(gè)才有機(jī)會(huì)獲得鎖繼續(xù)執(zhí)行。
前面講了wait/notify機(jī)制,Thread還有一個(gè)sleep()靜態(tài)方法,它也能使線程暫停一段時(shí)間。sleep與wait的不同點(diǎn)是: sleep并不釋放鎖,并且sleep的暫停和wait暫停是不一樣的。obj.wait會(huì)使線程進(jìn)入obj對(duì)象的等待集合中并等待喚醒。
但是wait()和sleep()都可以通過(guò)interrupt()方法打斷線程的暫停狀態(tài),從而使線程立刻拋出InterruptedException。
如果線程A希望立即結(jié)束線程B,則可以對(duì)線程B對(duì)應(yīng)的Thread實(shí)例調(diào)用interrupt方法。如果此刻線程B正在 wait/sleep/join,則線程B會(huì)立刻拋出InterruptedException,在catch() {} 中直接return即可安全地結(jié)束線程。
需要注意的是,InterruptedException是線程自己從內(nèi)部拋出的,并不是interrupt()方法拋出的。對(duì)某一線程調(diào)用 interrupt()時(shí),如果該線程正在執(zhí)行普通的代碼,那么該線程根本就不會(huì)拋出InterruptedException。但是,一旦該線程進(jìn)入到 wait()/sleep()/join()后,就會(huì)立刻拋出InterruptedException。
GuardedSuspention模式主要思想是:
當(dāng)條件不滿足時(shí),線程等待,直到條件滿足時(shí),等待該條件的線程被喚醒。
我們?cè)O(shè)計(jì)一個(gè)客戶端線程和一個(gè)服務(wù)器線程,客戶端線程不斷發(fā)送請(qǐng)求給服務(wù)器線程,服務(wù)器線程不斷處理請(qǐng)求。當(dāng)請(qǐng)求隊(duì)列為空時(shí),服務(wù)器線程就必須等待,直到客戶端發(fā)送了請(qǐng)求。
先定義一個(gè)請(qǐng)求隊(duì)列:Queue
package com.crackj2ee.thread;
import java.util.*;
public class Queue { private List queue = new LinkedList();
public synchronized Request getRequest() { while(queue.size()==0) { try { this.wait(); } catch(InterruptedException ie) { return null; } } return (Request)queue.remove(0); }
public synchronized void putRequest(Request request) { queue.add(request); this.notifyAll(); }
}
藍(lán)色部分就是服務(wù)器線程的等待條件,而客戶端線程在放入了一個(gè)request后,就使服務(wù)器線程等待條件滿足,于是喚醒服務(wù)器線程。
客戶端線程:ClientThread
package com.crackj2ee.thread;
public class ClientThread extends Thread { private Queue queue; private String clientName;
public ClientThread(Queue queue, String clientName) { this.queue = queue; this.clientName = clientName; }
public String toString() { return "[ClientThread-" + clientName + "]"; }
public void run() { for(int i=0; i<100; i++) { Request request = new Request("" + (long)(Math.random()*10000)); System.out.println(this + " send request: " + request); queue.putRequest(request); try { Thread.sleep((long)(Math.random() * 10000 + 1000)); } catch(InterruptedException ie) { } } System.out.println(this + " shutdown."); } }
服務(wù)器線程:ServerThread
package com.crackj2ee.thread; public class ServerThread extends Thread { private boolean stop = false; private Queue queue;
public ServerThread(Queue queue) { this.queue = queue; }
public void shutdown() { stop = true; this.interrupt(); try { this.join(); } catch(InterruptedException ie) {} }
public void run() { while(!stop) { Request request = queue.getRequest(); System.out.println("[ServerThread] handle request: " + request); try { Thread.sleep(2000); } catch(InterruptedException ie) {} } System.out.println("[ServerThread] shutdown."); } }
服務(wù)器線程在紅色部分可能會(huì)阻塞,也就是說(shuō),Queue.getRequest是一個(gè)阻塞方法。這和java標(biāo)準(zhǔn)庫(kù)的許多IO方法類似。
最后,寫一個(gè)Main來(lái)啟動(dòng)他們:
package com.crackj2ee.thread;
public class Main {
public static void main(String[] args) { Queue queue = new Queue(); ServerThread server = new ServerThread(queue); server.start(); ClientThread[] clients = new ClientThread[5]; for(int i=0; i<clients.length; i++) { clients[i] = new ClientThread(queue, ""+i); clients[i].start(); } try { Thread.sleep(100000); } catch(InterruptedException ie) {} server.shutdown(); } }
我們啟動(dòng)了5個(gè)客戶端線程和一個(gè)服務(wù)器線程,運(yùn)行結(jié)果如下:
[ClientThread-0] send request: Request-4984 [ServerThread] handle request: Request-4984 [ClientThread-1] send request: Request-2020 [ClientThread-2] send request: Request-8980 [ClientThread-3] send request: Request-5044 [ClientThread-4] send request: Request-548 [ClientThread-4] send request: Request-6832 [ServerThread] handle request: Request-2020 [ServerThread] handle request: Request-8980 [ServerThread] handle request: Request-5044 [ServerThread] handle request: Request-548 [ClientThread-4] send request: Request-1681 [ClientThread-0] send request: Request-7859 [ClientThread-3] send request: Request-3926 [ServerThread] handle request: Request-6832 [ClientThread-2] send request: Request-9906 ......
可以觀察到ServerThread處理來(lái)自不同客戶端的請(qǐng)求。
思考
Q: 服務(wù)器線程的wait條件while(queue.size()==0)能否換成if(queue.size()==0)?
A: 在這個(gè)例子中可以,因?yàn)榉?wù)器線程只有一個(gè)。但是,如果服務(wù)器線程有多個(gè)(例如Web應(yīng)用程序有多個(gè)線程處理并發(fā)請(qǐng)求,這非常普遍),就會(huì)造成嚴(yán)重問(wèn)題。
Q: 能否用sleep(1000)代替wait()?
A: 絕對(duì)不可以。sleep()不會(huì)釋放鎖,因此sleep期間別的線程根本沒有辦法調(diào)用getRequest()和putRequest(),導(dǎo)致所有相關(guān)線程都被阻塞。
Q: (Request)queue.remove(0)可以放到synchronized() {}塊外面嗎?
A: 不可以。因?yàn)閣hile()是測(cè)試queue,remove()是使用queue,兩者是一個(gè)原子操作,不能放在synchronized外面。
總結(jié)
多線程設(shè)計(jì)看似簡(jiǎn)單,實(shí)際上必須非常仔細(xì)地考慮各種鎖定/同步的條件,稍不小心,就可能出錯(cuò)。并且,當(dāng)線程較少時(shí),很可能發(fā)現(xiàn)不了問(wèn)題,一旦問(wèn)題出現(xiàn)又難以調(diào)試。
所幸的是,已有一些被驗(yàn)證過(guò)的模式可以供我們使用,我們會(huì)繼續(xù)介紹一些常用的多線程設(shè)計(jì)模式。
前面談了多線程應(yīng)用程序能極大地改善用戶相應(yīng)。例如對(duì)于一個(gè)Web應(yīng)用程序,每當(dāng)一個(gè)用戶請(qǐng)求服務(wù)器連接時(shí),服務(wù)器就可以啟動(dòng)一個(gè)新線程為用戶服務(wù)。
然而,創(chuàng)建和銷毀線程本身就有一定的開銷,如果頻繁創(chuàng)建和銷毀線程,CPU和內(nèi)存開銷就不可忽略,垃圾收集器還必須負(fù)擔(dān)更多的工作。因此,線程池就是為了避免頻繁創(chuàng)建和銷毀線程。
每當(dāng)服務(wù)器接受了一個(gè)新的請(qǐng)求后,服務(wù)器就從線程池中挑選一個(gè)等待的線程并執(zhí)行請(qǐng)求處理。處理完畢后,線程并不結(jié)束,而是轉(zhuǎn)為阻塞狀態(tài)再次被放入線程池中。這樣就避免了頻繁創(chuàng)建和銷毀線程。
Worker Pattern實(shí)現(xiàn)了類似線程池的功能。首先定義Task接口:
package com.crackj2ee.thread; public interface Task { void execute(); }
線程將負(fù)責(zé)執(zhí)行execute()方法。注意到任務(wù)是由子類通過(guò)實(shí)現(xiàn)execute()方法實(shí)現(xiàn)的,線程本身并不知道自己執(zhí)行的任務(wù)。它只負(fù)責(zé)運(yùn)行一個(gè)耗時(shí)的execute()方法。
具體任務(wù)由子類實(shí)現(xiàn),我們定義了一個(gè)CalculateTask和一個(gè)TimerTask:
// CalculateTask.java package com.crackj2ee.thread; public class CalculateTask implements Task { private static int count = 0; private int num = count; public CalculateTask() { count++; } public void execute() { System.out.println("[CalculateTask " + num + "] start..."); try { Thread.sleep(3000); } catch(InterruptedException ie) {} System.out.println("[CalculateTask " + num + "] done."); } }
// TimerTask.java package com.crackj2ee.thread; public class TimerTask implements Task { private static int count = 0; private int num = count; public TimerTask() { count++; } public void execute() { System.out.println("[TimerTask " + num + "] start..."); try { Thread.sleep(2000); } catch(InterruptedException ie) {} System.out.println("[TimerTask " + num + "] done."); } }
以上任務(wù)均簡(jiǎn)單的sleep若干秒。
TaskQueue實(shí)現(xiàn)了一個(gè)隊(duì)列,客戶端可以將請(qǐng)求放入隊(duì)列,服務(wù)器線程可以從隊(duì)列中取出任務(wù):
package com.crackj2ee.thread; import java.util.*; public class TaskQueue { private List queue = new LinkedList(); public synchronized Task getTask() { while(queue.size()==0) { try { this.wait(); } catch(InterruptedException ie) { return null; } } return (Task)queue.remove(0); } public synchronized void putTask(Task task) { queue.add(task); this.notifyAll(); } }
終于到了真正的WorkerThread,這是真正執(zhí)行任務(wù)的服務(wù)器線程:
package com.crackj2ee.thread; public class WorkerThread extends Thread { private static int count = 0; private boolean busy = false; private boolean stop = false; private TaskQueue queue; public WorkerThread(ThreadGroup group, TaskQueue queue) { super(group, "worker-" + count); count++; this.queue = queue; } public void shutdown() { stop = true; this.interrupt(); try { this.join(); } catch(InterruptedException ie) {} } public boolean isIdle() { return !busy; } public void run() { System.out.println(getName() + " start."); while(!stop) { Task task = queue.getTask(); if(task!=null) { busy = true; task.execute(); busy = false; } } System.out.println(getName() + " end."); } }
前面已經(jīng)講過(guò),queue.getTask()是一個(gè)阻塞方法,服務(wù)器線程可能在此wait()一段時(shí)間。此外,WorkerThread還有一個(gè)shutdown方法,用于安全結(jié)束線程。
最后是ThreadPool,負(fù)責(zé)管理所有的服務(wù)器線程,還可以動(dòng)態(tài)增加和減少線程數(shù):
package com.crackj2ee.thread; import java.util.*; public class ThreadPool extends ThreadGroup { private List threads = new LinkedList(); private TaskQueue queue; public ThreadPool(TaskQueue queue) { super("Thread-Pool"); this.queue = queue; } public synchronized void addWorkerThread() { Thread t = new WorkerThread(this, queue); threads.add(t); t.start(); } public synchronized void removeWorkerThread() { if(threads.size()>0) { WorkerThread t = (WorkerThread)threads.remove(0); t.shutdown(); } } public synchronized void currentStatus() { System.out.println("-----------------------------------------------"); System.out.println("Thread count = " + threads.size()); Iterator it = threads.iterator(); while(it.hasNext()) { WorkerThread t = (WorkerThread)it.next(); System.out.println(t.getName() + ": " + (t.isIdle() ? "idle" : "busy")); } System.out.println("-----------------------------------------------"); } }
currentStatus()方法是為了方便調(diào)試,打印出所有線程的當(dāng)前狀態(tài)。
最后,Main負(fù)責(zé)完成main()方法:
package com.crackj2ee.thread; public class Main { public static void main(String[] args) { TaskQueue queue = new TaskQueue(); ThreadPool pool = new ThreadPool(queue); for(int i=0; i<10; i++) { queue.putTask(new CalculateTask()); queue.putTask(new TimerTask()); } pool.addWorkerThread(); pool.addWorkerThread(); doSleep(8000); pool.currentStatus(); pool.addWorkerThread(); pool.addWorkerThread(); pool.addWorkerThread(); pool.addWorkerThread(); pool.addWorkerThread(); doSleep(5000); pool.currentStatus(); } private static void doSleep(long ms) { try { Thread.sleep(ms); } catch(InterruptedException ie) {} } }
main()一開始放入了20個(gè)Task,然后動(dòng)態(tài)添加了一些服務(wù)線程,并定期打印線程狀態(tài),運(yùn)行結(jié)果如下:
worker-0 start. [CalculateTask 0] start... worker-1 start. [TimerTask 0] start... [TimerTask 0] done. [CalculateTask 1] start... [CalculateTask 0] done. [TimerTask 1] start... [CalculateTask 1] done. [CalculateTask 2] start... [TimerTask 1] done. [TimerTask 2] start... [TimerTask 2] done. [CalculateTask 3] start... ----------------------------------------------- Thread count = 2 worker-0: busy worker-1: busy ----------------------------------------------- [CalculateTask 2] done. [TimerTask 3] start... worker-2 start. [CalculateTask 4] start... worker-3 start. [TimerTask 4] start... worker-4 start. [CalculateTask 5] start... worker-5 start. [TimerTask 5] start... worker-6 start. [CalculateTask 6] start... [CalculateTask 3] done. [TimerTask 6] start... [TimerTask 3] done. [CalculateTask 7] start... [TimerTask 4] done. [TimerTask 7] start... [TimerTask 5] done. [CalculateTask 8] start... [CalculateTask 4] done. [TimerTask 8] start... [CalculateTask 5] done. [CalculateTask 9] start... [CalculateTask 6] done. [TimerTask 9] start... [TimerTask 6] done. [TimerTask 7] done. ----------------------------------------------- Thread count = 7 worker-0: idle worker-1: busy worker-2: busy worker-3: idle worker-4: busy worker-5: busy worker-6: busy ----------------------------------------------- [CalculateTask 7] done. [CalculateTask 8] done. [TimerTask 8] done. [TimerTask 9] done. [CalculateTask 9] done.
仔細(xì)觀察:一開始只有兩個(gè)服務(wù)器線程,因此線程狀態(tài)都是忙,后來(lái)線程數(shù)增多,6個(gè)線程中的兩個(gè)狀態(tài)變成idle,說(shuō)明處于wait()狀態(tài)。
思考:本例的線程調(diào)度算法其實(shí)根本沒有,因?yàn)檫@個(gè)應(yīng)用是圍繞TaskQueue設(shè)計(jì)的,不是以Thread Pool為中心設(shè)計(jì)的。因此,Task調(diào)度取決于TaskQueue的getTask()方法,你可以改進(jìn)這個(gè)方法,例如使用優(yōu)先隊(duì)列,使優(yōu)先級(jí)高的任務(wù)先被執(zhí)行。
如果所有的服務(wù)器線程都處于busy狀態(tài),則說(shuō)明任務(wù)繁忙,TaskQueue的隊(duì)列越來(lái)越長(zhǎng),最終會(huì)導(dǎo)致服務(wù)器內(nèi)存耗盡。因此,可以限制 TaskQueue的等待任務(wù)數(shù),超過(guò)最大長(zhǎng)度就拒絕處理。許多Web服務(wù)器在用戶請(qǐng)求繁忙時(shí)就會(huì)拒絕用戶:HTTP 503 SERVICE UNAVAILABLE
多線程讀寫同一個(gè)對(duì)象的數(shù)據(jù)是很普遍的,通常,要避免讀寫沖突,必須保證任何時(shí)候僅有一個(gè)線程在寫入,有線程正在讀取的時(shí)候,寫入操作就必須等待。簡(jiǎn)單說(shuō),就是要避免“寫-寫”沖突和“讀-寫”沖突。但是同時(shí)讀是允許的,因?yàn)?#8220;讀-讀”不沖突,而且很安全。
要實(shí)現(xiàn)以上的ReadWriteLock,簡(jiǎn)單的使用synchronized就不行,我們必須自己設(shè)計(jì)一個(gè)ReadWriteLock類,在讀之前,必須先獲得“讀鎖”,寫之前,必須先獲得“寫鎖”。舉例說(shuō)明:
DataHandler對(duì)象保存了一個(gè)可讀寫的char[]數(shù)組:
package com.crackj2ee.thread;
public class DataHandler { // store data: private char[] buffer = "AAAAAAAAAA".toCharArray();
private char[] doRead() { char[] ret = new char[buffer.length]; for(int i=0; i<buffer.length; i++) { ret[i] = buffer[i]; sleep(3); } return ret; }
private void doWrite(char[] data) { if(data!=null) { buffer = new char[data.length]; for(int i=0; i<buffer.length; i++) { buffer[i] = data[i]; sleep(10); } } }
private void sleep(int ms) { try { Thread.sleep(ms); } catch(InterruptedException ie) {} } }
doRead()和doWrite()方法是非線程安全的讀寫方法。為了演示,加入了sleep(),并設(shè)置讀的速度大約是寫的3倍,這符合通常的情況。
為了讓多線程能安全讀寫,我們?cè)O(shè)計(jì)了一個(gè)ReadWriteLock:
package com.crackj2ee.thread; public class ReadWriteLock { private int readingThreads = 0; private int writingThreads = 0; private int waitingThreads = 0; // waiting for write private boolean preferWrite = true;
public synchronized void readLock() throws InterruptedException { while(writingThreads>0 || (preferWrite && waitingThreads>0)) this.wait(); readingThreads++; }
public synchronized void readUnlock() { readingThreads--; preferWrite = true; notifyAll(); }
public synchronized void writeLock() throws InterruptedException { waitingThreads++; try { while(readingThreads>0 || writingThreads>0) this.wait(); } finally { waitingThreads--; } writingThreads++; }
public synchronized void writeUnlock() { writingThreads--; preferWrite = false; notifyAll(); } }
readLock()用于獲得讀鎖,readUnlock()釋放讀鎖,writeLock()和writeUnlock()一樣。由于鎖用完必須釋放,因此,必須保證lock和unlock匹配。我們修改DataHandler,加入ReadWriteLock:
package com.crackj2ee.thread; public class DataHandler { // store data: private char[] buffer = "AAAAAAAAAA".toCharArray(); // lock: private ReadWriteLock lock = new ReadWriteLock();
public char[] read(String name) throws InterruptedException { System.out.println(name + " waiting for read..."); lock.readLock(); try { char[] data = doRead(); System.out.println(name + " reads data: " + new String(data)); return data; } finally { lock.readUnlock(); } }
public void write(String name, char[] data) throws InterruptedException { System.out.println(name + " waiting for write..."); lock.writeLock(); try { System.out.println(name + " wrote data: " + new String(data)); doWrite(data); } finally { lock.writeUnlock(); } }
private char[] doRead() { char[] ret = new char[buffer.length]; for(int i=0; i<buffer.length; i++) { ret[i] = buffer[i]; sleep(3); } return ret; } private void doWrite(char[] data) { if(data!=null) { buffer = new char[data.length]; for(int i=0; i<buffer.length; i++) { buffer[i] = data[i]; sleep(10); } } } private void sleep(int ms) { try { Thread.sleep(ms); } catch(InterruptedException ie) {} } }
public方法read()和write()完全封裝了底層的ReadWriteLock,因此,多線程可以安全地調(diào)用這兩個(gè)方法:
// ReadingThread不斷讀取數(shù)據(jù): package com.crackj2ee.thread; public class ReadingThread extends Thread { private DataHandler handler; public ReadingThread(DataHandler handler) { this.handler = handler; } public void run() { for(;;) { try { char[] data = handler.read(getName()); Thread.sleep((long)(Math.random()*1000+100)); } catch(InterruptedException ie) { break; } } } }
// WritingThread不斷寫入數(shù)據(jù),每次寫入的都是10個(gè)相同的字符: package com.crackj2ee.thread; public class WritingThread extends Thread { private DataHandler handler; public WritingThread(DataHandler handler) { this.handler = handler; } public void run() { char[] data = new char[10]; for(;;) { try { fill(data); handler.write(getName(), data); Thread.sleep((long)(Math.random()*1000+100)); } catch(InterruptedException ie) { break; } } } // 產(chǎn)生一個(gè)A-Z隨機(jī)字符,填入char[10]: private void fill(char[] data) { char c = (char)(Math.random()*26+'A'); for(int i=0; i<data.length; i++) data[i] = c; } }
最后Main負(fù)責(zé)啟動(dòng)這些線程:
package com.crackj2ee.thread; public class Main { public static void main(String[] args) { DataHandler handler = new DataHandler(); Thread[] ts = new Thread[] { new ReadingThread(handler), new ReadingThread(handler), new ReadingThread(handler), new ReadingThread(handler), new ReadingThread(handler), new WritingThread(handler), new WritingThread(handler) }; for(int i=0; i<ts.length; i++) { ts[i].start(); } } }
我們啟動(dòng)了5個(gè)讀線程和2個(gè)寫線程,運(yùn)行結(jié)果如下:
Thread-0 waiting for read... Thread-1 waiting for read... Thread-2 waiting for read... Thread-3 waiting for read... Thread-4 waiting for read... Thread-5 waiting for write... Thread-6 waiting for write... Thread-4 reads data: AAAAAAAAAA Thread-3 reads data: AAAAAAAAAA Thread-2 reads data: AAAAAAAAAA Thread-1 reads data: AAAAAAAAAA Thread-0 reads data: AAAAAAAAAA Thread-5 wrote data: EEEEEEEEEE Thread-6 wrote data: MMMMMMMMMM Thread-1 waiting for read... Thread-4 waiting for read... Thread-1 reads data: MMMMMMMMMM Thread-4 reads data: MMMMMMMMMM Thread-2 waiting for read... Thread-2 reads data: MMMMMMMMMM Thread-0 waiting for read... Thread-0 reads data: MMMMMMMMMM Thread-4 waiting for read... Thread-4 reads data: MMMMMMMMMM Thread-2 waiting for read... Thread-5 waiting for write... Thread-2 reads data: MMMMMMMMMM Thread-5 wrote data: GGGGGGGGGG Thread-6 waiting for write... Thread-6 wrote data: AAAAAAAAAA Thread-3 waiting for read... Thread-3 reads data: AAAAAAAAAA ......
可以看到,每次讀/寫都是完整的原子操作,因?yàn)槲覀兠看螌懭氲亩际?0個(gè)相同字符。并且,每次讀出的都是最近一次寫入的內(nèi)容。
如果去掉ReadWriteLock:
package com.crackj2ee.thread; public class DataHandler {
// store data: private char[] buffer = "AAAAAAAAAA".toCharArray();
public char[] read(String name) throws InterruptedException { char[] data = doRead(); System.out.println(name + " reads data: " + new String(data)); return data; } public void write(String name, char[] data) throws InterruptedException { System.out.println(name + " wrote data: " + new String(data)); doWrite(data); }
private char[] doRead() { char[] ret = new char[10]; for(int i=0; i<10; i++) { ret[i] = buffer[i]; sleep(3); } return ret; } private void doWrite(char[] data) { for(int i=0; i<10; i++) { buffer[i] = data[i]; sleep(10); } } private void sleep(int ms) { try { Thread.sleep(ms); } catch(InterruptedException ie) {} } }
運(yùn)行結(jié)果如下:
Thread-5 wrote data: AAAAAAAAAA Thread-6 wrote data: MMMMMMMMMM Thread-0 reads data: AAAAAAAAAA Thread-1 reads data: AAAAAAAAAA Thread-2 reads data: AAAAAAAAAA Thread-3 reads data: AAAAAAAAAA Thread-4 reads data: AAAAAAAAAA Thread-2 reads data: MAAAAAAAAA Thread-3 reads data: MAAAAAAAAA Thread-5 wrote data: CCCCCCCCCC Thread-1 reads data: MAAAAAAAAA Thread-0 reads data: MAAAAAAAAA Thread-4 reads data: MAAAAAAAAA Thread-6 wrote data: EEEEEEEEEE Thread-3 reads data: EEEEECCCCC Thread-4 reads data: EEEEEEEEEC Thread-1 reads data: EEEEEEEEEE
可以看到在Thread-6寫入EEEEEEEEEE的過(guò)程中,3個(gè)線程讀取的內(nèi)容是不同的。
思考
java的synchronized提供了最底層的物理鎖,要在synchronized的基礎(chǔ)上,實(shí)現(xiàn)自己的邏輯鎖,就必須仔細(xì)設(shè)計(jì)ReadWriteLock。
Q: lock.readLock()為什么不放入try{ } 內(nèi)? A: 因?yàn)閞eadLock()會(huì)拋出InterruptedException,導(dǎo)致readingThreads++不執(zhí)行,而readUnlock()在 finally{ } 中,導(dǎo)致readingThreads--執(zhí)行,從而使readingThread狀態(tài)出錯(cuò)。writeLock()也是類似的。
Q: preferWrite有用嗎? A: 如果去掉preferWrite,線程安全不受影響。但是,如果讀取線程很多,上一個(gè)線程還沒有讀取完,下一個(gè)線程又開始讀了,就導(dǎo)致寫入線程長(zhǎng)時(shí)間無(wú)法獲得writeLock;如果寫入線程等待的很多,一個(gè)接一個(gè)寫,也會(huì)導(dǎo)致讀取線程長(zhǎng)時(shí)間無(wú)法獲得readLock。preferWrite的作用是讓讀 /寫交替執(zhí)行,避免由于讀線程繁忙導(dǎo)致寫無(wú)法進(jìn)行和由于寫線程繁忙導(dǎo)致讀無(wú)法進(jìn)行。
Q: notifyAll()換成notify()行不行? A: 不可以。由于preferWrite的存在,如果一個(gè)線程剛讀取完畢,此時(shí)preferWrite=true,再notify(),若恰好喚醒的是一個(gè)讀線程,則while(writingThreads>0 || (preferWrite && waitingThreads>0))可能為true導(dǎo)致該讀線程繼續(xù)等待,而等待寫入的線程也處于wait()中,結(jié)果所有線程都處于wait ()狀態(tài),誰(shuí)也無(wú)法喚醒誰(shuí)。因此,notifyAll()比notify()要來(lái)得安全。程序驗(yàn)證notify()帶來(lái)的死鎖:
Thread-0 waiting for read... Thread-1 waiting for read... Thread-2 waiting for read... Thread-3 waiting for read... Thread-4 waiting for read... Thread-5 waiting for write... Thread-6 waiting for write... Thread-0 reads data: AAAAAAAAAA Thread-4 reads data: AAAAAAAAAA Thread-3 reads data: AAAAAAAAAA Thread-2 reads data: AAAAAAAAAA Thread-1 reads data: AAAAAAAAAA Thread-5 wrote data: CCCCCCCCCC Thread-2 waiting for read... Thread-1 waiting for read... Thread-3 waiting for read... Thread-0 waiting for read... Thread-4 waiting for read... Thread-6 wrote data: LLLLLLLLLL Thread-5 waiting for write... Thread-6 waiting for write... Thread-2 reads data: LLLLLLLLLL Thread-2 waiting for read... (運(yùn)行到此不動(dòng)了)
注意到這種死鎖是由于所有線程都在等待別的線程喚醒自己,結(jié)果都無(wú)法醒過(guò)來(lái)。這和兩個(gè)線程希望獲得對(duì)方已有的鎖造成死鎖不同。因此多線程設(shè)計(jì)的難度遠(yuǎn)遠(yuǎn)高于單線程應(yīng)用。
|