[本文是我對Java Concurrency In Practice 7.2的歸納和總結. ?轉載請注明作者和出處, ?如有謬誤, 歡迎在評論中指正. ]
以ExecutorService為例, 該類內部封裝有多個線程, 類外部無法直接停止這些線程. 相反, 外部調用Service的shutDown和shutDownNow方法關閉Service, 而Service負責停止其擁有的線程.
大多數server應用會使用到log, 下例中的LogWriter是一個使用生產者消費者模式構建的log service, 需要打印log的線程將待打印的內容加入到阻塞隊列中, 而logger線程則不斷的從阻塞隊列中取出數據輸出:
public class LogWriter { private final BlockingQueue<String> queue; private final LoggerThread logger; public LogWriter(Writer writer) { this.queue = new LinkedBlockingQueue<String>(CAPACITY); this.logger = new LoggerThread(writer); } public void start() { logger.start(); } /** * 需要打印數據的線程調用該方法, 將待打印數據加入阻塞隊列 */ public void log(String msg) throws InterruptedException { queue.put(msg); } /** * 負責從阻塞隊列中取出數據輸出的線程 */ private class LoggerThread extends Thread { private final PrintWriter writer; // ... public void run() { try { while (true) writer.println(queue.take()); } catch (InterruptedException ignored) { } finally { writer.close(); } } } }
LogWriter內部封裝有LoggerThread線程, 所以LogWriter是一個基于線程構建的Service. 根據ExecutorService的經驗, 我們需要在LogWriter中提供停止LoggerThread線程的方法. 看起來這并不是很難, 我們只需在LogWriter中添加shutDown方法:
/** * 該方法用于停止LoggerThread線程 */ public void shutDown() { logger.interrupt(); }
當LogWriter.shutDown方法被調用時, LoggerThread線程的中斷標記被設置為true, 之后LoggerThread線程執行queue.take()方法時會拋出InterruptedException異常, 從而使得LoggerThread線程結束.
然而這樣的shutDown方法并不是很恰當:?
1. 丟棄了隊列中尚未來得及輸出的數據.
2. 更嚴重的是, 假如線程A對LogWriter.log方法的調用因為隊列已滿而阻塞, 此時停止LoggerThread線程將導致線程A永遠阻塞在queue.put方法上.
對上例的改進:
public class LogWriter { private final BlockingQueue<String> queue; private final LoggerThread loggerThread; private final PrintWriter writer; /** * 表示是否關閉Service */ private boolean isShutdown; /** * 隊列中待處理數據的數量 */ private int reservations; public void start() { loggerThread.start(); } public void shutDown() { synchronized (this) { isShutdown = true; } loggerThread.interrupt(); } public void log(String msg) throws InterruptedException { synchronized (this) { // service已關閉后調用log方法直接拋出異常 if (isShutdown) throw new IllegalStateException("Service has been shut down"); ++reservations; } // BlockingQueue本身就是線程安全的, put方法的調用不在同步代碼塊中 // 我們只需要保證isShutdown和reservations是線程安全的即可 queue.put(msg); } private class LoggerThread extends Thread { public void run() { try { while (true) { try { synchronized (this) { // 當service已關閉且處理完隊列中的所有數據時才跳出while循環 if (isShutdown && reservations == 0) break; } String msg = queue.take(); synchronized (this) { --reservations; } writer.println(msg); } catch (InterruptedException e) { // 發生InterruptedException異常時不應該立刻跳出while循環 // 而應該繼續輸出log, 直到處理完隊列中的所有數據 } } } finally { writer.close(); } } } }
上面的處理顯得過于復雜, 利用ExecutorService可以編寫出相對更簡潔的程序:
public class LogService { /** * 創建只包含單個線程的線程池, 提交給該線程池的任務將以串行的方式逐個運行 * 本例中, 此線程用于執行打印log的任務 */ private final ExecutorService exec = Executors.newSingleThreadExecutor(); private final PrintWriter writer; public void start() { } public void shutdown() throws InterruptedException { try { // 關閉ExecutorService后再調用其awaitTermination將導致當前線程阻塞, 直到所有已提交的任務執行完畢, 或者發生超時 exec.shutdown(); exec.awaitTermination(TIMEOUT, UNIT); } finally { writer.close(); } } public void log(String msg) { try { // 線程池關閉后再調用其execute方法將拋出RejectedExecutionException異常 exec.execute(new WriteTask(msg)); } catch (RejectedExecutionException ignored) { } } private final class WriteTask implements Runnable { private String msg; public WriteTask(String msg) { this.msg = msg; } @Override public void run() { writer.println(msg); } } }