xylz,imxylz

          關注后端架構、中間件、分布式和并發編程

             :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理 ::
            111 隨筆 :: 10 文章 :: 2680 評論 :: 0 Trackbacks

          [本文地址:http://www.aygfsteel.com/Files/xylz/Inside.Java.Concurrency_34.ThreadPool.part7_ThreadPoolExecutor_execute.pdf] 

           

          線程池任務執行流程

          我們從一個API開始接觸Executor是如何處理任務隊列的。

          java.util.concurrent.Executor.execute(Runnable)

          Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread. If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the task is handled by the current RejectedExecutionHandler.

          線程池中所有任務執行都依賴于此接口。這段話有以下幾個意思:

          1. 任務可能在將來某個時刻被執行,有可能不是立即執行。為什么這里有兩個“可能”?繼續往下面看。
          2. 任務可能在一個新的線程中執行或者線程池中存在的一個線程中執行。
          3. 任務無法被提交執行有以下兩個原因:線程池已經關閉或者線程池已經達到了容量限制。
          4. 所有失敗的任務都將被“當前”的任務拒絕策略RejectedExecutionHandler 處理。

          回答上面兩個“可能“。任務可能被執行,那不可能的情況就是上面說的情況3;可能不是立即執行,是因為任務可能還在隊列中排隊,因此還在等待分配線程執行。了解完了字面上的問題,我們再來看具體的實現。

          public void execute(Runnable command) {
             
          if (command == null)
                 
          throw new NullPointerException();
             
          if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
                 
          if (runState == RUNNING && workQueue.offer(command)) {
                     
          if (runState != RUNNING || poolSize == 0)
                          ensureQueuedTaskHandled(command);
                  }
                 
          else if (!addIfUnderMaximumPoolSize(command))
                      reject(command);
          // is shutdown or saturated
              }
          }

          這一段代碼看起來挺簡單的,其實這就是線程池最重要的一部分,如果能夠完全理解這一塊,線程池還是挺容易的。整個執行流程是這樣的:

          1. 如果任務command為空,則拋出空指針異常,返回。否則進行2。
          2. 如果當前線程池大小 大于或等于 核心線程池大小,進行4。否則進行3。
          3. 創建一個新工作隊列(線程,參考上一節),成功直接返回,失敗進行4。
          4. 如果線程池正在運行并且任務加入線程池隊列成功,進行5,否則進行7。
          5. 如果線程池已經關閉或者線程池大小為0,進行6,否則直接返回。
          6. 如果線程池已經關閉則執行拒絕策略返回,否則啟動一個新線程來進行執行任務,返回。
          7. 如果線程池大小 不大于 最大線程池數量,則啟動新線程來進行執行,否則進行拒絕策略,結束。

          文字描述步驟不夠簡單?下面圖形詳細表述了此過程。

          Executor.execute

          老實說這個圖比上面步驟更難以理解,那么從何入手呢。

          流程的入口很簡單,我們就是要執行一個任務(Runnable command),那么它的結束點在哪或者有哪幾個?

          根據左邊這個圖我們知道可能有以下幾種出口:

          (1)圖中的P1、P7,我們根據這條路徑可以看到,僅僅是將任務加入任務隊列(offer(command))了;

          (2)圖中的P3,這條路徑不將任務加入任務隊列,但是啟動了一個新工作線程(Worker)進行掃尾操作,用戶處理為空的任務隊列;

          (3)圖中的P4,這條路徑沒有將任務加入任務隊列,但是啟動了一個新工作線程(Worker),并且工作現場的第一個任務就是當前任務;

          (4)圖中的P5、P6,這條路徑沒有將任務加入任務隊列,也沒有啟動工作線程,僅僅是拋給了任務拒絕策略。P2是任務加入了任務隊列卻因為線程池已經關閉于是又從任務隊列中刪除,并且拋給了拒絕策略。

          如果上面的解釋還不清楚,可以去研究下面兩段代碼:

          java.util.concurrent.ThreadPoolExecutor.addIfUnderCorePoolSize(Runnable)
          java.util.concurrent.ThreadPoolExecutor.addIfUnderMaximumPoolSize(Runnable)
          java.util.concurrent.ThreadPoolExecutor.ensureQueuedTaskHandled(Runnable)

          那么什么時候一個任務被立即執行呢?

          在線程池運行狀態下,如果線程池大小 小于 核心線程池大小或者線程池已滿(任務隊列已滿)并且線程池大小 小于 最大線程池大小(此時線程池大小 大于 核心線程池大小的),用程序描述為:

          runState == RUNNING && ( poolSize < corePoolSize || poolSize < maxnumPoolSize && workQueue.isFull())

          上面的條件就是一個任務能夠被立即執行的條件。

          有了execute的基礎,我們看看ExecutorService中的幾個submit方法的實現。

              public Future<?> submit(Runnable task) {
                 
          if (task == null) throw new NullPointerException();
                  RunnableFuture
          <Object> ftask = newTaskFor(task, null);
                  execute(ftask);
                 
          return ftask;
              }

             
          public <T> Future<T> submit(Runnable task, T result) {
                 
          if (task == null) throw new NullPointerException();
                  RunnableFuture
          <T> ftask = newTaskFor(task, result);
                  execute(ftask);
                 
          return ftask;
              }

             
          public <T> Future<T> submit(Callable<T> task) {
                 
          if (task == null) throw new NullPointerException();
                  RunnableFuture
          <T> ftask = newTaskFor(task);
                  execute(ftask);
                 
          return ftask;
              }

          很簡單,不是么?對于一個線程池來說復雜的地方也就在execute方法的執行流程。在下一節中我們來討論下如何獲取任務的執行結果,也就是Future類的使用和原理。

           



          ©2009-2014 IMXYLZ |求賢若渴
          posted on 2011-02-11 23:48 imxylz 閱讀(11689) 評論(2)  編輯  收藏 所屬分類: J2EEJava Concurrency

          評論

          # re: 深入淺出 Java Concurrency (34): 線程池 part 7 線程池的實現及原理 (2) 2011-02-12 15:47 yeshucheng
          新年伊始,就看看到好東西,呵呵  回復  更多評論
            

          # re: 深入淺出 Java Concurrency (34): 線程池 part 7 線程池的實現及原理 (2) 2011-05-28 17:19 devoleper
          圖中的P3,這條路徑,為什么沒有將任務加入隊列呢?我看程序是加入了的呀?JDK1.5和1.6的實現好像不太一致...  回復  更多評論
            


          ©2009-2014 IMXYLZ
          主站蜘蛛池模板: 洛宁县| 南召县| 株洲市| 维西| 玉林市| 如皋市| 凤阳县| 洱源县| 沧源| 木兰县| 彰武县| 锡林浩特市| 呼图壁县| 台北市| 加查县| 陆川县| 开封市| 南昌县| 杭州市| 贵德县| 临泽县| 湾仔区| 盐边县| 沈阳市| 延庆县| 开江县| 小金县| 如东县| 潮安县| 海门市| 大埔县| 两当县| 乳山市| 会宁县| 东阳市| 鸡东县| 渑池县| 宕昌县| 美姑县| 台江县| 德江县|