posts - 110, comments - 101, trackbacks - 0, articles - 7
            BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理

           原文:http://www.iteye.com/topic/1118660

          整個ThreadPoolExecutor的任務處理有4步操作:

           

          • 第一步,初始的poolSize < corePoolSize,提交的runnable任務,會直接做為new一個Thread的參數,立馬執行
          • 第二步,當提交的任務數超過了corePoolSize,就進入了第二步操作。會將當前的runable提交到一個block queue中
          • 第三步,如果block queue是個有界隊列,當隊列滿了之后就進入了第三步。如果poolSize < maximumPoolsize時,會嘗試new 一個Thread的進行救急處理,立馬執行對應的runnable任務
          • 第四步,如果第三步救急方案也無法處理了,就會走到第四步執行reject操作。
          幾點說明:(相信這些網上一搜一大把,我這里簡單介紹下,為后面做一下鋪墊)
          • block queue有以下幾種實現:
            1. ArrayBlockingQueue :  有界的數組隊列
            2. LinkedBlockingQueue : 可支持有界/無界的隊列,使用鏈表實現
            3. PriorityBlockingQueue : 優先隊列,可以針對任務排序
            4. SynchronousQueue : 隊列長度為1的隊列,和Array有點區別就是:client thread提交到block queue會是一個阻塞過程,直到有一個worker thread連接上來poll task。
          • RejectExecutionHandler是針對任務無法處理時的一些自保護處理:
            1. Reject 直接拋出Reject exception
            2. Discard 直接忽略該runnable,不可取
            3. DiscardOldest 丟棄最早入隊列的的任務
            4. CallsRun 直接讓原先的client thread做為worker線程,進行執行

          容易被人忽略的點:
          1.  pool threads啟動后,以后的任務獲取都會通過block queue中,獲取堆積的runnable task.

          所以建議: block size >= corePoolSize ,不然線程池就沒任何意義
          2.  corePoolSize 和 maximumPoolSize的區別, 和大家正常理解的數據庫連接池不太一樣。
            *  據dbcp pool為例,會有minIdle , maxActive配置。minIdle代表是常駐內存中的threads數量,maxActive代表是工作的最大線程數。
            *  這里的corePoolSize就是連接池的maxActive的概念,它沒有minIdle的概念(每個線程可以設置keepAliveTime,超過多少時間多有任務后銷毀線程,但不會固定保持一定數量的threads)。 
            * 這里的maximumPoolSize,是一種救急措施的第一層。當threadPoolExecutor的工作threads存在滿負荷,并且block queue隊列也滿了,這時代表接近崩潰邊緣。這時允許臨時起一批threads,用來處理runnable,處理完后立馬退出。

          所以建議:  maximumPoolSize >= corePoolSize =期望的最大線程數。 (我曾經配置了corePoolSize=1, maximumPoolSize=20, blockqueue為無界隊列,最后就成了單線程工作的pool。典型的配置錯誤)

          3. 善用blockqueue和reject組合. 這里要重點推薦下CallsRun的Rejected Handler,從字面意思就是讓調用者自己來運行。
          我們經常會在線上使用一些線程池做異步處理,比如我前面做的(業務層)異步并行加載技術分析和設計將原本串行的請求都變為了并行操作,但過多的并行會增加系統的負載(比如軟中斷,上下文切換)。所以肯定需要對線程池做一個size限制。但是為了引入異步操作后,避免因在block queue的等待時間過長,所以需要在隊列滿的時,執行一個callsRun的策略,并行的操作又轉為一個串行處理,這樣就可以保證盡量少的延遲影響。

          所以建議:  RejectExecutionHandler = CallsRun ,  blockqueue size = 2 * poolSize (為啥是2倍poolSize,主要一個考慮就是瞬間高峰處理,允許一個thread等待一個runnable任務)

          Btrace容量規劃

          再提供一個btrace腳本,分析線上的thread pool容量規劃是否合理,可以運行時輸出poolSize等一些數據。

           

           

          Java代碼  
          1. import static com.sun.btrace.BTraceUtils.addToAggregation;   
          2. import static com.sun.btrace.BTraceUtils.field;   
          3. import static com.sun.btrace.BTraceUtils.get;   
          4. import static com.sun.btrace.BTraceUtils.newAggregation;   
          5. import static com.sun.btrace.BTraceUtils.newAggregationKey;   
          6. import static com.sun.btrace.BTraceUtils.printAggregation;   
          7. import static com.sun.btrace.BTraceUtils.println;   
          8. import static com.sun.btrace.BTraceUtils.str;   
          9. import static com.sun.btrace.BTraceUtils.strcat;   
          10.   
          11. import java.lang.reflect.Field;   
          12. import java.util.concurrent.atomic.AtomicInteger;   
          13.   
          14. import com.sun.btrace.BTraceUtils;   
          15. import com.sun.btrace.aggregation.Aggregation;   
          16. import com.sun.btrace.aggregation.AggregationFunction;   
          17. import com.sun.btrace.aggregation.AggregationKey;   
          18. import com.sun.btrace.annotations.BTrace;   
          19. import com.sun.btrace.annotations.Kind;   
          20. import com.sun.btrace.annotations.Location;   
          21. import com.sun.btrace.annotations.OnEvent;   
          22. import com.sun.btrace.annotations.OnMethod;   
          23. import com.sun.btrace.annotations.OnTimer;   
          24. import com.sun.btrace.annotations.Self;   
          25.   
          26. /**  
          27.  * 并行加載監控  
          28.  *   
          29.  * @author jianghang 2011-4-7 下午10:59:53  
          30.  */  
          31. @BTrace  
          32. public class AsyncLoadTracer {   
          33.   
          34.     private static AtomicInteger rejecctCount = BTraceUtils.newAtomicInteger(0);   
          35.     private static Aggregation   histogram    = newAggregation(AggregationFunction.QUANTIZE);   
          36.     private static Aggregation   average      = newAggregation(AggregationFunction.AVERAGE);   
          37.     private static Aggregation   max          = newAggregation(AggregationFunction.MAXIMUM);   
          38.     private static Aggregation   min          = newAggregation(AggregationFunction.MINIMUM);   
          39.     private static Aggregation   sum          = newAggregation(AggregationFunction.SUM);   
          40.     private static Aggregation   count        = newAggregation(AggregationFunction.COUNT);   
          41.   
          42.     @OnMethod(clazz = "java.util.concurrent.ThreadPoolExecutor", method = "execute", location = @Location(value = Kind.ENTRY))   
          43.     public static void executeMonitor(@Self Object self) {   
          44.         Field poolSizeField = field("java.util.concurrent.ThreadPoolExecutor""poolSize");   
          45.         Field largestPoolSizeField = field("java.util.concurrent.ThreadPoolExecutor""largestPoolSize");   
          46.         Field workQueueField = field("java.util.concurrent.ThreadPoolExecutor""workQueue");   
          47.   
          48.         Field countField = field("java.util.concurrent.ArrayBlockingQueue""count");   
          49.         int poolSize = (Integer) get(poolSizeField, self);   
          50.         int largestPoolSize = (Integer) get(largestPoolSizeField, self);   
          51.         int queueSize = (Integer) get(countField, get(workQueueField, self));   
          52.   
          53.         println(strcat(strcat(strcat(strcat(strcat("poolSize : ", str(poolSize)), " largestPoolSize : "),   
          54.                                      str(largestPoolSize)), " queueSize : "), str(queueSize)));   
          55.     }   
          56.   
          57.     @OnMethod(clazz = "java.util.concurrent.ThreadPoolExecutor", method = "reject", location = @Location(value = Kind.ENTRY))   
          58.     public static void rejectMonitor(@Self Object self) {   
          59.         String name = str(self);   
          60.         if (BTraceUtils.startsWith(name, "com.alibaba.pivot.common.asyncload.impl.pool.AsyncLoadThreadPool")) {   
          61.             BTraceUtils.incrementAndGet(rejecctCount);   
          62.         }   
          63.     }   
          64.   
          65.     @OnTimer(1000)   
          66.     public static void rejectPrintln() {   
          67.         int reject = BTraceUtils.getAndSet(rejecctCount, 0);   
          68.         println(strcat("reject count in 1000 msec: ", str(reject)));   
          69.         AggregationKey key = newAggregationKey("rejectCount");   
          70.         addToAggregation(histogram, key, reject);   
          71.         addToAggregation(average, key, reject);   
          72.         addToAggregation(max, key, reject);   
          73.         addToAggregation(min, key, reject);   
          74.         addToAggregation(sum, key, reject);   
          75.         addToAggregation(count, key, reject);   
          76.     }   
          77.   
          78.     @OnEvent  
          79.     public static void onEvent() {   
          80.         BTraceUtils.truncateAggregation(histogram, 10);   
          81.         println("---------------------------------------------");   
          82.         printAggregation("Count", count);   
          83.         printAggregation("Min", min);   
          84.         printAggregation("Max", max);   
          85.         printAggregation("Average", average);   
          86.         printAggregation("Sum", sum);   
          87.         printAggregation("Histogram", histogram);   
          88.         println("---------------------------------------------");   
          89.     }   
          90. }  
           

          運行結果:

           

          Java代碼  
          1. poolSize : 1 , largestPoolSize = 10 , queueSize = 10  
          2. reject count in 1000 msec: 0  

           

          說明:

          1. poolSize 代表為當前的線程數

          2. largestPoolSize 代表為歷史最大的線程數

          3. queueSize 代表blockqueue的當前堆積的size

          4. reject count 代表在1000ms內的被reject的數量

           

           

          最后

            這是我對ThreadPoolExecutor使用過程中的一些經驗總結,希望能對大家有所幫助,如有描述不對的地方歡迎拍磚。


          只有注冊用戶登錄后才能發表評論。


          網站導航:
           
          主站蜘蛛池模板: 宁陕县| 突泉县| 永清县| 壤塘县| 乌拉特后旗| 漯河市| 安溪县| 嘉义市| 雷山县| 福州市| 彝良县| 讷河市| 麟游县| 阿荣旗| 绥中县| 越西县| 如东县| 休宁县| 当阳市| 县级市| 宁陕县| 象州县| 北海市| 孟州市| 富顺县| 仁化县| 图们市| 响水县| 娱乐| 普格县| 兰溪市| 秦安县| 金川县| 梨树县| 曲水县| 大庆市| 中牟县| 马公市| 敦化市| 上栗县| 南澳县|