the journey is the reward...

          常用鏈接

          統(tǒng)計

          最新評論

          Java線程池的瑕疵,For java util concurrent threadpool Since jdk1.5

              java.util.concurrent的作者是Doug Lea : 世界上對Java影響力最大的個人,在jdk1.5之前大家一定熟悉他的backport-util-concurrent.jar."這個鼻梁掛著眼鏡,留著德王威廉二世的胡子,臉上永遠(yuǎn)掛著謙遜靦腆笑容,服務(wù)于紐約州立大學(xué)Oswego分校計算器科學(xué)系的老大爺。",他可是并發(fā)編程的大師級人物哦!
              Since jdk1.5,在java.util.concurrent包下的線程池模型是基于queue的,threadpool只有一個,而queue卻有多個LinkedBlockingQueue,SynchronousQueue,ScheduledThreadPoolExecutor.DelayedWorkQueue等可參見java.util.concurrent.Executors.注意:我下面的問題是針對LinkedBlockingQueue的,參考的src為jdk1.6.
              Threadpool通過以下的3個屬性來標(biāo)志池中的線程數(shù):
          corePoolSize(類似minimumPoolSize),poolSize(當(dāng)前池中的線程數(shù)),maximumPoolSize(最大的線程數(shù)).
          這3個屬性表達的意思是每次新創(chuàng)建或結(jié)束一個線程poolSize++/--,在最忙的情況下threadpool創(chuàng)建的線程數(shù)不能超過maximumPoolSize,
          當(dāng)空閑的情況下poolSize應(yīng)該降到corePoolSize,當(dāng)然threadpool如果從創(chuàng)建時它就從來沒有處理過一次請求的話,那么poolSize當(dāng)然為0.
              通過以上2段的說明下面我要引出我所要講的問題:
          我們來看一下java.util.concurrent.ThreadPoolExecutor的execute方法:
          public void execute(Runnable command) {
                  if (command == null)
                      throw new NullPointerException();
                  if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
                      if (runState == RUNNING && workQueue.offer(command)) {
                          if (runState != RUNNING || poolSize == 0)
                              ensureQueuedTaskHandled(command);
                      }
                      else if (!addIfUnderMaximumPoolSize(command))
                          reject(command); // is shutdown or saturated
                  }
          }
          它表達的主體意思是:如果當(dāng)前的poolSize<corePoolSize,那么就增加線程直到poolSize==corePoolSize.
          如果poolSize已經(jīng)到達corePoolSize,那么就把command(task) put to workQueue,如果workQueue為LinkedBlockingQueue的話,
          那么只有當(dāng)workQueue offer commands達到workQueue.capacity后,threadpool才會繼續(xù)增加線程直到maximumPoolSize.
          1.*****如果LinkedBlockingQueue.capacity被設(shè)置為Integer.MAX_VALUE,那么池中的線程幾乎不可能到達maximumPoolSize.*****
          所以你如果使用了Executors.newFixedThreadPool的話,那么maximumPoolSize和corePoolSize是一樣的并且LinkedBlockingQueue.capacity==Integer.MAX_VALUE,或者如果這樣new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,timeUnit,new LinkedBlockingQueue<Runnable>(/*Integer.MAX_VALUE*/))的話,
          上述的使用都將導(dǎo)致maximumPoolSize是無效的,也就是說線程池中的線程數(shù)不會超出corePoolSize.
          這個也讓那些tomcat6的開發(fā)人員可能也郁悶了,他們不得不改寫LinkedBlockingQueue,以tomcat-6.0.20-src為例:
          org.apache.tomcat.util.net.NioEndpoint.TaskQueue extends LinkedBlockingQueue<Runnable> override offer method: 
           public void setParent(ThreadPoolExecutor tp, NioEndpoint ep) {
                      parent = tp;
                      this.endpoint = ep;
                  }
                 
                  public boolean offer(Runnable o) {
                      //we can't do any checks
                      if (parent==null) return super.offer(o);
                      //we are maxed out on threads, simply queue the object
                      if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
                      //we have idle threads, just add it to the queue
                      //this is an approximation, so it could use some tuning
                      if (endpoint.activeSocketProcessors.get()<(parent.getPoolSize())) return super.offer(o);
                      //if we have less threads than maximum force creation of a new thread
                      if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
                      //if we reached here, we need to add it to the queue
                      return super.offer(o);
                  } 

          org.apache.tomcat.util.net.NioEndpoint.start()-->
             TaskQueue taskqueue = new TaskQueue();/***queue.capacity==Integer.MAX_VALUE***/
                               TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-");
                               executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60,TimeUnit.SECONDS,taskqueue, tf);
                               taskqueue.setParent( (ThreadPoolExecutor) executor, this);
          2.*****如果把LinkedBlockingQueue.capacity設(shè)置為一個適當(dāng)?shù)闹颠h(yuǎn)小于Integer.MAX_VALUE,那么只有put到queue的任務(wù)數(shù)到達LinkedBlockingQueue的capacity后,才會繼續(xù)增加池中的線程,使得poolSize超出corePoolSize但不超過maximumPoolSize,這個時候來增加線程數(shù)是不是有點晚了呢??????*****.
          這樣一來reject(command)也可能隨之而來了,LinkedBlockingQueue.capacity設(shè)置為何值又是個頭疼的問題.
          所以ThreadPoolExecutor+LinkedBlockingQueue表達的意思是首先會增加線程數(shù)到corePoolSize,但只有queue的任務(wù)容量到達最大capacity后,才會繼續(xù)在corePoolSize的基數(shù)上增加線程來處理任務(wù),直到maximumPoolSize.
              但為什么我們不能這樣呢:將LinkedBlockingQueue.capacity設(shè)置為Integer.MAX_VALUE,讓task盡可能的得到處理,同時在忙的情況下,增加池中的線程充到maximumPoolSize來盡快的處理這些任務(wù).即便是把LinkedBlockingQueue.capacity設(shè)置為一個適當(dāng)?shù)闹?lt;<<遠(yuǎn)小于Integer.MAX_VALUE,也不一定非得在任務(wù)數(shù)到達LinkedBlockingQueue的capacity之后才去增加線程使poolSize超出corePoolSize趨向maximumPoolSize.
              所以java util concurrent中的ThreadPoolExecutor+LinkedBlockingQueue組合的缺點也就出來了:如果我們想讓線程池盡可能多的處理大量的任務(wù)的話,我們會把LinkedBlockingQueue.capacity設(shè)置為Integer.MAX_VALUE,但是如果這樣的話池中的線程數(shù)量就不能充到最大maximumPoolSize,也就不能充分發(fā)揮線程池的最大處理能力.如果我們把LinkedBlockingQueue.capacity設(shè)置為一個較小的值,那么線程池中的線程數(shù)量會充到最大maximumPoolSize,但是如果池中的線程都忙的話,線程池又會reject請求的任務(wù),因為隊列已滿.
              如果我們把LinkedBlockingQueue.capacity設(shè)置為一個較大的值但不是Integer.MAX_VALUE,那么等到線程池的線程數(shù)量準(zhǔn)備開始超出corePoolSize時,也就是任務(wù)隊列滿了,這個時候才去增加線程的話,請求任務(wù)的執(zhí)行會有一定的延時,也就是沒有得到及時的處理.
              其實也就是說ThreadPoolExecutor缺乏靈敏的線程調(diào)度機制,沒有根據(jù)當(dāng)前任務(wù)的執(zhí)行情況,是忙,還是閑,以及隊列中的待處理任務(wù)的數(shù)量級進行動態(tài)的調(diào)配線程數(shù),使得它的處理效率受到影響.
          那么什么是忙的情況的判斷呢? 
          busy[1]:如果poolSize==corePoolSize,并且現(xiàn)在忙著執(zhí)行任務(wù)的線程數(shù)(currentBusyWorkers)等于poolSize.[而不管現(xiàn)在put到queue的任務(wù)數(shù)是否到達queue.capacity]
          busy[2].1:如果poolSize==corePoolSize,并且put到queue的任務(wù)數(shù)已到達queue.capacity.[queue.capacity是針對有任務(wù)隊列極限限制的情況]
          busy[2].2:線程池的基本目標(biāo)是盡可能的快速處理大量的請求任務(wù),那么就不一定非得在put到queue的任務(wù)數(shù)到達queue的capacity之后才判斷為忙的情況,只要queue中現(xiàn)有的任務(wù)數(shù)(task_counter)與poolSize或者maximumPoolSize存在一定的比例時就可以判斷為忙情,比如task_counter>=poolSize或者maximumPoolSize的(NumberOfProcessor+1)倍,這樣queue.capacity這個限制可以取消了.
          在上述busy[1],busy[2]這2種情況下都應(yīng)增加線程數(shù),直至maximumPoolSize,使請求的任務(wù)得到最快的處理.

          前面講的是忙的時候ThreadPoolExecutor+LinkedBlockingQueue在處理上的瑕疵,那么空閑的時候又要如何呢?
          如果corePoolSize<poolSize<maximumPoolSize,那么線程等待keepAliveTime之后應(yīng)該降為corePoolSize,嘿嘿,這個就真的成了bug了哦,一個很難發(fā)現(xiàn)的bug,poolSize是被降下來了,可是很可能降過了頭<corePoolSize,甚至降為0也有可能.
          ThreadPoolExecutor.Worker.run()-->ThreadPoolExecutor.getTask():
          Runnable getTask() {
                  for (;;) {
                      try {
                          int state = runState;
                          if (state > SHUTDOWN)
                              return null;
                          Runnable r;
                          if (state == SHUTDOWN)  // Help drain queue
                              r = workQueue.poll();
                          else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                /*queue is empty,這里timeout之后,return null,之后call workerCanExit() return true.*/
                              r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                          else
                              r = workQueue.take();
                          if (r != null)
                              return r;
                          if (workerCanExit()) {
                              if (runState >= SHUTDOWN) // Wake up others
                                  interruptIdleWorkers();
                              return null;
                          }
                          // Else retry
                      } catch (InterruptedException ie) {
                          // On interruption, re-check runState
                      }
                  }
          }//end getTask.
          private boolean workerCanExit() {
                  final ReentrantLock mainLock = this.mainLock;
                  mainLock.lock();
                  boolean canExit;
                  try {
                      canExit = runState >= STOP ||
                          workQueue.isEmpty() ||
                          (allowCoreThreadTimeOut &&
                           poolSize > Math.max(1, corePoolSize));
                  } finally {
                      mainLock.unlock();
                  }
                  return canExit;
          }//end workerCanExit.

          在workerCanExit() return true之后,poolSize仍然大于corePoolSize,pooSize的值沒有變化,
          ThreadPoolExecutor.Worker.run()將結(jié)束-->ThreadPoolExecutor.Worker.workerDone-->這個時候才將poolSize--,可惜晚了,在多線程的環(huán)境下,poolSize的值將變?yōu)樾∮赾orePoolSize,而不是等于corePoolSize!!!!!!
          例如:如果poolSize(6)大于corePoolSize(5),那么同時timeout的就不一定是一條線程,而是多條,它們都有可能退出run,使得poolSize--減過了corePoolSize.
              提一下java.util.concurrent.ThreadPoolExecutor的allowCoreThreadTimeOut方法, @since 1.6 public void allowCoreThreadTimeOut(boolean value);
          它表達的意思是在空閑的時候讓線程等待keepAliveTime,timeout后使得poolSize能夠降為0.[其實我是希望它降為minimumPoolSize,特別是在服務(wù)器的環(huán)境下,我們需要線程池保持一定數(shù)量的線程來及時處理"零零碎碎的,斷斷續(xù)續(xù)的,一股一波的,不是很有壓力的"請求],當(dāng)然你可以把corePoolSize當(dāng)作minimumPoolSize,而不調(diào)用該方法.
              針對上述java util concurrent線程池的瑕疵,我對java util concurrent線程池模型進行了修正,特別是在"忙"(busy[1],busy[2])的情況下的任務(wù)處理進行了優(yōu)化,使得線程池盡可能快的處理盡可能多的任務(wù).
          下面提供了高效的線程池的源碼購買:
          java版threadpool:
          http://item.taobao.com/auction/item_detail-0db2-9078a9045826f273dcea80aa490f1a8b.jhtml
          c [not c++]版threadpool in windows NT:
          http://item.taobao.com/auction/item_detail-0db2-28e37cb6776a1bc526ef5a27aa411e71.jhtml

          posted on 2010-02-20 20:15 adapterofcoms 閱讀(1732) 評論(1)  編輯  收藏 所屬分類: java techs

          評論

          # re: Java線程池的瑕疵,For java util concurrent threadpool Since jdk1.5 2010-10-19 19:30 myfavorite

          您好,請教線程池啟動后如何監(jiān)控線程池狀態(tài)?  回復(fù)  更多評論   

          主站蜘蛛池模板: 富裕县| 都兰县| 和林格尔县| 宜城市| 盘锦市| 汕尾市| 台中县| 罗定市| 莱州市| 贵定县| 阿勒泰市| 长治县| 闻喜县| 东至县| 广汉市| 兴安县| 乳源| 荆州市| 五台县| 延庆县| 南漳县| 元谋县| 蒙自县| 色达县| 建始县| 东乡县| 新平| 蓝山县| 长白| 岳池县| 金湖县| 镇宁| 高密市| 电白县| 肃宁县| 陆川县| 新绛县| 潜山县| 汶上县| 琼海市| 西平县|