xylz,imxylz

          關(guān)注后端架構(gòu)、中間件、分布式和并發(fā)編程

             :: 首頁 :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理 ::
            111 隨筆 :: 10 文章 :: 2680 評(píng)論 :: 0 Trackbacks

          [本文PDF地址:http://www.aygfsteel.com/Files/xylz/Inside.Java.Concurrency_31.ThreadPool.part4_RejectedPolicy.pdf]

          上一節(jié)中提到關(guān)閉線程池過程中需要對(duì)新提交的任務(wù)進(jìn)行處理。這個(gè)是java.util.concurrent.RejectedExecutionHandler處理的邏輯。

           

          在沒有分析線程池原理之前先來分析下為什么有任務(wù)拒絕的情況發(fā)生。

          這里先假設(shè)一個(gè)前提:線程池有一個(gè)任務(wù)隊(duì)列,用于緩存所有待處理的任務(wù),正在處理的任務(wù)將從任務(wù)隊(duì)列中移除。因此在任務(wù)隊(duì)列長(zhǎng)度有限的情況下就會(huì)出現(xiàn)新任務(wù)的拒絕處理問題,需要有一種策略來處理應(yīng)該加入任務(wù)隊(duì)列卻因?yàn)殛?duì)列已滿無法加入的情況。另外在線程池關(guān)閉的時(shí)候也需要對(duì)任務(wù)加入隊(duì)列操作進(jìn)行額外的協(xié)調(diào)處理。

           

          RejectedExecutionHandler提供了四種方式來處理任務(wù)拒絕策略。

          RejectedExecutionHandler

          RejectedExecutionHandler-class

          這四種策略是獨(dú)立無關(guān)的,是對(duì)任務(wù)拒絕處理的四中表現(xiàn)形式。最簡(jiǎn)單的方式就是直接丟棄任務(wù)。但是卻有兩種方式,到底是該丟棄哪一個(gè)任務(wù),比如可以丟棄當(dāng)前將要加入隊(duì)列的任務(wù)本身(DiscardPolicy)或者丟棄任務(wù)隊(duì)列中最舊任務(wù)(DiscardOldestPolicy)。丟棄最舊任務(wù)也不是簡(jiǎn)單的丟棄最舊的任務(wù),而是有一些額外的處理。除了丟棄任務(wù)還可以直接拋出一個(gè)異常(RejectedExecutionException),這是比較簡(jiǎn)單的方式。拋出異常的方式(AbortPolicy)盡管實(shí)現(xiàn)方式比較簡(jiǎn)單,但是由于拋出一個(gè)RuntimeException,因此會(huì)中斷調(diào)用者的處理過程。除了拋出異常以外還可以不進(jìn)入線程池執(zhí)行,在這種方式(CallerRunsPolicy)中任務(wù)將有調(diào)用者線程去執(zhí)行。

           

          上面是一些理論知識(shí),下面結(jié)合一些例子進(jìn)行分析討論。

          package xylz.study.concurrency;

          import java.lang.reflect.Field;
          import java.util.concurrent.ArrayBlockingQueue;
          import java.util.concurrent.ThreadPoolExecutor;
          import java.util.concurrent.TimeUnit;
          import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
          import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy;

          public class ExecutorServiceDemo {

             
          static void log(String msg) {
                  System.out.println(System.currentTimeMillis()
          + " -> " + msg);
              }

             
          static int getThreadPoolRunState(ThreadPoolExecutor pool) throws Exception {
                  Field f
          = ThreadPoolExecutor.class.getDeclaredField("runState");
                  f.setAccessible(
          true);
                 
          int v = f.getInt(pool);
                 
          return v;
              }

             
          public static void main(String[] args) throws Exception {

                  ThreadPoolExecutor pool
          = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                         
          new ArrayBlockingQueue<Runnable>(1));
                  pool.setRejectedExecutionHandler(
          new ThreadPoolExecutor.DiscardPolicy());
                 
          for (int i = 0; i < 10; i++) {
                     
          final int index = i;
                      pool.submit(
          new Runnable() {

                         
          public void run() {
                              log(
          "run task:" + index + " -> " + Thread.currentThread().getName());
                             
          try {
                                  Thread.sleep(
          1000L);
                              }
          catch (Exception e) {
                                  e.printStackTrace();
                              }
                              log(
          "run over:" + index + " -> " + Thread.currentThread().getName());
                          }
                      });
                  }
                  log(
          "before sleep");
                  Thread.sleep(
          4000L);
                  log(
          "before shutdown()");
                  pool.shutdown();
                  log(
          "after shutdown(),pool.isTerminated=" + pool.isTerminated());
                  pool.awaitTermination(
          1000L, TimeUnit.SECONDS);
                  log(
          "now,pool.isTerminated=" + pool.isTerminated() + ", state="
                         
          + getThreadPoolRunState(pool));
              }

          }

           


          第一種方式直接丟棄(DiscardPolicy)的輸出結(jié)果是:

          1294494050696 -> run task:0
          1294494050696 -> before sleep
          1294494051697 -> run over:0 -> pool-1-thread-1
          1294494051697 -> run task:1
          1294494052697 -> run over:1 -> pool-1-thread-1
          1294494054697 -> before shutdown()
          1294494054697 -> after shutdown(),pool.isTerminated=false
          1294494054698 -> now,pool.isTerminated=true, state=3

           

          對(duì)于上面的結(jié)果需要補(bǔ)充幾點(diǎn)。

          1. 線程池設(shè)定線程大小為1,因此輸出的線程就只有一個(gè)”pool-1-thread-1”,至于為什么是這個(gè)名稱,以后會(huì)分析。
          2. 任務(wù)隊(duì)列的大小為1,因此可以輸出一個(gè)任務(wù)執(zhí)行結(jié)果。但是由于線程本身可以帶有一個(gè)任務(wù),因此實(shí)際上一共執(zhí)行了兩個(gè)任務(wù)(task0和task1)。
          3. shutdown()一個(gè)線程并不能理解是線程運(yùn)行狀態(tài)位terminated,可能需要稍微等待一點(diǎn)時(shí)間。盡管這里等待時(shí)間參數(shù)是1000秒,但是實(shí)際上從輸出時(shí)間來看僅僅等了約1ms。
          4. 直接丟棄任務(wù)是丟棄將要進(jìn)入線程池本身的任務(wù),所以當(dāng)運(yùn)行task0是,task1進(jìn)入任務(wù)隊(duì)列,task2~task9都被直接丟棄了,沒有運(yùn)行。

          如果把策略換成丟棄最舊任務(wù)(DiscardOldestPolicy),結(jié)果會(huì)稍有不同。

          1294494484622 -> run task:0
          1294494484622 -> before sleep
          1294494485622 -> run over:0 -> pool-1-thread-1
          1294494485622 -> run task:9
          1294494486622 -> run over:9 -> pool-1-thread-1
          1294494488622 -> before shutdown()
          1294494488622 -> after shutdown(),pool.isTerminated=false
          1294494488623 -> now,pool.isTerminated=true, state=3

           

          這里依然只是執(zhí)行兩個(gè)任務(wù),但是換成了任務(wù)task0和task9。實(shí)際上task1~task8還是進(jìn)入了任務(wù)隊(duì)列,只不過被task9擠出去了。

          對(duì)于異常策略(AbortPolicy)就比較簡(jiǎn)單,這回調(diào)用線程的任務(wù)執(zhí)行。

          對(duì)于調(diào)用線程執(zhí)行方式(CallerRunsPolicy),輸出的結(jié)果就有意思了。

          1294496076266 -> run task:2 -> main
          1294496076266 -> run task:0 -> pool-1-thread-1
          1294496077266 -> run over:0 -> pool-1-thread-1
          1294496077266 -> run task:1 -> pool-1-thread-1
          1294496077266 -> run over:2 -> main
          1294496077266 -> run task:4 -> main
          1294496078267 -> run over:4 -> main
          1294496078267 -> run task:5 -> main
          1294496078267 -> run over:1 -> pool-1-thread-1
          1294496078267 -> run task:3 -> pool-1-thread-1
          1294496079267 -> run over:3 -> pool-1-thread-1
          1294496079267 -> run over:5 -> main
          1294496079267 -> run task:7 -> main
          1294496079267 -> run task:6 -> pool-1-thread-1
          1294496080267 -> run over:7 -> main
          1294496080267 -> run task:9 -> main
          1294496080267 -> run over:6 -> pool-1-thread-1
          1294496080267 -> run task:8 -> pool-1-thread-1
          1294496081268 -> run over:9 -> main
          1294496081268 -> before sleep
          1294496081268 -> run over:8 -> pool-1-thread-1
          1294496085268 -> before shutdown()
          1294496085268 -> after shutdown(),pool.isTerminated=false
          1294496085269 -> now,pool.isTerminated=true, state=3

           

          由于啟動(dòng)線程有稍微的延時(shí),因此一種可能的執(zhí)行順序是這樣的。

          RejectedPolicy_CallerRunsPolicy

          1. 首先pool-1-thread-1線程執(zhí)行task0,同時(shí)將task1加入任務(wù)隊(duì)列(submit(task1))。
          2. 對(duì)于task2,由于任務(wù)隊(duì)列已經(jīng)滿了,因此有調(diào)用線程main執(zhí)行(execute(task2))。
          3. 在mian等待task2任務(wù)執(zhí)行完畢,對(duì)于任務(wù)task3,由于此時(shí)任務(wù)隊(duì)列已經(jīng)空了,因此task3將進(jìn)入任務(wù)隊(duì)列。
          4. 此時(shí)main線程是空閑的,因此對(duì)于task4將由main線程執(zhí)行。此時(shí)pool-1-thread-1線程可能在執(zhí)行任務(wù)task1。任務(wù)隊(duì)列中依然有任務(wù)task3。
          5. 因此main線程執(zhí)行完畢task4后就立即執(zhí)行task5。
          6. 很顯然task1執(zhí)行完畢,task3被線程池執(zhí)行,因此task6進(jìn)入任務(wù)隊(duì)列。此時(shí)task7被main線程執(zhí)行。
          7. task6開始執(zhí)行時(shí),task8進(jìn)入任務(wù)隊(duì)列。main線程開始執(zhí)行task9。
          8. 然后線程池執(zhí)行線程task8結(jié)束。
          9. 整個(gè)任務(wù)隊(duì)列執(zhí)行完畢,線程池完畢。

           

          如果有興趣可以看看ThreadPoolExecutor中四種RejectedExecutionHandler的源碼,都非常簡(jiǎn)單。

           



          ©2009-2014 IMXYLZ |求賢若渴
          posted on 2011-01-08 22:47 imxylz 閱讀(9986) 評(píng)論(0)  編輯  收藏 所屬分類: Java Concurrency

          ©2009-2014 IMXYLZ
          主站蜘蛛池模板: 靖远县| 紫金县| 咸阳市| 万安县| 太和县| 蛟河市| 尼玛县| 鄂托克前旗| 宁远县| 定日县| 宁波市| 江北区| 永济市| 松溪县| 丰都县| 兴业县| 五家渠市| 龙南县| 鄄城县| 恩施市| 太湖县| 绍兴市| 安溪县| 湖州市| 彭水| 恩施市| 竹北市| 合川市| 阳城县| 西畴县| 福鼎市| 潢川县| 淮南市| 银川市| 安化县| 尼木县| 英吉沙县| 瑞安市| 莆田市| 龙口市| 宜宾县|