我的家園

          我的家園

          [本文是我對Java Concurrency In Practice 7.2的歸納和總結(jié). ?轉(zhuǎn)載請注明作者和出處, ?如有謬誤, 歡迎在評論中指正. ]

          以ExecutorService為例, 該類內(nèi)部封裝有多個線程, 類外部無法直接停止這些線程. 相反, 外部調(diào)用Service的shutDown和shutDownNow方法關(guān)閉Service, 而Service負(fù)責(zé)停止其擁有的線程.

          大多數(shù)server應(yīng)用會使用到log, 下例中的LogWriter是一個使用生產(chǎn)者消費者模式構(gòu)建的log service, 需要打印log的線程將待打印的內(nèi)容加入到阻塞隊列中, 而logger線程則不斷的從阻塞隊列中取出數(shù)據(jù)輸出:

          public class LogWriter {
          	private final BlockingQueue<String> queue;
          	private final LoggerThread logger;
          
          	public LogWriter(Writer writer) {
          		this.queue = new LinkedBlockingQueue<String>(CAPACITY);
          		this.logger = new LoggerThread(writer);
          	}
          
          	public void start() {
          		logger.start();
          	}
          
          	/**
          	 * 需要打印數(shù)據(jù)的線程調(diào)用該方法, 將待打印數(shù)據(jù)加入阻塞隊列
          	 */
          	public void log(String msg) throws InterruptedException {
          		queue.put(msg);
          	}
          
          	/**
          	 * 負(fù)責(zé)從阻塞隊列中取出數(shù)據(jù)輸出的線程
          	 */
          	private class LoggerThread extends Thread {
          		private final PrintWriter writer;
          		// ...
          		public void run() {
          			try {
          				while (true)
          					writer.println(queue.take());
          			} catch (InterruptedException ignored) {
          			} finally {
          				writer.close();
          			}
          		}
          	}
          }

          LogWriter內(nèi)部封裝有LoggerThread線程, 所以LogWriter是一個基于線程構(gòu)建的Service. 根據(jù)ExecutorService的經(jīng)驗, 我們需要在LogWriter中提供停止LoggerThread線程的方法. 看起來這并不是很難, 我們只需在LogWriter中添加shutDown方法:

          /**
           * 該方法用于停止LoggerThread線程
           */
          public void shutDown() {
          	logger.interrupt();
          }

          當(dāng)LogWriter.shutDown方法被調(diào)用時, LoggerThread線程的中斷標(biāo)記被設(shè)置為true, 之后LoggerThread線程執(zhí)行queue.take()方法時會拋出InterruptedException異常, 從而使得LoggerThread線程結(jié)束.

          然而這樣的shutDown方法并不是很恰當(dāng):?

          1. 丟棄了隊列中尚未來得及輸出的數(shù)據(jù).

          2. 更嚴(yán)重的是, 假如線程A對LogWriter.log方法的調(diào)用因為隊列已滿而阻塞, 此時停止LoggerThread線程將導(dǎo)致線程A永遠(yuǎn)阻塞在queue.put方法上.

          對上例的改進(jìn):

          public class LogWriter {
          	private final BlockingQueue<String> queue;
          	private final LoggerThread loggerThread;
          	private final PrintWriter writer;
          
          	/**
          	 * 表示是否關(guān)閉Service
          	 */
          	private boolean isShutdown;
          	/**
          	 * 隊列中待處理數(shù)據(jù)的數(shù)量
          	 */
          	private int reservations;
          
          	public void start() {
          		loggerThread.start();
          	}
          
          	public void shutDown() {
          		synchronized (this) {
          			isShutdown = true;
          		}
          		loggerThread.interrupt();
          	}
          
          	public void log(String msg) throws InterruptedException {
          		synchronized (this) {
          			// service已關(guān)閉后調(diào)用log方法直接拋出異常
          			if (isShutdown)
          				throw new IllegalStateException("Service has been shut down");
          			++reservations;
          		}
          		// BlockingQueue本身就是線程安全的, put方法的調(diào)用不在同步代碼塊中
          		// 我們只需要保證isShutdown和reservations是線程安全的即可
          		queue.put(msg);
          	}
          
          	private class LoggerThread extends Thread {
          		public void run() {
          			try {
          				while (true) {
          					try {
          						synchronized (this) {
          							// 當(dāng)service已關(guān)閉且處理完隊列中的所有數(shù)據(jù)時才跳出while循環(huán)
          							if (isShutdown && reservations == 0)
          								break;
          						}
          						String msg = queue.take();
          						synchronized (this) {
          							--reservations;
          						}
          						writer.println(msg);
          					} catch (InterruptedException e) {
          						// 發(fā)生InterruptedException異常時不應(yīng)該立刻跳出while循環(huán)
          						// 而應(yīng)該繼續(xù)輸出log, 直到處理完隊列中的所有數(shù)據(jù)
          					}
          				}
          			} finally {
          				writer.close();
          			}
          		}
          	}
          }

          上面的處理顯得過于復(fù)雜, 利用ExecutorService可以編寫出相對更簡潔的程序:

          public class LogService {
          	/**
          	 * 創(chuàng)建只包含單個線程的線程池, 提交給該線程池的任務(wù)將以串行的方式逐個運行
          	 * 本例中, 此線程用于執(zhí)行打印log的任務(wù)
          	 */
          	private final ExecutorService exec = Executors.newSingleThreadExecutor();
          	private final PrintWriter writer;
          
          	public void start() {
          	}
          
          	public void shutdown() throws InterruptedException {
          		try {
          			// 關(guān)閉ExecutorService后再調(diào)用其awaitTermination將導(dǎo)致當(dāng)前線程阻塞, 直到所有已提交的任務(wù)執(zhí)行完畢, 或者發(fā)生超時
          			exec.shutdown();
          			exec.awaitTermination(TIMEOUT, UNIT);
          		} finally {
          			writer.close();
          		}
          	}
          
          	public void log(String msg) {
          		try {
          			// 線程池關(guān)閉后再調(diào)用其execute方法將拋出RejectedExecutionException異常
          			exec.execute(new WriteTask(msg));
          		} catch (RejectedExecutionException ignored) {
          		}
          	}
          	
          	private final class WriteTask implements Runnable {
          		private String msg;
          		
          		public WriteTask(String msg) {
          			this.msg = msg;
          		}
          
          		@Override
          		public void run() {
          			writer.println(msg);
          		}
          	}
          }





          只有注冊用戶登錄后才能發(fā)表評論。


          網(wǎng)站導(dǎo)航:
           
          主站蜘蛛池模板: 邻水| 顺昌县| 湛江市| 洛阳市| 化州市| 海安县| 龙州县| 桃园市| 汝城县| 保德县| 都匀市| 承德市| 志丹县| 皋兰县| 平谷区| 交口县| 凉山| 南陵县| 博野县| 合水县| 新余市| 凌海市| 古田县| 和平区| 来凤县| 嘉善县| 贺兰县| 临颍县| 满洲里市| 堆龙德庆县| 温宿县| 东莞市| 长丰县| 嘉义县| 溆浦县| 西乌| 偏关县| 龙泉市| 正安县| 金溪县| 海口市|