我的家園

          我的家園

          [本文是我對Java Concurrency In Practice 7.2的歸納和總結. ?轉載請注明作者和出處, ?如有謬誤, 歡迎在評論中指正. ]

          以ExecutorService為例, 該類內部封裝有多個線程, 類外部無法直接停止這些線程. 相反, 外部調用Service的shutDown和shutDownNow方法關閉Service, 而Service負責停止其擁有的線程.

          大多數server應用會使用到log, 下例中的LogWriter是一個使用生產者消費者模式構建的log service, 需要打印log的線程將待打印的內容加入到阻塞隊列中, 而logger線程則不斷的從阻塞隊列中取出數據輸出:

          public class LogWriter {
          	private final BlockingQueue<String> queue;
          	private final LoggerThread logger;
          
          	public LogWriter(Writer writer) {
          		this.queue = new LinkedBlockingQueue<String>(CAPACITY);
          		this.logger = new LoggerThread(writer);
          	}
          
          	public void start() {
          		logger.start();
          	}
          
          	/**
          	 * 需要打印數據的線程調用該方法, 將待打印數據加入阻塞隊列
          	 */
          	public void log(String msg) throws InterruptedException {
          		queue.put(msg);
          	}
          
          	/**
          	 * 負責從阻塞隊列中取出數據輸出的線程
          	 */
          	private class LoggerThread extends Thread {
          		private final PrintWriter writer;
          		// ...
          		public void run() {
          			try {
          				while (true)
          					writer.println(queue.take());
          			} catch (InterruptedException ignored) {
          			} finally {
          				writer.close();
          			}
          		}
          	}
          }

          LogWriter內部封裝有LoggerThread線程, 所以LogWriter是一個基于線程構建的Service. 根據ExecutorService的經驗, 我們需要在LogWriter中提供停止LoggerThread線程的方法. 看起來這并不是很難, 我們只需在LogWriter中添加shutDown方法:

          /**
           * 該方法用于停止LoggerThread線程
           */
          public void shutDown() {
          	logger.interrupt();
          }

          當LogWriter.shutDown方法被調用時, LoggerThread線程的中斷標記被設置為true, 之后LoggerThread線程執行queue.take()方法時會拋出InterruptedException異常, 從而使得LoggerThread線程結束.

          然而這樣的shutDown方法并不是很恰當:?

          1. 丟棄了隊列中尚未來得及輸出的數據.

          2. 更嚴重的是, 假如線程A對LogWriter.log方法的調用因為隊列已滿而阻塞, 此時停止LoggerThread線程將導致線程A永遠阻塞在queue.put方法上.

          對上例的改進:

          public class LogWriter {
          	private final BlockingQueue<String> queue;
          	private final LoggerThread loggerThread;
          	private final PrintWriter writer;
          
          	/**
          	 * 表示是否關閉Service
          	 */
          	private boolean isShutdown;
          	/**
          	 * 隊列中待處理數據的數量
          	 */
          	private int reservations;
          
          	public void start() {
          		loggerThread.start();
          	}
          
          	public void shutDown() {
          		synchronized (this) {
          			isShutdown = true;
          		}
          		loggerThread.interrupt();
          	}
          
          	public void log(String msg) throws InterruptedException {
          		synchronized (this) {
          			// service已關閉后調用log方法直接拋出異常
          			if (isShutdown)
          				throw new IllegalStateException("Service has been shut down");
          			++reservations;
          		}
          		// BlockingQueue本身就是線程安全的, put方法的調用不在同步代碼塊中
          		// 我們只需要保證isShutdown和reservations是線程安全的即可
          		queue.put(msg);
          	}
          
          	private class LoggerThread extends Thread {
          		public void run() {
          			try {
          				while (true) {
          					try {
          						synchronized (this) {
          							// 當service已關閉且處理完隊列中的所有數據時才跳出while循環
          							if (isShutdown && reservations == 0)
          								break;
          						}
          						String msg = queue.take();
          						synchronized (this) {
          							--reservations;
          						}
          						writer.println(msg);
          					} catch (InterruptedException e) {
          						// 發生InterruptedException異常時不應該立刻跳出while循環
          						// 而應該繼續輸出log, 直到處理完隊列中的所有數據
          					}
          				}
          			} finally {
          				writer.close();
          			}
          		}
          	}
          }

          上面的處理顯得過于復雜, 利用ExecutorService可以編寫出相對更簡潔的程序:

          public class LogService {
          	/**
          	 * 創建只包含單個線程的線程池, 提交給該線程池的任務將以串行的方式逐個運行
          	 * 本例中, 此線程用于執行打印log的任務
          	 */
          	private final ExecutorService exec = Executors.newSingleThreadExecutor();
          	private final PrintWriter writer;
          
          	public void start() {
          	}
          
          	public void shutdown() throws InterruptedException {
          		try {
          			// 關閉ExecutorService后再調用其awaitTermination將導致當前線程阻塞, 直到所有已提交的任務執行完畢, 或者發生超時
          			exec.shutdown();
          			exec.awaitTermination(TIMEOUT, UNIT);
          		} finally {
          			writer.close();
          		}
          	}
          
          	public void log(String msg) {
          		try {
          			// 線程池關閉后再調用其execute方法將拋出RejectedExecutionException異常
          			exec.execute(new WriteTask(msg));
          		} catch (RejectedExecutionException ignored) {
          		}
          	}
          	
          	private final class WriteTask implements Runnable {
          		private String msg;
          		
          		public WriteTask(String msg) {
          			this.msg = msg;
          		}
          
          		@Override
          		public void run() {
          			writer.println(msg);
          		}
          	}
          }





          只有注冊用戶登錄后才能發表評論。


          網站導航:
           
          主站蜘蛛池模板: 鄂托克旗| 临汾市| 龙门县| 稷山县| 岱山县| 闻喜县| 东丰县| 和平县| 澳门| 额济纳旗| 吐鲁番市| 东兰县| 新源县| 盖州市| 无棣县| 钦州市| 宝兴县| 遂川县| 湖州市| 壶关县| 辛集市| 牟定县| 抚州市| 深水埗区| 曲麻莱县| 德庆县| 辛集市| 荔浦县| 牙克石市| 喜德县| 内丘县| 漳浦县| 剑阁县| 甘南县| 英德市| 扶风县| 商南县| 达孜县| 元江| 巴楚县| 山西省|