java 線程池


          圖左邊是線程池的類結(jié)構(gòu),右邊是放入線程池執(zhí)行的任務(wù)類結(jié)構(gòu)

          ExecutorService
          ExecutorService定義了線程池的基本的方法,AbstractExecutorService是個抽象類,實現(xiàn)了ExecutorService的部分,主要是實現(xiàn)了幾個submit()方法,并且提供方法將提交進(jìn)來的任務(wù)
          封裝成FutureTask,ThreadPoolExecutor則實現(xiàn)線程池的管理

          FutureTask
          FutureTask內(nèi)部引用了一個AQS的實現(xiàn),當(dāng)FutureTask沒有執(zhí)行完畢的時候,F(xiàn)utureTask的get()方法所在線程阻塞在AQS阻塞隊列上,所以get()方法可以認(rèn)為是一個異步變同步的過程。

          AbstractExecutorService的submit(Runnable)方法:

              
          public Future<?> submit(Runnable task) {
                  
          if (task == nullthrow new NullPointerException();
                  RunnableFuture
          <Object> ftask = newTaskFor(task, null);
                  execute(ftask);
                  
          return ftask;
              }
          調(diào)用的是ThreadPoolExecutor類實現(xiàn)的execute(Runnable)方法:
              public void execute(Runnable command) {
                  
          if (command == null)
                      
          throw new NullPointerException();
                  
          if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {//關(guān)鍵點1
                      
          if (runState == RUNNING && workQueue.offer(command)) {//關(guān)鍵點2
                          
          if (runState != RUNNING || poolSize == 0)//關(guān)鍵點3
                              ensureQueuedTaskHandled(command);
                      }
                      
          else if (!addIfUnderMaximumPoolSize(command))關(guān)鍵點4
                          reject(command); 
          // is shutdown or saturated
                  }
              }
          ThreadPoolExecutor里面有幾個size:poolSize 當(dāng)前線程池的大小,corePoolSize 核心線程池大小,maximumPoolSize最大線程池大小。根據(jù)這幾個size線程池執(zhí)行任務(wù)的方式也會不同。
          關(guān)鍵點1.當(dāng)poolSize大于coreSize的時候執(zhí)行關(guān)鍵點2,也就是進(jìn)入等待執(zhí)行的隊列;否則執(zhí)行addIfUnderMaximumPoolSize()方法,該方法創(chuàng)建一個線程,這個線程被加入到工作線程集當(dāng)中,并且第一個要執(zhí)行的就是當(dāng)前這個任務(wù)。
          關(guān)鍵點2.poolSize小于coreSize時將任務(wù)加入到等待隊列,如果成功執(zhí)行關(guān)鍵點3,如果ThreadPoolExecutor的狀態(tài)不是runnable或者等待隊列加入失敗的話,執(zhí)行關(guān)鍵點4
          關(guān)鍵點3.此時可能ThreadPoolExecutor的狀態(tài)不是runnable或者poolSize為0,可能是執(zhí)行shutDown()引起的,這個需要特殊處理
          關(guān)鍵點4.此時poolSize大于coreSize,然后等待隊列滿了,需要直接創(chuàng)建一個線程執(zhí)行任務(wù),并把線程放入到工作線程集中。如果前面執(zhí)行不成功,那么就需要執(zhí)行相應(yīng)的飽和策略了
          總結(jié):線程池包含三部分:coreSize大小的核心線程池,等待隊列,以及擴展線程池,核心線程池和擴展線程池其實是在同一個工作線程集當(dāng)中

          工作線程
          工作線程集中每一個對象用內(nèi)部類Worker表示,實現(xiàn)了Runnable接口,里面引用了線程:
              private final class Worker implements Runnable {
                  
          /**
                   * The runLock is acquired and released surrounding each task
                   * execution. It mainly protects against interrupts that are
                   * intended to cancel the worker thread from instead
                   * interrupting the task being run.
                   
          */
                  
          private final ReentrantLock runLock = new ReentrantLock();

                  
          /**
                   * Initial task to run before entering run loop. Possibly null.
                   
          */
                  
          private Runnable firstTask;

                  
          /**
                   * Per thread completed task counter; accumulated
                   * into completedTaskCount upon termination.
                   
          */
                  
          volatile long completedTasks;

                  
          /**
                   * Thread this worker is running in.  Acts as a final field,
                   * but cannot be set until thread is created.
                   
          */
                  Thread thread;

                  Worker(Runnable firstTask) {
                      
          //第一個任務(wù)
                      this.firstTask = firstTask;
                  }

                  
          boolean isActive() {
                      
          return runLock.isLocked();
                  }

                  
          /**
                   * Interrupts thread if not running a task.
                   
          */
                  
          void interruptIfIdle() {
                      
          final ReentrantLock runLock = this.runLock;
                      
          if (runLock.tryLock()) {
                          
          try {
                      
          if (thread != Thread.currentThread())
                      thread.interrupt();
                          } 
          finally {
                              runLock.unlock();
                          }
                      }
                  }

                  
          /**
                   * Interrupts thread even if running a task.
                   
          */
                  
          void interruptNow() {
                      thread.interrupt();
                  }

                  
          /**
                   * Runs a single task between before/after methods.
                   
          */
                  
          private void runTask(Runnable task) {
                      
          final ReentrantLock runLock = this.runLock;
                      runLock.lock();
                      
          try {
                          
          /*
                           * Ensure that unless pool is stopping, this thread
                           * does not have its interrupt set. This requires a
                           * double-check of state in case the interrupt was
                           * cleared concurrently with a shutdownNow -- if so,
                           * the interrupt is re-enabled.
                           
          */
                          
                          
          //雙重檢測,對shutdown()執(zhí)行后而且執(zhí)行了interruptIfIdle()方法(在getTask()中)這段代碼會用到
                          if (runState < STOP &&
                              Thread.interrupted() 
          &&
                              runState 
          >= STOP)
                              thread.interrupt();
                          
          /*
                           * Track execution state to ensure that afterExecute
                           * is called only if task completed or threw
                           * exception. Otherwise, the caught runtime exception
                           * will have been thrown by afterExecute itself, in
                           * which case we don't want to call it again.
                           
          */
                          
          boolean ran = false;
                          
          //執(zhí)行前的hook
                          beforeExecute(thread, task);
                          
          try {
                              
          //FutureTask的run()方法
                              task.run();
                              ran 
          = true;
                          
          //執(zhí)行后的hook
                              afterExecute(task, null);
                              
          ++completedTasks;
                          } 
          catch (RuntimeException ex) {
                              
          if (!ran)
                                  afterExecute(task, ex);
                              
          throw ex;
                          }
                      } 
          finally {
                          runLock.unlock();
                      }
                  }

                  
          /**
                   * Main run loop  
                   * 由于本runnable在創(chuàng)建線程時是線程構(gòu)造函數(shù)的參數(shù),
                   * 所以線程運行最終會運行下面的run方法。
                   * 而且屬性thread的值就是創(chuàng)建的線程
                   
          */
                  
          public void run() {
                      
          try {
                          Runnable task 
          = firstTask;
                          firstTask 
          = null;
                          
          //循環(huán)中運行runTask()方法,firstTask不為空,就先運行firstTask
                          
          //之后運行g(shù)etTask()方法,該方法就是從等待隊列里面獲取任務(wù)或者阻塞等待任務(wù)。
                          
          //由于得到的task可能為空,所以while循環(huán)可能跳出
                          while (task != null || (task = getTask()) != null) {
                              
          //執(zhí)行任務(wù)
                              runTask(task);
                              task 
          = null;
                          }
                      } 
          finally {
                          workerDone(
          this);
                      }
                  }
              }

          工作線程執(zhí)行第一個任務(wù),然后加入到線程池,不斷執(zhí)行上面的run()方法,基本就是:等待隊列取任務(wù)-->執(zhí)行-->取任務(wù)-->執(zhí)行的循環(huán),中間需要處理外部執(zhí)行shutdown()后的流程,
          需要動態(tài)的增大減小工作線程池,核心線程池,擴展線程池。getTask()方法里面有做這些:
                Runnable getTask() {
                  
          for (;;) {
                      
          try {
                          
          int state = runState;
                          
          //線程池狀態(tài)為STOP或者TERMINATED的話,就不執(zhí)行隊列里面的任務(wù)了
                          
                          
          if (state > SHUTDOWN)
                              
          return null;
                          Runnable r;
                          
          if (state == SHUTDOWN)  // Help drain queue
                          
          //在SHUTDOWN的話還是需要執(zhí)行任務(wù)的
                              r = workQueue.poll();
                          
          //如果poolSize>corePoolSize 說明線程池大于核心線程池,那么隊列
                          
          //可能不會有任務(wù),allowCoreThreadTimeOut為true說明核心線程池
                          
          //線程timeout以后可以被回收,如果是上面二個條件之一的話,使用poll()方法
                          
          //可能超時后返回的就是空的任務(wù)
                          else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                              r 
          = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                          
          else
                          
          //上面兩條不符合,那么隊列有任務(wù)的概率大,這個會阻塞知道有任務(wù)(反正核心線程池不會被回收的)
                              r = workQueue.take();
                          
          if (r != null)
                              
          return r;

                           
          //上面可能返回為空,所以能運行到這里
                           
          //運行workerCanExit()方法,如果返回true,那么檢測如果
                           
          //線程池狀態(tài)為STOP或者TERMINATED的話,就開始中斷空閑的線程
                          if (workerCanExit()) {
                              
          if (runState >= SHUTDOWN) // Wake up others
                                  interruptIdleWorkers();
                              
          return null;
                          }
                          
          // Else retry
                      } catch (InterruptedException ie) {
                          
          // On interruption, re-check runState
                      }
                  }
              }
              
          private boolean workerCanExit() {
                  
          final ReentrantLock mainLock = this.mainLock;
                  mainLock.lock();
                  
          boolean canExit;
                  
          try {
                      
          //線程池狀態(tài)為STOP或者TERMINATED或者等待隊列為空或者核心線程可以回收加上工作
                      
          //線程池大于核心線程池
                      canExit = runState >= STOP ||
                          workQueue.isEmpty() 
          ||
                          (allowCoreThreadTimeOut 
          &&
                           poolSize 
          > Math.max(1, corePoolSize));
                  } 
          finally {
                      mainLock.unlock();
                  }
                  
          return canExit;
              }
          getTask()由于各種調(diào)節(jié)功能使得返回的任務(wù)可能為空,run()方法的workerDone()就有機會執(zhí)行:
            void workerDone(Worker w) {

                  
          final ReentrantLock mainLock = this.mainLock;
                  mainLock.lock();
                  
          try {
                      
          //更新總的完成次數(shù)
                      completedTaskCount += w.completedTasks;
                      
          //執(zhí)行到這里說明核心線程池可以回收而且返回了的任務(wù)為空
                      
          //認(rèn)為核心線程池太大,進(jìn)行回收
                      workers.remove(w);
                      
          if (--poolSize == 0)
                          tryTerminate();
                  } 
          finally {
                      mainLock.unlock();
                  }
              }

          在Worker的runTask()方法中最主要還是執(zhí)行FutureTask的run方法:
              public void run() {
                  sync.innerRun();
              }



                  
          private static final int RUNNING   = 1;
                  
          /** State value representing that task ran */
                  
          private static final int RAN       = 2;
                  
          /** State value representing that task was cancelled */
                  
          private static final int CANCELLED = 4;
                    
          void innerRun() {
                       
          //初始狀態(tài)變了,直接返回
                      if (!compareAndSetState(0, RUNNING))
                          
          return;
                      
          try {
                          runner 
          = Thread.currentThread();
                          
          //如果不是RUNNING,那么肯定是RAN或者CANCELLED
                          
          //這兩個狀態(tài)都可以釋放鎖了
                          if (getState() == RUNNING) // recheck after setting thread

                            
          //設(shè)置執(zhí)行的結(jié)果
                              innerSet(callable.call());
                          
          else

                               
                               
          //這里會執(zhí)行到FutureTask內(nèi)部的實現(xiàn)的tryReleaseShared方法

                              releaseShared(
          0); // cancel

                      } 
          catch (Throwable ex) {
                          innerSetException(ex);
                      }
                  }


                  
          void innerSet(V v) {
                  
          //循環(huán)只到下面一項產(chǎn)生
                  for (;;) {
                  
          int s = getState();
                  
          //另外的線程已經(jīng)執(zhí)行了 直接返回
                  if (s == RAN)
                      
          return;
                  
          //任務(wù)取消了,釋放鎖
                  if (s == CANCELLED) {
                      
          // aggressively release to set runner to null,
                      
          // in case we are racing with a cancel request
                      
          // that will try to interrupt runner
                              releaseShared(0);
                              
          return;
                          }
                   
          //設(shè)置完成標(biāo)志,然后設(shè)置result,釋放鎖,喚醒阻塞在FutureTask.get()上面的線程
                  if (compareAndSetState(s, RAN)) {
                              result 
          = v;
                              releaseShared(
          0);
                              
          //hook
                              done();
                      
          return;
                          }
                      }
                  }


                 
          protected boolean tryReleaseShared(int ignore) {
                     
          //線程清空
                      runner = null;
                      
          return true;
                  }

          說到底FutureTask還是用AQS的阻塞實現(xiàn)的。

          拒絕策略
          當(dāng)maximumPoolSize表示的最大線程池滿了以后,加入的任務(wù)會被拒絕,JDK有四種拒絕的策略:
          AbortPolicy:拋出異常
          CallerRunsPolicy:當(dāng)前的線程直接的執(zhí)行任務(wù)
          DiscardOldestPolicy:舍去最老的任務(wù),然后線程池執(zhí)行最新的任務(wù)
          DiscardPolicy:直接拒絕任務(wù)

          總結(jié):
          線程池設(shè)計的初衷還是減少了每個任務(wù)調(diào)用的開銷,可以在執(zhí)行大量異步任務(wù)時提高性能,并且還可以管理資源
          提高性能方面主要是去除了大部分任務(wù)調(diào)用時線程的創(chuàng)建和銷毀的開銷
          資源管理方面:有核心線程池,等待隊列,擴展線程池等方面,對于不同的方面有不同的策略,并且存在一定的
          動態(tài)調(diào)節(jié)線程池的能力,對線程池超負(fù)荷時也有一些拒絕策略



          posted on 2011-09-20 22:27 nod0620 閱讀(722) 評論(0)  編輯  收藏 所屬分類: 多線程java

          <2025年6月>
          25262728293031
          1234567
          891011121314
          15161718192021
          22232425262728
          293012345

          導(dǎo)航

          統(tǒng)計

          常用鏈接

          留言簿

          隨筆分類

          隨筆檔案

          文章分類

          文章檔案

          搜索

          最新評論

          閱讀排行榜

          評論排行榜

          主站蜘蛛池模板: 德江县| 南雄市| 澳门| 西乌珠穆沁旗| 新河县| 阜康市| 孝义市| 台安县| 英山县| 西昌市| 泗洪县| 丹棱县| 读书| 南昌市| 南江县| 和田县| 博客| 鹿泉市| 嵩明县| 四子王旗| 新平| 正定县| 都匀市| 新乡市| 沈丘县| 铁岭县| 兴宁市| 集贤县| 渭南市| 阿合奇县| 宿迁市| 资阳市| 聊城市| 兰州市| 长春市| 玉山县| 洪江市| 大港区| 岫岩| 浙江省| 双鸭山市|