隨筆-7  評論-23  文章-0  trackbacks-0
           1.       ExecutorService

           
          Java1.5開始正式提供了并發包,而這個并發包里面除了原子變量,synchronizer,并發容器,另外一個非常重要的特性就是線程池.對于線程池的意義,我們這邊不再多說.

          上圖是線程池的主體類圖,ThreadPoolExecutor是應用最為廣泛的一個線程池實現(我也將在接下來的文字中詳細描述我對這個類的理解和執行機制),ScheduledThreadPoolExecutor則在ThreadPoolExecutor上提供了定時執行的等附加功能,這個可以從ScheduledExecutorService接口的定義中看出來.Executors則類似工廠方法,提供了幾個非常常用的線程池初始化方法.

          ThreadPoolExecutor

          這個類繼承了AbstractExecutorService抽象類, AbstractExecutorService主要的職責有2部分,一部分定義和實現提交任務的方法(3submit方法的實現) ,實例化FutureTask并且交給子類執行,另外一部分實現invokeAny,invokeAll方法.留給子類的方法為execute方法,也就是Executor接口定義的方法.

          //實例化一個FutureTask,交給子類的execute方法執行.這種設計能夠保證callable和runnable的執行接口方法的一致性(FutureTask包裝了這個差別)
          public <T> Future<T> submit(Runnable task, T result) {
              
          if (task == nullthrow new NullPointerException();
              RunnableFuture
          <T> ftask = newTaskFor(task, result);
              execute(ftask);
              
          return ftask;
          }


          protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
              
          return new FutureTask<T>(runnable, value);
          }
          關于FutureTask這個類的實現,我在前面的JAVA LOCK代碼淺析有講過其實現原理,主要的思想就是關注任務完成與未完成的狀態,任務提交線程get()結果時被park,等待任務執行完成被喚醒,任務執行線程在任務執行完畢后設置結果,并且unpark對應線程并且讓其得到執行結果.

          回到ThreadPoolExecutor.ThreadPoolExecutor需要實現除了我們剛才說的execute(Runnable command)方法外,還得實現ExecutorService接口定義的部分方法.ThreadPoolExecutor所提供的不光是這些,以下根據我的理解來列一下它所具有的特性
          1.       execute流程
          2.      
          3.       工作隊列
          4.       飽和拒絕策略
          5.       線程工廠
          6.       beforeExecuteafterExecute擴展

          execute方法的實現有個機制非常重要,當當前線程池線程數量小于corePoolSize,那么生成一個新的worker并把提交的任務置為這個工作線程的頭一個執行任務,如果大于corePoolSize,那么會試著將提交的任務塞到workQueue里面供線程池里面的worker稍后執行,并不是直接再起一個worker,但是當workQueue也滿,并且當前線程池小于maxPoolSize,那么起一個新的worker并將該任務設為該worker執行的第一個任務執行,大于maxPoolSize,workQueue也滿負荷,那么調用飽和策略里面的行為.

          worker線程在執行完一個任務之后并不會立刻關閉,而是嘗試著去workQueue里面取任務,如果取不到,根據策略關閉或者保持空閑狀態.所以submit任務的時候,提交的順序為核心線程池------工作隊列------擴展線程池.

          池包括核心池
          ,擴展池(2者的線程在同一個hashset中,這里只是為了方便才這么稱呼,并不是分離的),核心池在池內worker沒有用完的情況下,只要有任務提交都會創建新的線程,其代表線程池正常處理任務的能力.擴展池,是在核心線程池用完,并且工作隊列也已排滿任務的情況下才會開始初始化線程,其代表的是線程池超出正常負載時的解決方案,一旦任務完成,并且試圖從workQueue取不到任務,那么會比較當前線程池與核心線程池的大小,大于核心線程池數的worker將被銷毀.
          Runnable getTask() {
              
          for (;;) {
                  
          try {
                      
          int state = runState;
                      
          //>SHUTDOWN就是STOP或者TERMINATED
                      
          //直接返回
                      if (state > SHUTDOWN)
                          
          return null;
                      Runnable r;
                      
          //如果是SHUTDOWN狀態,那么取任務,如果有
                        
          //將剩余任務執行完畢,否則就結束了
                      if (state == SHUTDOWN)  // Help drain queue
                          r = workQueue.poll();
                      
          //如果不是以上狀態的(也就是RUNNING狀態的),那么如果當前池大于核心池數量,
                      
          //或者允許核心線程池取任務超時就可以關閉,那么從任務隊列取任務,
                      
          //如果超出keepAliveTime,那么就返回null了,也就意味著這個worker結束了
                      else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                          r 
          = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                      
          //如果當前池小于核心池,并且不允許核心線程池取任務超時就關閉,那么take(),直到拿到任務或者被interrupt
                      else
                          r 
          = workQueue.take();
                      
          //如果經過以上判定,任務不為空,那么返回任務
                      if (r != null)
                          
          return r;
                      
          //如果取到任務為空,那么判定是否可以退出
                      if (workerCanExit()) {
                          
          //如果整個線程池狀態變為SHUTDOWN或者TERMINATED,那么將所有worker interrupt (如果正在執行,那繼續讓其執行)
                          if (runState >= SHUTDOWN) // Wake up others
                              interruptIdleWorkers();
                          
          return null;
                      }

                      
          // Else retry
                  }
           catch (InterruptedException ie) {
                      
          // On interruption, re-check runState
                  }

          }

              }


          //worker從workQueue中取不到數據的時候調用此方法,以決定自己是否跳出取任務的無限循環,從而結束此worker的運行
          private boolean workerCanExit() {
              
          final ReentrantLock mainLock = this.mainLock;
              mainLock.lock();
              
          boolean canExit;
              
          try {
                  
          /*
                  *線程池狀態為stop或者terminated,
                  *或者任務隊列里面任務已經為空,
                  *或者允許線程池線程空閑超時(實現方式是從工作隊列拿最多keepAliveTime的任務,超過這個時間就返回null了)并且
                   *當前線程池大于corePoolSize(>1)
                  *那么允許線程結束
                  *static final int RUNNING    = 0;
                  *static final int SHUTDOWN   = 1;
                  *static final int STOP       = 2;
                  *static final int TERMINATED = 3;
                  
          */

                  canExit 
          = runState >= STOP ||
                  workQueue.isEmpty() 
          ||
                 (allowCoreThreadTimeOut 
          &&
                  poolSize 
          > Math.max(1,corePoolSize));
              }
           finally {
                  mainLock.unlock();
              }

              
          return canExit;
          }

          當提交任務是,線程池都已滿,并且工作隊列也無空閑位置的情況下,ThreadPoolExecutor會執行reject操作,JDK提供了四種reject策略,包括AbortPolicy(直接拋RejectedException Exception),CallerRunsPolicy(提交任務線程自己執行,當然這時剩余任務也將無法提交),DiscardOldestPolicy(將線程池的workQueue任務隊列里面最老的任務剔除,將新任務丟入),DiscardPolicy(無視,忽略此任務,并且立即返回).實例化ThreadPoolExecutor,如果不指定任何飽和策略,默認將使用AbortPolicy.

          個人認為這些飽和策略并不十分理想
          ,特別是在應用既要保證快速,又要高可用的情況下,我的想法是能夠加入超時等待策略,也就是提交線程時線程池滿,能夠park住提交任務的線程,一旦有空閑,能在第一時間通知到等待線程. 這個實際上和主線程執行相似,但是主線程執行期間即使線程池有大量空閑也不會立即可以提交任務,效率上后者可能會比較低,特別是執行慢速任務.

          實例化Worker的時候會調用ThreadFactoryaddThread(Runnable r)方法返回一個Thread,這個線程工廠是可以在ThreadPoolExecutor實例化的時候指定的,如果不指定,那么將會使用DefaultThreadFactory, 這個也就是提供給使用者命名線程,線程歸組,是否是demon等線程相關屬性設置的機會.

          beforeExecuteafterExecute是提供給使用者擴展的,這兩個方法會在worker runTask之前和run完畢之后分別調用.JDK注釋里 Doug Lea(concurrent包作者)展示了beforeExecute一個很有趣的示例.代碼如下.

          class PausableThreadPoolExecutor extends ThreadPoolExecutor {
              
          private boolean isPaused;
              
          private ReentrantLock pauseLock = new ReentrantLock();
              
          private Condition unpaused = pauseLock.newCondition();
           
          public PausableThreadPoolExecutor(super(); }

          protected void beforeExecute(Thread t, Runnable r) {
              
          super.beforeExecute(t, r);
              pauseLock.lock();
              
          try {
                  
          while (isPaused) unpaused.await();
              }
           catch (InterruptedException ie) {
                  t.interrupt();
              }
           finally {
                  pauseLock.unlock();
              }

          }

           
          public void pause() {
              pauseLock.lock();
              
          try {
                  isPaused 
          = true;
              }
           finally {
                  pauseLock.unlock();
              }

          }


          public void resume() {
              pauseLock.lock();
              
          try {
                  isPaused 
          = false;
                  unpaused.signalAll();
              }
           finally {
                  pauseLock.unlock();
              }

          }

            }

          使用這個線程池,用戶可以隨時調用pause中止剩余任務執行,當然也可以使用resume重新開始執行剩余任務.

          ScheduledThreadPoolExecutor

          ScheduledThreadPoolExecutor
          是一個很實用的類,它的實現核心是基于DelayedWorkQueue.ScheduledThreadPoolExecutor的繼承結構上來看,各位應該能夠看出些端倪來,就是ScheduledThreadPoolExecutorThreadPoolExecutor中的任務隊列設置成了DelayedWorkQueue,這也就是說,線程池Worker從任務隊列中取的一個任務,需要等待這個隊列中最短超時任務的超時,也就是實現定時的效果.所以ScheduledThreadPoolExecutor所做的工作其實是比較少的.主要就是實現任務的實例化并加入工作隊列,以及支持scheduleAtFixedRatescheduleAtFixedDelay這種周期性任務執行.
          public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
                     
          super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,new DelayedWorkQueue(), threadFactory);
          }

          對于scheduleAfFixedRatescheduleAtFiexedDelay這種周期性任務支持,是由ScheduledThreadPoolExecutor內部封裝任務的ScheduledFutureTask來實現的.這個類在執行任務后,對于周期性任務,它會處理周期時間,并將自己再次丟入線程池的工作隊列,從而達到周期執行的目的.
          private void runPeriodic() {
                   
          boolean ok = ScheduledFutureTask.super.runAndReset();
                    
          boolean down = isShutdown();
                   
          // Reschedule if not cancelled and not shutdown or policy allows
                if (ok && (!down ||(getContinueExistingPeriodicTasksAfterShutdownPolicy() && !isStopped()))) {
                         
          long p = period;
                         
          if (p > 0)
                                time 
          += p;
                         
          else
                                time 
          = triggerTime(-p);
               
                          ScheduledThreadPoolExecutor.
          super.getQueue().add(this);
                   }

                  
          // This might have been the final executed delayed
                 
          // task.  Wake up threads to check.
                  else if (down)
                        interruptIdleWorkers();
          }

           

          2.       CompletionService


          ExecutorCompletionService

          CompletionService定義了線程池執行任務集,可以依次拿到任務執行完畢的Future,ExecutorCompletionService是其實現類,先舉個例子,如下代碼,這個例子中,需要注意ThreadPoolExecutor核心池一定保證能夠讓任務提交并且馬上執行,而不是放到等待隊列中去,那樣次序將會無法控制,CompletionService也將失去效果(其實核心池中的任務完成順序還是準確的).

          public static void main(String[] args) throws InterruptedException, ExecutionException{
              ThreadPoolExecutor es
          =new ThreadPoolExecutor(10152000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),new ThreadPoolExecutor.AbortPolicy());
              CompletionService
          <String> cs=new ExecutorCompletionService<String>(es);    
              cs.submit(
          new Callable<String>() {
               @Override
               
          public String call() throws Exception {
                   Thread.currentThread().sleep(
          1000);
                   
          return "i am sleeped 1000 milliseconds";
               }

              }
          );
                  
              cs.submit(
          new Callable<String>() {
               @Override
               
          public String call() throws Exception {
                   Thread.currentThread().sleep(
          5000);
                   
          return "i am sleeped 5000 milliseconds";
               }

              }
          );
                  
              cs.submit(
          new Callable<String>() {
               @Override
               
          public String call() throws Exception {
                   Thread.currentThread().sleep(
          4000);
                   
          return "i am sleeped 4000 milliseconds";
               }

              }
          );
                  
              cs.submit(
          new Callable<String>() {
               @Override
                   
          public String call() throws Exception {
                    Thread.currentThread().sleep(
          2000);
                        
          return "i am sleeped 2000 milliseconds";
                }

          }
          );
                  
              
          for(int i=0;i<4;i++){
                  Future
          <String> fu=cs.take();
                  System.out.println(fu.get());
              }

          }

            執行結果:
                  i am sleeped 1000 milliseconds 
             i am sleeped 2000 milliseconds
            
          i am sleeped 4000 milliseconds
            
          i am sleeped 5000 milliseconds
          從執行結果看來,我們發現先完成的任務先被拿出來了,直到所有任務被執行完畢,也就是CompletionService的效果達到了.

          ExecutorCompletionService并不復雜,關鍵的一個點就是它的內部類QueueingFuture繼承了FutureTask,并且實現了done()方法,done()方法是在線程池任務執行完畢,最后調用FutureTask的方法(這在 JAVA LOCK代碼淺析(http://www.aygfsteel.com/BucketLi/archive/2010/09/30/333471.html)一文中對于FutureTask代碼解析有提到)

          QueueingFuturedone()方法實現是將執行完的任務(FutureTask)丟入全局的完成隊列中(completionQueue),那么take是從這個blockingqueue中取元素.也就是任務完成就會有元素,即生產者消費者.

          這種實現的思想是將原本在單個FutureTask上的等待轉化為在BlockingQueue上的等待,即對全部FutureTask的等待,從而達到哪個先完成,哪個就可取執行結果的效果.

          private class QueueingFuture extends FutureTask<Void> {
              QueueingFuture(RunnableFuture
          <V> task) {
                  
          super(task, null);
                  his.task 
          = task;
              }

              
          protected void done() { completionQueue.add(task); }
              
          private final Future<V> task;
          }

          總結:
          JUC提供的線程池體系核心是在ThreadPoolExecutor, ScheduledThreadPoolExecutorExecutorCompletionService只是對其擴展,這里沒有去細講Executors這個便捷類,這個類提供很多便捷的線程池構建方法.各位使用的時候不妨去看下.



          posted on 2010-12-16 13:57 BucketLI 閱讀(5072) 評論(0)  編輯  收藏

          只有注冊用戶登錄后才能發表評論。


          網站導航:
           
          主站蜘蛛池模板: 海门市| 静安区| 贺州市| 耒阳市| 正宁县| 尼勒克县| 凉城县| 牡丹江市| 隆德县| 兴义市| 三穗县| 施甸县| 江津市| 宜兴市| 阿尔山市| 长乐市| 深泽县| 宁强县| 醴陵市| 垫江县| 油尖旺区| 南阳市| 柳江县| 伊宁市| 尤溪县| 汶川县| 阜平县| 根河市| 富民县| 伊宁市| 固镇县| 平乐县| 彭州市| 阳朔县| 嘉禾县| 仙居县| 玉环县| 博乐市| 潮安县| 芜湖县| 朝阳区|