我的Java路上那些事兒

          快樂成長(zhǎng)
          posts - 110, comments - 101, trackbacks - 0, articles - 7
            BlogJava :: 首頁(yè) :: 新隨筆 :: 聯(lián)系 :: 聚合  :: 管理

           原文:http://www.iteye.com/topic/1118660

          整個(gè)ThreadPoolExecutor的任務(wù)處理有4步操作:

           

          • 第一步,初始的poolSize < corePoolSize,提交的runnable任務(wù),會(huì)直接做為new一個(gè)Thread的參數(shù),立馬執(zhí)行
          • 第二步,當(dāng)提交的任務(wù)數(shù)超過了corePoolSize,就進(jìn)入了第二步操作。會(huì)將當(dāng)前的runable提交到一個(gè)block queue中
          • 第三步,如果block queue是個(gè)有界隊(duì)列,當(dāng)隊(duì)列滿了之后就進(jìn)入了第三步。如果poolSize < maximumPoolsize時(shí),會(huì)嘗試new 一個(gè)Thread的進(jìn)行救急處理,立馬執(zhí)行對(duì)應(yīng)的runnable任務(wù)
          • 第四步,如果第三步救急方案也無法處理了,就會(huì)走到第四步執(zhí)行reject操作。
          幾點(diǎn)說明:(相信這些網(wǎng)上一搜一大把,我這里簡(jiǎn)單介紹下,為后面做一下鋪墊)
          • block queue有以下幾種實(shí)現(xiàn):
            1. ArrayBlockingQueue :  有界的數(shù)組隊(duì)列
            2. LinkedBlockingQueue : 可支持有界/無界的隊(duì)列,使用鏈表實(shí)現(xiàn)
            3. PriorityBlockingQueue : 優(yōu)先隊(duì)列,可以針對(duì)任務(wù)排序
            4. SynchronousQueue : 隊(duì)列長(zhǎng)度為1的隊(duì)列,和Array有點(diǎn)區(qū)別就是:client thread提交到block queue會(huì)是一個(gè)阻塞過程,直到有一個(gè)worker thread連接上來poll task。
          • RejectExecutionHandler是針對(duì)任務(wù)無法處理時(shí)的一些自保護(hù)處理:
            1. Reject 直接拋出Reject exception
            2. Discard 直接忽略該runnable,不可取
            3. DiscardOldest 丟棄最早入隊(duì)列的的任務(wù)
            4. CallsRun 直接讓原先的client thread做為worker線程,進(jìn)行執(zhí)行

          容易被人忽略的點(diǎn):
          1.  pool threads啟動(dòng)后,以后的任務(wù)獲取都會(huì)通過block queue中,獲取堆積的runnable task.

          所以建議: block size >= corePoolSize ,不然線程池就沒任何意義
          2.  corePoolSize 和 maximumPoolSize的區(qū)別, 和大家正常理解的數(shù)據(jù)庫(kù)連接池不太一樣。
            *  據(jù)dbcp pool為例,會(huì)有minIdle , maxActive配置。minIdle代表是常駐內(nèi)存中的threads數(shù)量,maxActive代表是工作的最大線程數(shù)。
            *  這里的corePoolSize就是連接池的maxActive的概念,它沒有minIdle的概念(每個(gè)線程可以設(shè)置keepAliveTime,超過多少時(shí)間多有任務(wù)后銷毀線程,但不會(huì)固定保持一定數(shù)量的threads)。 
            * 這里的maximumPoolSize,是一種救急措施的第一層。當(dāng)threadPoolExecutor的工作threads存在滿負(fù)荷,并且block queue隊(duì)列也滿了,這時(shí)代表接近崩潰邊緣。這時(shí)允許臨時(shí)起一批threads,用來處理runnable,處理完后立馬退出。

          所以建議:  maximumPoolSize >= corePoolSize =期望的最大線程數(shù)。 (我曾經(jīng)配置了corePoolSize=1, maximumPoolSize=20, blockqueue為無界隊(duì)列,最后就成了單線程工作的pool。典型的配置錯(cuò)誤)

          3. 善用blockqueue和reject組合. 這里要重點(diǎn)推薦下CallsRun的Rejected Handler,從字面意思就是讓調(diào)用者自己來運(yùn)行。
          我們經(jīng)常會(huì)在線上使用一些線程池做異步處理,比如我前面做的(業(yè)務(wù)層)異步并行加載技術(shù)分析和設(shè)計(jì)將原本串行的請(qǐng)求都變?yōu)榱瞬⑿胁僮鳎^多的并行會(huì)增加系統(tǒng)的負(fù)載(比如軟中斷,上下文切換)。所以肯定需要對(duì)線程池做一個(gè)size限制。但是為了引入異步操作后,避免因在block queue的等待時(shí)間過長(zhǎng),所以需要在隊(duì)列滿的時(shí),執(zhí)行一個(gè)callsRun的策略,并行的操作又轉(zhuǎn)為一個(gè)串行處理,這樣就可以保證盡量少的延遲影響。

          所以建議:  RejectExecutionHandler = CallsRun ,  blockqueue size = 2 * poolSize (為啥是2倍poolSize,主要一個(gè)考慮就是瞬間高峰處理,允許一個(gè)thread等待一個(gè)runnable任務(wù))

          Btrace容量規(guī)劃

          再提供一個(gè)btrace腳本,分析線上的thread pool容量規(guī)劃是否合理,可以運(yùn)行時(shí)輸出poolSize等一些數(shù)據(jù)。

           

           

          Java代碼  
          1. import static com.sun.btrace.BTraceUtils.addToAggregation;   
          2. import static com.sun.btrace.BTraceUtils.field;   
          3. import static com.sun.btrace.BTraceUtils.get;   
          4. import static com.sun.btrace.BTraceUtils.newAggregation;   
          5. import static com.sun.btrace.BTraceUtils.newAggregationKey;   
          6. import static com.sun.btrace.BTraceUtils.printAggregation;   
          7. import static com.sun.btrace.BTraceUtils.println;   
          8. import static com.sun.btrace.BTraceUtils.str;   
          9. import static com.sun.btrace.BTraceUtils.strcat;   
          10.   
          11. import java.lang.reflect.Field;   
          12. import java.util.concurrent.atomic.AtomicInteger;   
          13.   
          14. import com.sun.btrace.BTraceUtils;   
          15. import com.sun.btrace.aggregation.Aggregation;   
          16. import com.sun.btrace.aggregation.AggregationFunction;   
          17. import com.sun.btrace.aggregation.AggregationKey;   
          18. import com.sun.btrace.annotations.BTrace;   
          19. import com.sun.btrace.annotations.Kind;   
          20. import com.sun.btrace.annotations.Location;   
          21. import com.sun.btrace.annotations.OnEvent;   
          22. import com.sun.btrace.annotations.OnMethod;   
          23. import com.sun.btrace.annotations.OnTimer;   
          24. import com.sun.btrace.annotations.Self;   
          25.   
          26. /**  
          27.  * 并行加載監(jiān)控  
          28.  *   
          29.  * @author jianghang 2011-4-7 下午10:59:53  
          30.  */  
          31. @BTrace  
          32. public class AsyncLoadTracer {   
          33.   
          34.     private static AtomicInteger rejecctCount = BTraceUtils.newAtomicInteger(0);   
          35.     private static Aggregation   histogram    = newAggregation(AggregationFunction.QUANTIZE);   
          36.     private static Aggregation   average      = newAggregation(AggregationFunction.AVERAGE);   
          37.     private static Aggregation   max          = newAggregation(AggregationFunction.MAXIMUM);   
          38.     private static Aggregation   min          = newAggregation(AggregationFunction.MINIMUM);   
          39.     private static Aggregation   sum          = newAggregation(AggregationFunction.SUM);   
          40.     private static Aggregation   count        = newAggregation(AggregationFunction.COUNT);   
          41.   
          42.     @OnMethod(clazz = "java.util.concurrent.ThreadPoolExecutor", method = "execute", location = @Location(value = Kind.ENTRY))   
          43.     public static void executeMonitor(@Self Object self) {   
          44.         Field poolSizeField = field("java.util.concurrent.ThreadPoolExecutor""poolSize");   
          45.         Field largestPoolSizeField = field("java.util.concurrent.ThreadPoolExecutor""largestPoolSize");   
          46.         Field workQueueField = field("java.util.concurrent.ThreadPoolExecutor""workQueue");   
          47.   
          48.         Field countField = field("java.util.concurrent.ArrayBlockingQueue""count");   
          49.         int poolSize = (Integer) get(poolSizeField, self);   
          50.         int largestPoolSize = (Integer) get(largestPoolSizeField, self);   
          51.         int queueSize = (Integer) get(countField, get(workQueueField, self));   
          52.   
          53.         println(strcat(strcat(strcat(strcat(strcat("poolSize : ", str(poolSize)), " largestPoolSize : "),   
          54.                                      str(largestPoolSize)), " queueSize : "), str(queueSize)));   
          55.     }   
          56.   
          57.     @OnMethod(clazz = "java.util.concurrent.ThreadPoolExecutor", method = "reject", location = @Location(value = Kind.ENTRY))   
          58.     public static void rejectMonitor(@Self Object self) {   
          59.         String name = str(self);   
          60.         if (BTraceUtils.startsWith(name, "com.alibaba.pivot.common.asyncload.impl.pool.AsyncLoadThreadPool")) {   
          61.             BTraceUtils.incrementAndGet(rejecctCount);   
          62.         }   
          63.     }   
          64.   
          65.     @OnTimer(1000)   
          66.     public static void rejectPrintln() {   
          67.         int reject = BTraceUtils.getAndSet(rejecctCount, 0);   
          68.         println(strcat("reject count in 1000 msec: ", str(reject)));   
          69.         AggregationKey key = newAggregationKey("rejectCount");   
          70.         addToAggregation(histogram, key, reject);   
          71.         addToAggregation(average, key, reject);   
          72.         addToAggregation(max, key, reject);   
          73.         addToAggregation(min, key, reject);   
          74.         addToAggregation(sum, key, reject);   
          75.         addToAggregation(count, key, reject);   
          76.     }   
          77.   
          78.     @OnEvent  
          79.     public static void onEvent() {   
          80.         BTraceUtils.truncateAggregation(histogram, 10);   
          81.         println("---------------------------------------------");   
          82.         printAggregation("Count", count);   
          83.         printAggregation("Min", min);   
          84.         printAggregation("Max", max);   
          85.         printAggregation("Average", average);   
          86.         printAggregation("Sum", sum);   
          87.         printAggregation("Histogram", histogram);   
          88.         println("---------------------------------------------");   
          89.     }   
          90. }  
           

          運(yùn)行結(jié)果:

           

          Java代碼  
          1. poolSize : 1 , largestPoolSize = 10 , queueSize = 10  
          2. reject count in 1000 msec: 0  

           

          說明:

          1. poolSize 代表為當(dāng)前的線程數(shù)

          2. largestPoolSize 代表為歷史最大的線程數(shù)

          3. queueSize 代表blockqueue的當(dāng)前堆積的size

          4. reject count 代表在1000ms內(nèi)的被reject的數(shù)量

           

           

          最后

            這是我對(duì)ThreadPoolExecutor使用過程中的一些經(jīng)驗(yàn)總結(jié),希望能對(duì)大家有所幫助,如有描述不對(duì)的地方歡迎拍磚。


          只有注冊(cè)用戶登錄后才能發(fā)表評(píng)論。


          網(wǎng)站導(dǎo)航:
           
          主站蜘蛛池模板: 许昌市| 静海县| 张北县| 罗山县| 杭锦旗| 昌宁县| 寻乌县| 金堂县| 木兰县| 仪征市| 自治县| 原平市| 五河县| 金乡县| 宣恩县| 邵东县| 福海县| 砚山县| 凌云县| 烟台市| 梅河口市| 长岛县| 无极县| 夹江县| 友谊县| 贡嘎县| 陇川县| 磐石市| 新源县| 嘉善县| 道真| 景德镇市| 南投县| 信丰县| 浏阳市| 梧州市| 威信县| 宁陕县| 新和县| 彰化市| 西盟|