the journey is the reward...

          常用鏈接

          統計

          最新評論

          Java線程池的瑕疵,For java util concurrent threadpool Since jdk1.5

              java.util.concurrent的作者是Doug Lea : 世界上對Java影響力最大的個人,在jdk1.5之前大家一定熟悉他的backport-util-concurrent.jar."這個鼻梁掛著眼鏡,留著德王威廉二世的胡子,臉上永遠掛著謙遜靦腆笑容,服務于紐約州立大學Oswego分校計算器科學系的老大爺。",他可是并發編程的大師級人物哦!
              Since jdk1.5,在java.util.concurrent包下的線程池模型是基于queue的,threadpool只有一個,而queue卻有多個LinkedBlockingQueue,SynchronousQueue,ScheduledThreadPoolExecutor.DelayedWorkQueue等可參見java.util.concurrent.Executors.注意:我下面的問題是針對LinkedBlockingQueue的,參考的src為jdk1.6.
              Threadpool通過以下的3個屬性來標志池中的線程數:
          corePoolSize(類似minimumPoolSize),poolSize(當前池中的線程數),maximumPoolSize(最大的線程數).
          這3個屬性表達的意思是每次新創建或結束一個線程poolSize++/--,在最忙的情況下threadpool創建的線程數不能超過maximumPoolSize,
          當空閑的情況下poolSize應該降到corePoolSize,當然threadpool如果從創建時它就從來沒有處理過一次請求的話,那么poolSize當然為0.
              通過以上2段的說明下面我要引出我所要講的問題:
          我們來看一下java.util.concurrent.ThreadPoolExecutor的execute方法:
          public void execute(Runnable command) {
                  if (command == null)
                      throw new NullPointerException();
                  if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
                      if (runState == RUNNING && workQueue.offer(command)) {
                          if (runState != RUNNING || poolSize == 0)
                              ensureQueuedTaskHandled(command);
                      }
                      else if (!addIfUnderMaximumPoolSize(command))
                          reject(command); // is shutdown or saturated
                  }
          }
          它表達的主體意思是:如果當前的poolSize<corePoolSize,那么就增加線程直到poolSize==corePoolSize.
          如果poolSize已經到達corePoolSize,那么就把command(task) put to workQueue,如果workQueue為LinkedBlockingQueue的話,
          那么只有當workQueue offer commands達到workQueue.capacity后,threadpool才會繼續增加線程直到maximumPoolSize.
          1.*****如果LinkedBlockingQueue.capacity被設置為Integer.MAX_VALUE,那么池中的線程幾乎不可能到達maximumPoolSize.*****
          所以你如果使用了Executors.newFixedThreadPool的話,那么maximumPoolSize和corePoolSize是一樣的并且LinkedBlockingQueue.capacity==Integer.MAX_VALUE,或者如果這樣new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,timeUnit,new LinkedBlockingQueue<Runnable>(/*Integer.MAX_VALUE*/))的話,
          上述的使用都將導致maximumPoolSize是無效的,也就是說線程池中的線程數不會超出corePoolSize.
          這個也讓那些tomcat6的開發人員可能也郁悶了,他們不得不改寫LinkedBlockingQueue,以tomcat-6.0.20-src為例:
          org.apache.tomcat.util.net.NioEndpoint.TaskQueue extends LinkedBlockingQueue<Runnable> override offer method: 
           public void setParent(ThreadPoolExecutor tp, NioEndpoint ep) {
                      parent = tp;
                      this.endpoint = ep;
                  }
                 
                  public boolean offer(Runnable o) {
                      //we can't do any checks
                      if (parent==null) return super.offer(o);
                      //we are maxed out on threads, simply queue the object
                      if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
                      //we have idle threads, just add it to the queue
                      //this is an approximation, so it could use some tuning
                      if (endpoint.activeSocketProcessors.get()<(parent.getPoolSize())) return super.offer(o);
                      //if we have less threads than maximum force creation of a new thread
                      if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
                      //if we reached here, we need to add it to the queue
                      return super.offer(o);
                  } 

          org.apache.tomcat.util.net.NioEndpoint.start()-->
             TaskQueue taskqueue = new TaskQueue();/***queue.capacity==Integer.MAX_VALUE***/
                               TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-");
                               executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60,TimeUnit.SECONDS,taskqueue, tf);
                               taskqueue.setParent( (ThreadPoolExecutor) executor, this);
          2.*****如果把LinkedBlockingQueue.capacity設置為一個適當的值遠小于Integer.MAX_VALUE,那么只有put到queue的任務數到達LinkedBlockingQueue的capacity后,才會繼續增加池中的線程,使得poolSize超出corePoolSize但不超過maximumPoolSize,這個時候來增加線程數是不是有點晚了呢??????*****.
          這樣一來reject(command)也可能隨之而來了,LinkedBlockingQueue.capacity設置為何值又是個頭疼的問題.
          所以ThreadPoolExecutor+LinkedBlockingQueue表達的意思是首先會增加線程數到corePoolSize,但只有queue的任務容量到達最大capacity后,才會繼續在corePoolSize的基數上增加線程來處理任務,直到maximumPoolSize.
              但為什么我們不能這樣呢:將LinkedBlockingQueue.capacity設置為Integer.MAX_VALUE,讓task盡可能的得到處理,同時在忙的情況下,增加池中的線程充到maximumPoolSize來盡快的處理這些任務.即便是把LinkedBlockingQueue.capacity設置為一個適當的值<<<遠小于Integer.MAX_VALUE,也不一定非得在任務數到達LinkedBlockingQueue的capacity之后才去增加線程使poolSize超出corePoolSize趨向maximumPoolSize.
              所以java util concurrent中的ThreadPoolExecutor+LinkedBlockingQueue組合的缺點也就出來了:如果我們想讓線程池盡可能多的處理大量的任務的話,我們會把LinkedBlockingQueue.capacity設置為Integer.MAX_VALUE,但是如果這樣的話池中的線程數量就不能充到最大maximumPoolSize,也就不能充分發揮線程池的最大處理能力.如果我們把LinkedBlockingQueue.capacity設置為一個較小的值,那么線程池中的線程數量會充到最大maximumPoolSize,但是如果池中的線程都忙的話,線程池又會reject請求的任務,因為隊列已滿.
              如果我們把LinkedBlockingQueue.capacity設置為一個較大的值但不是Integer.MAX_VALUE,那么等到線程池的線程數量準備開始超出corePoolSize時,也就是任務隊列滿了,這個時候才去增加線程的話,請求任務的執行會有一定的延時,也就是沒有得到及時的處理.
              其實也就是說ThreadPoolExecutor缺乏靈敏的線程調度機制,沒有根據當前任務的執行情況,是忙,還是閑,以及隊列中的待處理任務的數量級進行動態的調配線程數,使得它的處理效率受到影響.
          那么什么是忙的情況的判斷呢? 
          busy[1]:如果poolSize==corePoolSize,并且現在忙著執行任務的線程數(currentBusyWorkers)等于poolSize.[而不管現在put到queue的任務數是否到達queue.capacity]
          busy[2].1:如果poolSize==corePoolSize,并且put到queue的任務數已到達queue.capacity.[queue.capacity是針對有任務隊列極限限制的情況]
          busy[2].2:線程池的基本目標是盡可能的快速處理大量的請求任務,那么就不一定非得在put到queue的任務數到達queue的capacity之后才判斷為忙的情況,只要queue中現有的任務數(task_counter)與poolSize或者maximumPoolSize存在一定的比例時就可以判斷為忙情,比如task_counter>=poolSize或者maximumPoolSize的(NumberOfProcessor+1)倍,這樣queue.capacity這個限制可以取消了.
          在上述busy[1],busy[2]這2種情況下都應增加線程數,直至maximumPoolSize,使請求的任務得到最快的處理.

          前面講的是忙的時候ThreadPoolExecutor+LinkedBlockingQueue在處理上的瑕疵,那么空閑的時候又要如何呢?
          如果corePoolSize<poolSize<maximumPoolSize,那么線程等待keepAliveTime之后應該降為corePoolSize,嘿嘿,這個就真的成了bug了哦,一個很難發現的bug,poolSize是被降下來了,可是很可能降過了頭<corePoolSize,甚至降為0也有可能.
          ThreadPoolExecutor.Worker.run()-->ThreadPoolExecutor.getTask():
          Runnable getTask() {
                  for (;;) {
                      try {
                          int state = runState;
                          if (state > SHUTDOWN)
                              return null;
                          Runnable r;
                          if (state == SHUTDOWN)  // Help drain queue
                              r = workQueue.poll();
                          else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                /*queue is empty,這里timeout之后,return null,之后call workerCanExit() return true.*/
                              r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                          else
                              r = workQueue.take();
                          if (r != null)
                              return r;
                          if (workerCanExit()) {
                              if (runState >= SHUTDOWN) // Wake up others
                                  interruptIdleWorkers();
                              return null;
                          }
                          // Else retry
                      } catch (InterruptedException ie) {
                          // On interruption, re-check runState
                      }
                  }
          }//end getTask.
          private boolean workerCanExit() {
                  final ReentrantLock mainLock = this.mainLock;
                  mainLock.lock();
                  boolean canExit;
                  try {
                      canExit = runState >= STOP ||
                          workQueue.isEmpty() ||
                          (allowCoreThreadTimeOut &&
                           poolSize > Math.max(1, corePoolSize));
                  } finally {
                      mainLock.unlock();
                  }
                  return canExit;
          }//end workerCanExit.

          在workerCanExit() return true之后,poolSize仍然大于corePoolSize,pooSize的值沒有變化,
          ThreadPoolExecutor.Worker.run()將結束-->ThreadPoolExecutor.Worker.workerDone-->這個時候才將poolSize--,可惜晚了,在多線程的環境下,poolSize的值將變為小于corePoolSize,而不是等于corePoolSize!!!!!!
          例如:如果poolSize(6)大于corePoolSize(5),那么同時timeout的就不一定是一條線程,而是多條,它們都有可能退出run,使得poolSize--減過了corePoolSize.
              提一下java.util.concurrent.ThreadPoolExecutor的allowCoreThreadTimeOut方法, @since 1.6 public void allowCoreThreadTimeOut(boolean value);
          它表達的意思是在空閑的時候讓線程等待keepAliveTime,timeout后使得poolSize能夠降為0.[其實我是希望它降為minimumPoolSize,特別是在服務器的環境下,我們需要線程池保持一定數量的線程來及時處理"零零碎碎的,斷斷續續的,一股一波的,不是很有壓力的"請求],當然你可以把corePoolSize當作minimumPoolSize,而不調用該方法.
              針對上述java util concurrent線程池的瑕疵,我對java util concurrent線程池模型進行了修正,特別是在"忙"(busy[1],busy[2])的情況下的任務處理進行了優化,使得線程池盡可能快的處理盡可能多的任務.
          下面提供了高效的線程池的源碼購買:
          java版threadpool:
          http://item.taobao.com/auction/item_detail-0db2-9078a9045826f273dcea80aa490f1a8b.jhtml
          c [not c++]版threadpool in windows NT:
          http://item.taobao.com/auction/item_detail-0db2-28e37cb6776a1bc526ef5a27aa411e71.jhtml

          posted on 2010-02-20 20:15 adapterofcoms 閱讀(1732) 評論(1)  編輯  收藏 所屬分類: java techs

          評論

          # re: Java線程池的瑕疵,For java util concurrent threadpool Since jdk1.5 2010-10-19 19:30 myfavorite

          您好,請教線程池啟動后如何監控線程池狀態?  回復  更多評論   

          主站蜘蛛池模板: 泸水县| 铜陵市| 石嘴山市| 云梦县| 阳高县| 通渭县| 洪湖市| 循化| 融水| 桦南县| 洛扎县| 松江区| 栾川县| 湄潭县| 玛沁县| 积石山| 出国| 连山| 环江| 平塘县| 资阳市| 万州区| 道孚县| 河南省| 黎川县| 沁阳市| 顺平县| 务川| 灌阳县| 宜兰市| 惠州市| 苏尼特左旗| 洪泽县| 香格里拉县| 上虞市| 马山县| 广河县| 历史| 正蓝旗| 洪洞县| 炎陵县|