[本文是我對Java Concurrency In Practice 7.2的歸納和總結(jié). ?轉(zhuǎn)載請注明作者和出處, ?如有謬誤, 歡迎在評論中指正. ]
以ExecutorService為例, 該類內(nèi)部封裝有多個線程, 類外部無法直接停止這些線程. 相反, 外部調(diào)用Service的shutDown和shutDownNow方法關(guān)閉Service, 而Service負(fù)責(zé)停止其擁有的線程.
大多數(shù)server應(yīng)用會使用到log, 下例中的LogWriter是一個使用生產(chǎn)者消費者模式構(gòu)建的log service, 需要打印log的線程將待打印的內(nèi)容加入到阻塞隊列中, 而logger線程則不斷的從阻塞隊列中取出數(shù)據(jù)輸出:
public class LogWriter { private final BlockingQueue<String> queue; private final LoggerThread logger; public LogWriter(Writer writer) { this.queue = new LinkedBlockingQueue<String>(CAPACITY); this.logger = new LoggerThread(writer); } public void start() { logger.start(); } /** * 需要打印數(shù)據(jù)的線程調(diào)用該方法, 將待打印數(shù)據(jù)加入阻塞隊列 */ public void log(String msg) throws InterruptedException { queue.put(msg); } /** * 負(fù)責(zé)從阻塞隊列中取出數(shù)據(jù)輸出的線程 */ private class LoggerThread extends Thread { private final PrintWriter writer; // ... public void run() { try { while (true) writer.println(queue.take()); } catch (InterruptedException ignored) { } finally { writer.close(); } } } }
LogWriter內(nèi)部封裝有LoggerThread線程, 所以LogWriter是一個基于線程構(gòu)建的Service. 根據(jù)ExecutorService的經(jīng)驗, 我們需要在LogWriter中提供停止LoggerThread線程的方法. 看起來這并不是很難, 我們只需在LogWriter中添加shutDown方法:
/** * 該方法用于停止LoggerThread線程 */ public void shutDown() { logger.interrupt(); }
當(dāng)LogWriter.shutDown方法被調(diào)用時, LoggerThread線程的中斷標(biāo)記被設(shè)置為true, 之后LoggerThread線程執(zhí)行queue.take()方法時會拋出InterruptedException異常, 從而使得LoggerThread線程結(jié)束.
然而這樣的shutDown方法并不是很恰當(dāng):?
1. 丟棄了隊列中尚未來得及輸出的數(shù)據(jù).
2. 更嚴(yán)重的是, 假如線程A對LogWriter.log方法的調(diào)用因為隊列已滿而阻塞, 此時停止LoggerThread線程將導(dǎo)致線程A永遠(yuǎn)阻塞在queue.put方法上.
對上例的改進(jìn):
public class LogWriter { private final BlockingQueue<String> queue; private final LoggerThread loggerThread; private final PrintWriter writer; /** * 表示是否關(guān)閉Service */ private boolean isShutdown; /** * 隊列中待處理數(shù)據(jù)的數(shù)量 */ private int reservations; public void start() { loggerThread.start(); } public void shutDown() { synchronized (this) { isShutdown = true; } loggerThread.interrupt(); } public void log(String msg) throws InterruptedException { synchronized (this) { // service已關(guān)閉后調(diào)用log方法直接拋出異常 if (isShutdown) throw new IllegalStateException("Service has been shut down"); ++reservations; } // BlockingQueue本身就是線程安全的, put方法的調(diào)用不在同步代碼塊中 // 我們只需要保證isShutdown和reservations是線程安全的即可 queue.put(msg); } private class LoggerThread extends Thread { public void run() { try { while (true) { try { synchronized (this) { // 當(dāng)service已關(guān)閉且處理完隊列中的所有數(shù)據(jù)時才跳出while循環(huán) if (isShutdown && reservations == 0) break; } String msg = queue.take(); synchronized (this) { --reservations; } writer.println(msg); } catch (InterruptedException e) { // 發(fā)生InterruptedException異常時不應(yīng)該立刻跳出while循環(huán) // 而應(yīng)該繼續(xù)輸出log, 直到處理完隊列中的所有數(shù)據(jù) } } } finally { writer.close(); } } } }
上面的處理顯得過于復(fù)雜, 利用ExecutorService可以編寫出相對更簡潔的程序:
public class LogService { /** * 創(chuàng)建只包含單個線程的線程池, 提交給該線程池的任務(wù)將以串行的方式逐個運行 * 本例中, 此線程用于執(zhí)行打印log的任務(wù) */ private final ExecutorService exec = Executors.newSingleThreadExecutor(); private final PrintWriter writer; public void start() { } public void shutdown() throws InterruptedException { try { // 關(guān)閉ExecutorService后再調(diào)用其awaitTermination將導(dǎo)致當(dāng)前線程阻塞, 直到所有已提交的任務(wù)執(zhí)行完畢, 或者發(fā)生超時 exec.shutdown(); exec.awaitTermination(TIMEOUT, UNIT); } finally { writer.close(); } } public void log(String msg) { try { // 線程池關(guān)閉后再調(diào)用其execute方法將拋出RejectedExecutionException異常 exec.execute(new WriteTask(msg)); } catch (RejectedExecutionException ignored) { } } private final class WriteTask implements Runnable { private String msg; public WriteTask(String msg) { this.msg = msg; } @Override public void run() { writer.println(msg); } } }