xylz,imxylz

          關注后端架構、中間件、分布式和并發編程

             :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理 ::
            111 隨筆 :: 10 文章 :: 2680 評論 :: 0 Trackbacks

          [本文PDF地址:http://www.aygfsteel.com/Files/xylz/Inside.Java.Concurrency_31.ThreadPool.part4_RejectedPolicy.pdf]

          上一節中提到關閉線程池過程中需要對新提交的任務進行處理。這個是java.util.concurrent.RejectedExecutionHandler處理的邏輯。

           

          在沒有分析線程池原理之前先來分析下為什么有任務拒絕的情況發生。

          這里先假設一個前提:線程池有一個任務隊列,用于緩存所有待處理的任務,正在處理的任務將從任務隊列中移除。因此在任務隊列長度有限的情況下就會出現新任務的拒絕處理問題,需要有一種策略來處理應該加入任務隊列卻因為隊列已滿無法加入的情況。另外在線程池關閉的時候也需要對任務加入隊列操作進行額外的協調處理。

           

          RejectedExecutionHandler提供了四種方式來處理任務拒絕策略。

          RejectedExecutionHandler

          RejectedExecutionHandler-class

          這四種策略是獨立無關的,是對任務拒絕處理的四中表現形式。最簡單的方式就是直接丟棄任務。但是卻有兩種方式,到底是該丟棄哪一個任務,比如可以丟棄當前將要加入隊列的任務本身(DiscardPolicy)或者丟棄任務隊列中最舊任務(DiscardOldestPolicy)。丟棄最舊任務也不是簡單的丟棄最舊的任務,而是有一些額外的處理。除了丟棄任務還可以直接拋出一個異常(RejectedExecutionException),這是比較簡單的方式。拋出異常的方式(AbortPolicy)盡管實現方式比較簡單,但是由于拋出一個RuntimeException,因此會中斷調用者的處理過程。除了拋出異常以外還可以不進入線程池執行,在這種方式(CallerRunsPolicy)中任務將有調用者線程去執行。

           

          上面是一些理論知識,下面結合一些例子進行分析討論。

          package xylz.study.concurrency;

          import java.lang.reflect.Field;
          import java.util.concurrent.ArrayBlockingQueue;
          import java.util.concurrent.ThreadPoolExecutor;
          import java.util.concurrent.TimeUnit;
          import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
          import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy;

          public class ExecutorServiceDemo {

             
          static void log(String msg) {
                  System.out.println(System.currentTimeMillis()
          + " -> " + msg);
              }

             
          static int getThreadPoolRunState(ThreadPoolExecutor pool) throws Exception {
                  Field f
          = ThreadPoolExecutor.class.getDeclaredField("runState");
                  f.setAccessible(
          true);
                 
          int v = f.getInt(pool);
                 
          return v;
              }

             
          public static void main(String[] args) throws Exception {

                  ThreadPoolExecutor pool
          = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                         
          new ArrayBlockingQueue<Runnable>(1));
                  pool.setRejectedExecutionHandler(
          new ThreadPoolExecutor.DiscardPolicy());
                 
          for (int i = 0; i < 10; i++) {
                     
          final int index = i;
                      pool.submit(
          new Runnable() {

                         
          public void run() {
                              log(
          "run task:" + index + " -> " + Thread.currentThread().getName());
                             
          try {
                                  Thread.sleep(
          1000L);
                              }
          catch (Exception e) {
                                  e.printStackTrace();
                              }
                              log(
          "run over:" + index + " -> " + Thread.currentThread().getName());
                          }
                      });
                  }
                  log(
          "before sleep");
                  Thread.sleep(
          4000L);
                  log(
          "before shutdown()");
                  pool.shutdown();
                  log(
          "after shutdown(),pool.isTerminated=" + pool.isTerminated());
                  pool.awaitTermination(
          1000L, TimeUnit.SECONDS);
                  log(
          "now,pool.isTerminated=" + pool.isTerminated() + ", state="
                         
          + getThreadPoolRunState(pool));
              }

          }

           


          第一種方式直接丟棄(DiscardPolicy)的輸出結果是:

          1294494050696 -> run task:0
          1294494050696 -> before sleep
          1294494051697 -> run over:0 -> pool-1-thread-1
          1294494051697 -> run task:1
          1294494052697 -> run over:1 -> pool-1-thread-1
          1294494054697 -> before shutdown()
          1294494054697 -> after shutdown(),pool.isTerminated=false
          1294494054698 -> now,pool.isTerminated=true, state=3

           

          對于上面的結果需要補充幾點。

          1. 線程池設定線程大小為1,因此輸出的線程就只有一個”pool-1-thread-1”,至于為什么是這個名稱,以后會分析。
          2. 任務隊列的大小為1,因此可以輸出一個任務執行結果。但是由于線程本身可以帶有一個任務,因此實際上一共執行了兩個任務(task0和task1)。
          3. shutdown()一個線程并不能理解是線程運行狀態位terminated,可能需要稍微等待一點時間。盡管這里等待時間參數是1000秒,但是實際上從輸出時間來看僅僅等了約1ms。
          4. 直接丟棄任務是丟棄將要進入線程池本身的任務,所以當運行task0是,task1進入任務隊列,task2~task9都被直接丟棄了,沒有運行。

          如果把策略換成丟棄最舊任務(DiscardOldestPolicy),結果會稍有不同。

          1294494484622 -> run task:0
          1294494484622 -> before sleep
          1294494485622 -> run over:0 -> pool-1-thread-1
          1294494485622 -> run task:9
          1294494486622 -> run over:9 -> pool-1-thread-1
          1294494488622 -> before shutdown()
          1294494488622 -> after shutdown(),pool.isTerminated=false
          1294494488623 -> now,pool.isTerminated=true, state=3

           

          這里依然只是執行兩個任務,但是換成了任務task0和task9。實際上task1~task8還是進入了任務隊列,只不過被task9擠出去了。

          對于異常策略(AbortPolicy)就比較簡單,這回調用線程的任務執行。

          對于調用線程執行方式(CallerRunsPolicy),輸出的結果就有意思了。

          1294496076266 -> run task:2 -> main
          1294496076266 -> run task:0 -> pool-1-thread-1
          1294496077266 -> run over:0 -> pool-1-thread-1
          1294496077266 -> run task:1 -> pool-1-thread-1
          1294496077266 -> run over:2 -> main
          1294496077266 -> run task:4 -> main
          1294496078267 -> run over:4 -> main
          1294496078267 -> run task:5 -> main
          1294496078267 -> run over:1 -> pool-1-thread-1
          1294496078267 -> run task:3 -> pool-1-thread-1
          1294496079267 -> run over:3 -> pool-1-thread-1
          1294496079267 -> run over:5 -> main
          1294496079267 -> run task:7 -> main
          1294496079267 -> run task:6 -> pool-1-thread-1
          1294496080267 -> run over:7 -> main
          1294496080267 -> run task:9 -> main
          1294496080267 -> run over:6 -> pool-1-thread-1
          1294496080267 -> run task:8 -> pool-1-thread-1
          1294496081268 -> run over:9 -> main
          1294496081268 -> before sleep
          1294496081268 -> run over:8 -> pool-1-thread-1
          1294496085268 -> before shutdown()
          1294496085268 -> after shutdown(),pool.isTerminated=false
          1294496085269 -> now,pool.isTerminated=true, state=3

           

          由于啟動線程有稍微的延時,因此一種可能的執行順序是這樣的。

          RejectedPolicy_CallerRunsPolicy

          1. 首先pool-1-thread-1線程執行task0,同時將task1加入任務隊列(submit(task1))。
          2. 對于task2,由于任務隊列已經滿了,因此有調用線程main執行(execute(task2))。
          3. 在mian等待task2任務執行完畢,對于任務task3,由于此時任務隊列已經空了,因此task3將進入任務隊列。
          4. 此時main線程是空閑的,因此對于task4將由main線程執行。此時pool-1-thread-1線程可能在執行任務task1。任務隊列中依然有任務task3。
          5. 因此main線程執行完畢task4后就立即執行task5。
          6. 很顯然task1執行完畢,task3被線程池執行,因此task6進入任務隊列。此時task7被main線程執行。
          7. task6開始執行時,task8進入任務隊列。main線程開始執行task9。
          8. 然后線程池執行線程task8結束。
          9. 整個任務隊列執行完畢,線程池完畢。

           

          如果有興趣可以看看ThreadPoolExecutor中四種RejectedExecutionHandler的源碼,都非常簡單。

           



          ©2009-2014 IMXYLZ |求賢若渴
          posted on 2011-01-08 22:47 imxylz 閱讀(9985) 評論(0)  編輯  收藏 所屬分類: Java Concurrency

          ©2009-2014 IMXYLZ
          主站蜘蛛池模板: 东丽区| 西城区| 溆浦县| 甘孜| 桐庐县| 阜城县| 定远县| 西昌市| 东平县| 新乡市| 益阳市| 石嘴山市| 张家界市| 伊金霍洛旗| 长汀县| 长岛县| 霍山县| 阿拉善右旗| 莆田市| 会昌县| 宜兴市| 和平区| 华蓥市| 鄄城县| 南通市| 双流县| 盘锦市| 合江县| 平陆县| 东方市| 大石桥市| 南川市| 牙克石市| 全州县| 长治县| 栖霞市| 乡城县| 南雄市| 城固县| 敦煌市| 汉中市|