我的家園

          我的家園

          task與execution--JCIPC08讀書筆記

          Posted on 2012-04-15 16:26 zljpp 閱讀(99) 評論(0)  編輯  收藏

          [本文是我對Java Concurrency In Practice C08的歸納和總結. ?轉載請注明作者和出處, ?如有謬誤, 歡迎在評論中指正. ]

          task和線程池執行機制之間隱式的耦合

          前面曾提到過, 線程池的應用解耦了task的提交和執行. 事實上, 這有所夸大, 因為不是所有的task都適用于所有的執行機制, 某些task要求在特定的線程池中執行:

          1. 非獨立task, 指的是依賴于其他task的任務.?

          2. 要求在單線程中運行的task. 某些task不是線程安全的, 無法并發運行. Executors.newSingleThreadExecutor()方法返回的線程池只包含單個線程, 提交給該線程池的task將緩存在一個無界隊列中, 線程池中所包含的單個線程將依次從隊列中取出task運行.

          3. 響應時間敏感的task. 某些task要求必須在極短的時間內開始執行, 比如GUI應用中處理用戶點擊操作的task. 假如提交給某一線程池的task既包含long-running task, 也包含響應時間敏感的task, 那么響應時間敏感的task可能無法在極短的時間內得到執行.?

          4. 使用了ThreadLocal類的task. 線程池的標準實現可能會在空閑時銷毀多余的線程, 繁忙時創建更多的線程, 更有可能重用線程. 所以使用了ThreadLocal的task不應該提交給線程池運行, 除非ThreadLocal的使用只限定在單個task內, 不用于多個task之間通信.

          ?

          線程饑餓死鎖

          如果提交給線程池運行的task之間不是相互獨立的, 就有可能出現線程饑餓死鎖. 比如提交給SingleThreadExecutor執行的2個task, task A在執行過程中需要等待task B的執行結果才能繼續, 而此時沒有多余的線程用于執行task B, 如此就發生了線程饑餓死鎖.

          public class StarvationDeadLock {
          	public static void main(String[] args) {
          		final ExecutorService executor = Executors.newSingleThreadExecutor();
          		final Runnable taskB = new Runnable() {
          			@Override
          			public void run() {
          				//...
          			}
          		};
          		Runnable taskA = new Runnable() {
          			@Override
          			public void run() {
          				Future<?> future = executor.submit(taskB);
          				try {
          					System.out.println("waiting for taskB complete");
          					// get方法將阻塞, 直到taskB執行完成
          					// 但是由于線程池中只有一個線程, 而該線程已經被taskA占用, 所以taskB將沒有機會執行. 
          					// 此時就發生了線程饑餓死鎖
          					future.get();
          				} catch (InterruptedException e) {
          					Thread.currentThread().interrupt();
          				} catch (ExecutionException e) {
          					e.printStackTrace();
          				}
          				//...
          			}
          		};
          		executor.submit(taskA);
          	}
          }

          不僅SingleThreadExecutor執行相互依賴的task時會發生死鎖, 其他線程池執行相互依賴的task時也可能發生死鎖:

          public class StarvationDeadLock {
          	public static void main(String[] args) {
          		final ExecutorService executor = Executors.newFixedThreadPool(3);
          		// 設定await在Barrier對象上的線程數達到4個時, 其await方法才釋放
          		final CyclicBarrier barrier = new CyclicBarrier(4);
          		
          		// 重復提交4個task, 每個task都await在barrier對象上
          		// barrier的await方法將一直阻塞, 直到4個線程都到達await點.
          		// 但是線程池中只有3個線程, 不可能出現4個線程都達到await點的情形, 所以依然會發生死鎖
          		for (int i = 0; i < 4; i++) {
          			executor.submit(new Runnable() {
          				@Override
          				public void run() {
          					try {
          						System.out.println("waiting for other tasks arriving at common point");
          						barrier.await();
          					} catch (InterruptedException e) {
          						Thread.currentThread().interrupt();
          					} catch (BrokenBarrierException e) {
          						e.printStackTrace();
          					}
          				}
          			});
          		}
          	}
          }?

          避免相互依賴的task提交給同一線程池執行時發生死鎖的唯一方法是: 線程池中的線程足夠多.?

          ?

          確定線程池的size

          如果線程池的size過大, 將造成內存等資源的浪費, 甚至使得資源耗盡. 如果線程池的size過小, 將造成CPU的利用率不高. 確定合適的size需要考慮:CPU數, 內存, 是計算密集型task還是I/O密集型task, 是否需要獲取稀缺資源(比如數據庫連接)等.

          對于計算密集型task, 合適的size大約為CPU數量+1. 對于I/O占較大比例的task, 合適的size可以通過以下公式確定: size = CPU數量 * CPU利用率 * (1 + I/O時間比例). Runtime.getRuntime().availableProcessors()返回CPU的個數.

          當然, 實際開發中size還受到內存, 文件句柄, socket, 數據庫連接數等稀缺資源的約束. 將總的稀缺資源除以每一個task使用的資源數, 能得到線程數的上限.?

          ?

          循環并行化

          如果循環體所進行的操作是相互獨立的, 這樣的循環可以并發的運行:

          // 循環操作
          void processSequentially(List<Element> elements) {
          	for (Element e : elements)
          		process(e);
          }
          
          // 將相互獨立的循環操作轉變為并發操作
          void processInParallel(Executor exec, List<Element> elements) {
          	for (final Element e : elements) {
          		exec.execute(new Runnable() {
          			public void run() {
          				process(e);
          			}
          		});
          	}
          	exec.shutdown(); 
          	exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
          }?

          如果希望同時提交一系列task, 并且等待它們執行完畢, 可以調用ExecutorService.invokeAll方法.

          如果希望task執行完畢之后就獲取其執行結果, 可以使用CompletionService.

          ?






          只有注冊用戶登錄后才能發表評論。


          網站導航:
           
          主站蜘蛛池模板: 化德县| 柞水县| 衡阳市| 缙云县| 灵宝市| 巢湖市| 桦川县| 武陟县| 耿马| 涿鹿县| 鸡西市| 尚志市| 建瓯市| 汝州市| 尼木县| 平塘县| 水富县| 兴宁市| 桃源县| 湖北省| 京山县| 嘉鱼县| 洪洞县| 宁强县| 夏邑县| 亳州市| 永靖县| 茌平县| 土默特左旗| 茶陵县| 涞源县| 信宜市| 文安县| 宜丰县| 略阳县| 延庆县| 商都县| 徐汇区| 松阳县| 会同县| 韶山市|