I want to fly higher
          programming Explorer
          posts - 114,comments - 263,trackbacks - 0
          1.ThreadPoolExecutor#execute(Runnable command)

               public void execute(Runnable command) {
                         
          // 如果任務為空,則直接拋出空指針異常
                          if (command == null)
                              
          throw new NullPointerException();
                          
          // 1.如果線程池線程數目UnderCorePoolSize且RUNNING則直接添加worker線程并啟動
                          
          // 2.如果超過了corePoolSize或者addIfUnderCorePoolSize失敗則
                          if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
                          
          // 如果線程池是RUNNING狀態且可將任務command加入workQueue(即不違反容量限制)
                              if (runState == RUNNING && workQueue.offer(command)) {
                             
          // 因為是并發執行.如果此時發現線程池狀態不再是RUNNING(可能執行了類似shutdown的操作)或者線程池中已無Worker線程
                                  if (runState != RUNNING || poolSize == 0)
                            
          //1.如果線程池狀態不再是RUNNING且此時command依然在隊列中,即還未執行則直接拒絕.
                           
          // 2.否則如果線程池狀態 < STOP,即可能是SHUTDOWN狀態且任務隊列中依然有任務且工作線程的數目不足corePoolSize,則額外添加一個Worker線程并啟動
                                      ensureQueuedTaskHandled(command);
                              }

                              
          else if (!addIfUnderMaximumPoolSize(command))
                             
          // 如果在UnderMaximumPoolSize下增加worker線程失敗則執行拒絕策略,直接調用RejectedExecutionHandler#rejectedExecution
                                  reject(command); // is shutdown or saturated
                          }

                      }


          2. addIfUnderCorePoolSize(Runnable firstTask)

          // poolSize < corePoolSize && RUNNING的情況下添加worker線程并啟動worker線程
                  private boolean addIfUnderCorePoolSize(Runnable firstTask) {
                          Thread t = null;
                         
          final ReentrantLock mainLock = this.mainLock;
                              
          //
                          mainLock.lock();
                         
          try {
                             
          // 初始poolSize為0,runState為0,即RUNNING
                             
          // RUNNING = 0 / SHUTDOWN = 1 / STOP = 2
                             
          // TERMINATED = 3
                              if (poolSize < corePoolSize && runState == RUNNING)
                                  t = addThread(firstTask);
                          }
          finally {
                              mainLock.unlock();
                          }

                         
          if (t == null)
                             
          return false;
                          t.start();
                         
          return true;
                      }


          3.addThread(Runnable firstTask)

          private Thread addThread(Runnable firstTask) {
                         
          // 初始化Worker,傳入firstTask
                          Worker w = new Worker(firstTask);
                         
          // 利用線程工廠新建線程,注意這里傳入的參數是w
                          Thread t = threadFactory.newThread(w);
                         
          if (t != null) {
                              w.thread = t;
                         
          // 添加至workers
                              workers.add(w);
                         
          // ++poolSize
                              int nt = ++poolSize;
                             
          if (nt > largestPoolSize)
                                  largestPoolSize = nt;
                          }

                         
          return t;
                      }


          4.Worker

          private final class Worker implements Runnable {
                 
          /**
                   * The runLock is acquired and released surrounding each task
                   * execution. It mainly protects against interrupts that are
                   * intended to cancel the worker thread from instead
                   * interrupting the task being run.
                  
          */

                 
          private final ReentrantLock runLock = new ReentrantLock();

                 
          /**
                   * Initial task to run before entering run loop. Possibly null.
                  
          */

                 
          private Runnable firstTask;

                 
          /**
                   * Per thread completed task counter; accumulated
                   * into completedTaskCount upon termination.
                  
          */

                 
          volatile long completedTasks;

                 
          /**
                   * Thread this worker is running in.  Acts as a final field,
                   * but cannot be set until thread is created.
                  
          */

                  Thread thread;

                  Worker(Runnable firstTask) {
                     
          this.firstTask = firstTask;
                  }


                 
          boolean isActive() {
                     
          return runLock.isLocked();
                  }


                 
          /**
                   * 中斷線程如果沒有正在運行任務(可能在等待任務)
                    * {@link ThreadPoolExecutor#interruptIdleWorkers}
                    * {@link ThreadPoolExecutor#getTask}
                    * {@link ThreadPoolExecutor#shutdown}
                  
          */

                 
          void interruptIfIdle() {
                     
          final ReentrantLock runLock = this.runLock;
                     
          if (runLock.tryLock()) {
                         
          try {
                             
          // 注意只有該方法是被其他線程調用才會執行interrupt.
                             
          // 1.個人認為如果是當前自身線程執行到這里的時候,說明getTask返回了null.線程就會結束了.
                   
          // 2.Worker線程在自身任務的執行中調用此方法時沒有作用的.即恰恰說明了運行時不被中斷.(因為不太可能存在這樣的類似業務,內部線程自己在運行任務的時候中斷自己.沒有任何作用.你懂的.這壓根就是錯誤的做法)
                    
          // 3.還有一個很重要的原因是:這里加了運行鎖.即如果此時有任務正在運行則獨占runLock,則其他線程必須等待任務完畢釋放鎖才可以進行interrupt.
                              if (thread != Thread.currentThread())
                                  thread.interrupt();
                          }
          finally {
                              runLock.unlock();
                          }

                      }

                  }


                 
          /**
                   * Interrupts thread even if running a task.
                  
          */

                 
          void interruptNow() {
             
          // 直接進行中斷,無論是內部線程還是其他線程
             
          // 無論是否正在運行任務
             
          // 沒有獲得鎖
             
          // 此時如果線程正在等待任務或者任務執行過程中阻塞都可以被中斷
             
          // 個人認為該方法也肯定是由外部線程進行調用的,而非內部的線程,你懂的.用了也沒有作用.
                      thread.interrupt();
                  }


                 
          /**
                   * 運行任務在beforeExecute/afterExecute之間
                  
          */

                 
          private void runTask(Runnable task) {
                     
          final ReentrantLock runLock = this.runLock;
                      runLock.lock();
                     
          try {
                         
          /*
                           * Ensure that unless pool is stopping, this thread
                           * does not have its interrupt set. This requires a
                           * double-check of state in case the interrupt was
                           * cleared concurrently with a shutdownNow -- if so,
                           * the interrupt is re-enabled.
                          
          */

                     
          // 這段代碼乍看起來可能有些奇怪.個人認為是因為多線程的原因,如線程池調用了shutdownNow方法.
                     
          // 1.如果線程池是RUNNING/SHUTDOWN且之前被中斷過,則清除中斷狀態(interrupted)  2.再次檢查如果執行了shutdownNow的話,則會直接interrupt thread.而此時的中斷狀態可能被清除了.->需要需要再次調用interrupt重置中斷狀態.(還需要仔細考證)
                          if (runState < STOP &&
                              Thread.interrupted() &&
                              runState >= STOP)
                              thread.interrupt();

                         
          boolean ran = false;
                    
          // 任務執行前的一些業務,空實現,子類可覆蓋
                    
          // 任務完成或者任務執行出現異常則可通過afterExecute(空實現)追蹤
                          beforeExecute(thread, task);
                         
          try {
                              task.run();
                              ran = true;
                              afterExecute(task, null);
                        
          // 任務計數
                              ++completedTasks;
                          }
          catch (RuntimeException ex) {
                             
          if (!ran)
                                  afterExecute(task, ex);
                             
          throw ex;
                          }

                      }
          finally {
                          runLock.unlock();
                      }

                  }


                 
          /**
                   * Work線程主任務循環
                  
          */

                 
          public void run() {
                     
          try {
                          Runnable task = firstTask;
                          firstTask = null;
                    
          // 1.如果第一個任務不為null則一定會執行第一個任務
                    
          // 2.如果getTask為null則線程結束.
                          while (task != null || (task = getTask()) != null) {
                              runTask(task);
                              task = null;
                          }

                      }
          finally {
                   
          //  跳出while,線程即結束
                   
          // 1.completedTaskCount 計數
                   
          // 2.workers.remove(w) 從workers移除
                  
          // 3.--poolSize,如果poolSize為0則tryTerminate
                          workerDone(this);
                      }

                  }

              }


          5.Runnable getTask()

          Runnable getTask() {
                 
          for (;;) {
                     
          try {
                         
          int state = runState;
                    
          // 線程池運行狀態為STOP或者TERMINATED,直接返回null,則Worker線程跳出while,終止
                          if (state > SHUTDOWN)
                             
          return null;
                          Runnable r;
                    
          // 如果線程池運行狀態恰好是SHUTDOWN,則繼續從隊列獲取任務(隊列為空則返回null),也在該狀態下如果線程池不為空則一直獲取任務
                          if (state == SHUTDOWN)  // Help drain queue
                              r = workQueue.poll();
                         
          else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                        
          // RUNNING狀態下,poolSize超出了corePoolSize 或者allowCoreThreadTimeOut(允許核心線程超時) {@link ThreadPoolExecutor#allowCoreThreadTimeOut(boolean value)}
                        
          // 在keepAliveTime時間內等待可用的元素,等待時可被中斷.如果超時則返回null.
                              r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                         
          else
                       
          // Running狀態下,poolSize未超出corPoolSize且不允許核心線程超時,則在元素變得可用之前一直等待,可被中斷
                              r = workQueue.take();
                         
          if (r != null)
                             
          return r;
                    
          // 如果此時返回的任務為null且worker線程可退出(該方法其實是重復校驗,因為是并發執行.所以可能任務隊列已經有了任務等條件出現)
                          if (workerCanExit()) {
                        
          // 如果此時線程池狀態不是RUNNING
                              if (runState >= SHUTDOWN) // Wake up others
                         
          // 喚醒可能阻塞的任務,{@link Worker#interruptIfIdle}
                                  interruptIdleWorkers();
                       
          // 返回null,結束任務
                              return null;
                          }

                         
          // Else retry
                    
          // 繼續for-循環
                      }
          catch (InterruptedException ie) {
                         
          // On interruption, re-check runState
                      }

                  }

              }


          6.workerCanExit()

          // 判斷worker線程是否可退出
          private boolean workerCanExit() {
                 
          final ReentrantLock mainLock = this.mainLock;
                  mainLock.lock();
                 
          boolean canExit;
                 
          try {
                 
          // 運行狀態為STOP或者TERMINATED
                 
          // 或者任務隊列為空
                 
          // 或者池中至少有一個線程且允許核心線程超時
                      canExit = runState >= STOP ||
                          workQueue.isEmpty() ||
                          (allowCoreThreadTimeOut &&
                           poolSize > Math.max(1, corePoolSize));
                  }
          finally {
                      mainLock.unlock();
                  }

                 
          return canExit;
              }


          7.tryTerminate()

          // 嘗試終止
              private void tryTerminate() {
                
          // 如果當前池中沒有線程
                  if (poolSize == 0) {
                     
          int state = runState;
                 
          // 如果當前運行狀態時是Running/SHUTDOWN且任務隊列不為空
                      if (state < STOP && !workQueue.isEmpty()) {
                    
          // 重新設置為運行狀態
                          state = RUNNING; // disable termination check below
                   
          // 添加一個firstTask為null的worker并啟動.因為隊列不為空則可以getTask
                          Thread t = addThread(null);
                         
          if (t != null)
                              t.start();
                      }

            
                      
          // 如果運行狀態為STOP或者SHUTDOWN則置狀態為TERMINATED并喚醒等待終止的線程 {@link #awaitTermination(long timeout, TimeUnit unit)}
                      if (state == STOP || state == SHUTDOWN) {
                          runState = TERMINATED;
                          termination.signalAll();
                          terminated();// 此方法暫時未實現
                      }

                  }

              }


          8.awaitTermination(long timeout, TimeUnit unit)

          // 等待線程池終止 {@link #tryTerminate()}
                  public boolean awaitTermination(long timeout, TimeUnit unit)
                         
          throws InterruptedException {
                         
          long nanos = unit.toNanos(timeout);
                         
          final ReentrantLock mainLock = this.mainLock;
                          mainLock.lock();
                         
          try {
                         
          // 注這是一個無限循環,直到線程池終止或者超時
                              for (;;) {
                                 
          if (runState == TERMINATED)
                                     
          return true;
                                 
          if (nanos <= 0)
                                     
          return false;
                         
          //  {@link Condition#long awaitNanos(long nanosTimeout)}
                         
          //  此方法返回的是一個估算(nanosTimeout - awaitTime),如果小于等于0則表示沒有剩余時間,即超時.不過如果返回值是一個正值的話且線程池未終止的話->所以由將返回值繼續傳入了參數->確保肯定會發生超時而導致nanos<=0而跳出循環
                                  nanos = termination.awaitNanos(nanos);
                              }

                          }
          finally {
                              mainLock.unlock();
                          }

                      }


          9.shutdown()

          public void shutdown() {
                 
          // 檢查是否有shutdown的權限
                  SecurityManager security = System.getSecurityManager();
                 
          if (security != null)
                      security.checkPermission(shutdownPerm);

                 
          final ReentrantLock mainLock = this.mainLock;
                  mainLock.lock();
                 
          try {
                     
          if (security != null) { // Check if caller can modify our threads
                 
          // 檢查所有的worker線程是否有修改線程的權限
                          for (Worker w : workers)
                              security.checkAccess(w.thread);
                      }


                     
          int state = runState;
                 
          // 設置線程池當前狀態是RUNNING,則設置為SHUTDOWN狀態
                      if (state < SHUTDOWN)
                          runState = SHUTDOWN;

                     
          try {
                 
          // 嘗試打斷空閑的worker線程
                          for (Worker w : workers) {
                              w.interruptIfIdle();
                          }

                      }
          catch (SecurityException se) { // Try to back out
                 
          // 如果出現異常,則還原狀態
                          runState = state;
                         
          // tryTerminate() here would be a no-op 這個注釋的意思是出現了這個異常,tryTerminate是不起作用的.因為tryTerminate的條件是poolSize == 0.但是異常說明interruptIfIdle失敗則不可能poolSize == 0.
                 
          // 繼續向上拋出異常,這個異常是SecurityException
                          throw se;
                      }

                 
          // 嘗試終止(隊列為空的時候直接終止)
                      tryTerminate(); // Terminate now if pool and queue empty
                  }
          finally {
                      mainLock.unlock();
                  }

              }


          10.shutdownNow()

          public List<Runnable> shutdownNow() {
                  
          // 檢查shutdown權限以及修改工作線程的權限
                  SecurityManager security = System.getSecurityManager();
                  
          if (security != null)
                      security.checkPermission(shutdownPerm);

                  
          final ReentrantLock mainLock = this.mainLock;
                  mainLock.lock();
                  
          try {
                      
          if (security != null// Check if caller can modify our threads
                          for (Worker w : workers)
                              security.checkAccess(w.thread);
                      }


                      
          int state = runState;
              
          // 置狀態為STOP(可能未RUNNING或者SHUTDOWN)
                      if (state < STOP)
                          runState = STOP;

                      
          try {
                          
          for (Worker w : workers) {
                   
          // 直接中斷
                              w.interruptNow();
                          }

                      }
           catch (SecurityException se) // Try to back out
                          runState = state;
                          
          // tryTerminate() here would be a no-op
                          throw se;
                      }


              
          // 將隊列中的所有可用元素添加list中并返回
                      List<Runnable> tasks = drainQueue();
              
          // 嘗試終止
                      tryTerminate(); // Terminate now if pool and queue empty
                      return tasks;
                  }
           finally {
                      mainLock.unlock();
                  }

              }


          11.總結:
                1.corePoolSize/maximumPoolSize/keepAliveTime/workQueue/threadFactory/rejectedExecutionHandler 為線程池6大參數.
               2.corePoolSize:當線程池poolSize少于corePoolSize時,則會新增worker線程.
               3.線程池數目超過corePoolSize則向workQueue offer 任務.如果offer失敗則在maximumPoolSize下新增worker線程;如果超過了maximumPoolSize,則執行拒絕策略.
               4.keepAliveTime:poolSize超過了corePoolSize時(或者允許core thread timeout),此參數指明workQueue pool的超時時間,超時則返回null,即表示當前線程空閑.(workerCanExit中有判斷workQueue為空的條件)然后worker線程結束(被回收).
               5.Worker有兩個方法interruptIfIdle,這個方法會先獲得運行鎖,即如果當前有任務運行(占有鎖),則其他線程無法中斷.只有執行完workQueue的任務才會結束并釋放鎖.(shutdown);而另一個方法interruptNow則是不管任何條件,直接interrupt.
          posted on 2013-12-26 11:43 landon 閱讀(1671) 評論(2)  編輯  收藏 所屬分類: Sources

          FeedBack:
          # re: JDK源碼筆記1-ThreadPoolExecutor
          2013-12-26 12:28 | 零柒鎖業
          對國有銀行的辦事效率深表懷疑  回復  更多評論
            
          # re: JDK源碼筆記1-ThreadPoolExecutor
          2013-12-27 12:49 | 零柒鎖業
          支持博主分享  回復  更多評論
            
          主站蜘蛛池模板: 林口县| 庆阳市| 苏州市| 罗山县| 安平县| 昌宁县| 武夷山市| 兴业县| 繁昌县| 平江县| 玛沁县| 石柱| 邳州市| 锡林郭勒盟| 大同县| 政和县| 荥阳市| 普陀区| 吴桥县| 南投县| 绥德县| 水城县| 荆门市| 化德县| 涿州市| 元谋县| 北票市| 洛南县| 濉溪县| 双峰县| 右玉县| 景德镇市| 库车县| 闵行区| 玉林市| 东莞市| 桑日县| 册亨县| 三都| 梁河县| 大新县|