原文:http://www.iteye.com/topic/1118660
整個(gè)ThreadPoolExecutor的任務(wù)處理有4步操作:
- 第一步,初始的poolSize < corePoolSize,提交的runnable任務(wù),會(huì)直接做為new一個(gè)Thread的參數(shù),立馬執(zhí)行
- 第二步,當(dāng)提交的任務(wù)數(shù)超過了corePoolSize,就進(jìn)入了第二步操作。會(huì)將當(dāng)前的runable提交到一個(gè)block queue中
- 第三步,如果block queue是個(gè)有界隊(duì)列,當(dāng)隊(duì)列滿了之后就進(jìn)入了第三步。如果poolSize < maximumPoolsize時(shí),會(huì)嘗試new 一個(gè)Thread的進(jìn)行救急處理,立馬執(zhí)行對(duì)應(yīng)的runnable任務(wù)
- 第四步,如果第三步救急方案也無法處理了,就會(huì)走到第四步執(zhí)行reject操作。
幾點(diǎn)說明:(相信這些網(wǎng)上一搜一大把,我這里簡(jiǎn)單介紹下,為后面做一下鋪墊)
- block queue有以下幾種實(shí)現(xiàn):
1. ArrayBlockingQueue : 有界的數(shù)組隊(duì)列
2. LinkedBlockingQueue : 可支持有界/無界的隊(duì)列,使用鏈表實(shí)現(xiàn)
3. PriorityBlockingQueue : 優(yōu)先隊(duì)列,可以針對(duì)任務(wù)排序
4. SynchronousQueue : 隊(duì)列長(zhǎng)度為1的隊(duì)列,和Array有點(diǎn)區(qū)別就是:client thread提交到block queue會(huì)是一個(gè)阻塞過程,直到有一個(gè)worker thread連接上來poll task。 - RejectExecutionHandler是針對(duì)任務(wù)無法處理時(shí)的一些自保護(hù)處理:
1. Reject 直接拋出Reject exception
2. Discard 直接忽略該runnable,不可取
3. DiscardOldest 丟棄最早入隊(duì)列的的任務(wù)
4. CallsRun 直接讓原先的client thread做為worker線程,進(jìn)行執(zhí)行
容易被人忽略的點(diǎn):
1. pool threads啟動(dòng)后,以后的任務(wù)獲取都會(huì)通過block queue中,獲取堆積的runnable task.
所以建議: block size >= corePoolSize ,不然線程池就沒任何意義
2. corePoolSize 和 maximumPoolSize的區(qū)別, 和大家正常理解的數(shù)據(jù)庫(kù)連接池不太一樣。
* 據(jù)dbcp pool為例,會(huì)有minIdle , maxActive配置。minIdle代表是常駐內(nèi)存中的threads數(shù)量,maxActive代表是工作的最大線程數(shù)。
* 這里的corePoolSize就是連接池的maxActive的概念,它沒有minIdle的概念(每個(gè)線程可以設(shè)置keepAliveTime,超過多少時(shí)間多有任務(wù)后銷毀線程,但不會(huì)固定保持一定數(shù)量的threads)。
* 這里的maximumPoolSize,是一種救急措施的第一層。當(dāng)threadPoolExecutor的工作threads存在滿負(fù)荷,并且block queue隊(duì)列也滿了,這時(shí)代表接近崩潰邊緣。這時(shí)允許臨時(shí)起一批threads,用來處理runnable,處理完后立馬退出。
所以建議: maximumPoolSize >= corePoolSize =期望的最大線程數(shù)。 (我曾經(jīng)配置了corePoolSize=1, maximumPoolSize=20, blockqueue為無界隊(duì)列,最后就成了單線程工作的pool。典型的配置錯(cuò)誤)
3. 善用blockqueue和reject組合. 這里要重點(diǎn)推薦下CallsRun的Rejected Handler,從字面意思就是讓調(diào)用者自己來運(yùn)行。
我們經(jīng)常會(huì)在線上使用一些線程池做異步處理,比如我前面做的(業(yè)務(wù)層)異步并行加載技術(shù)分析和設(shè)計(jì), 將原本串行的請(qǐng)求都變?yōu)榱瞬⑿胁僮鳎^多的并行會(huì)增加系統(tǒng)的負(fù)載(比如軟中斷,上下文切換)。所以肯定需要對(duì)線程池做一個(gè)size限制。但是為了引入異步操作后,避免因在block queue的等待時(shí)間過長(zhǎng),所以需要在隊(duì)列滿的時(shí),執(zhí)行一個(gè)callsRun的策略,并行的操作又轉(zhuǎn)為一個(gè)串行處理,這樣就可以保證盡量少的延遲影響。
所以建議: RejectExecutionHandler = CallsRun , blockqueue size = 2 * poolSize (為啥是2倍poolSize,主要一個(gè)考慮就是瞬間高峰處理,允許一個(gè)thread等待一個(gè)runnable任務(wù))
Btrace容量規(guī)劃
再提供一個(gè)btrace腳本,分析線上的thread pool容量規(guī)劃是否合理,可以運(yùn)行時(shí)輸出poolSize等一些數(shù)據(jù)。
- import static com.sun.btrace.BTraceUtils.addToAggregation;
- import static com.sun.btrace.BTraceUtils.field;
- import static com.sun.btrace.BTraceUtils.get;
- import static com.sun.btrace.BTraceUtils.newAggregation;
- import static com.sun.btrace.BTraceUtils.newAggregationKey;
- import static com.sun.btrace.BTraceUtils.printAggregation;
- import static com.sun.btrace.BTraceUtils.println;
- import static com.sun.btrace.BTraceUtils.str;
- import static com.sun.btrace.BTraceUtils.strcat;
- import java.lang.reflect.Field;
- import java.util.concurrent.atomic.AtomicInteger;
- import com.sun.btrace.BTraceUtils;
- import com.sun.btrace.aggregation.Aggregation;
- import com.sun.btrace.aggregation.AggregationFunction;
- import com.sun.btrace.aggregation.AggregationKey;
- import com.sun.btrace.annotations.BTrace;
- import com.sun.btrace.annotations.Kind;
- import com.sun.btrace.annotations.Location;
- import com.sun.btrace.annotations.OnEvent;
- import com.sun.btrace.annotations.OnMethod;
- import com.sun.btrace.annotations.OnTimer;
- import com.sun.btrace.annotations.Self;
- /**
- * 并行加載監(jiān)控
- *
- * @author jianghang 2011-4-7 下午10:59:53
- */
- @BTrace
- public class AsyncLoadTracer {
- private static AtomicInteger rejecctCount = BTraceUtils.newAtomicInteger(0);
- private static Aggregation histogram = newAggregation(AggregationFunction.QUANTIZE);
- private static Aggregation average = newAggregation(AggregationFunction.AVERAGE);
- private static Aggregation max = newAggregation(AggregationFunction.MAXIMUM);
- private static Aggregation min = newAggregation(AggregationFunction.MINIMUM);
- private static Aggregation sum = newAggregation(AggregationFunction.SUM);
- private static Aggregation count = newAggregation(AggregationFunction.COUNT);
- @OnMethod(clazz = "java.util.concurrent.ThreadPoolExecutor", method = "execute", location = @Location(value = Kind.ENTRY))
- public static void executeMonitor(@Self Object self) {
- Field poolSizeField = field("java.util.concurrent.ThreadPoolExecutor", "poolSize");
- Field largestPoolSizeField = field("java.util.concurrent.ThreadPoolExecutor", "largestPoolSize");
- Field workQueueField = field("java.util.concurrent.ThreadPoolExecutor", "workQueue");
- Field countField = field("java.util.concurrent.ArrayBlockingQueue", "count");
- int poolSize = (Integer) get(poolSizeField, self);
- int largestPoolSize = (Integer) get(largestPoolSizeField, self);
- int queueSize = (Integer) get(countField, get(workQueueField, self));
- println(strcat(strcat(strcat(strcat(strcat("poolSize : ", str(poolSize)), " largestPoolSize : "),
- str(largestPoolSize)), " queueSize : "), str(queueSize)));
- }
- @OnMethod(clazz = "java.util.concurrent.ThreadPoolExecutor", method = "reject", location = @Location(value = Kind.ENTRY))
- public static void rejectMonitor(@Self Object self) {
- String name = str(self);
- if (BTraceUtils.startsWith(name, "com.alibaba.pivot.common.asyncload.impl.pool.AsyncLoadThreadPool")) {
- BTraceUtils.incrementAndGet(rejecctCount);
- }
- }
- @OnTimer(1000)
- public static void rejectPrintln() {
- int reject = BTraceUtils.getAndSet(rejecctCount, 0);
- println(strcat("reject count in 1000 msec: ", str(reject)));
- AggregationKey key = newAggregationKey("rejectCount");
- addToAggregation(histogram, key, reject);
- addToAggregation(average, key, reject);
- addToAggregation(max, key, reject);
- addToAggregation(min, key, reject);
- addToAggregation(sum, key, reject);
- addToAggregation(count, key, reject);
- }
- @OnEvent
- public static void onEvent() {
- BTraceUtils.truncateAggregation(histogram, 10);
- println("---------------------------------------------");
- printAggregation("Count", count);
- printAggregation("Min", min);
- printAggregation("Max", max);
- printAggregation("Average", average);
- printAggregation("Sum", sum);
- printAggregation("Histogram", histogram);
- println("---------------------------------------------");
- }
- }
運(yùn)行結(jié)果:
- poolSize : 1 , largestPoolSize = 10 , queueSize = 10
- reject count in 1000 msec: 0
說明:
1. poolSize 代表為當(dāng)前的線程數(shù)
2. largestPoolSize 代表為歷史最大的線程數(shù)
3. queueSize 代表blockqueue的當(dāng)前堆積的size
4. reject count 代表在1000ms內(nèi)的被reject的數(shù)量
最后
這是我對(duì)ThreadPoolExecutor使用過程中的一些經(jīng)驗(yàn)總結(jié),希望能對(duì)大家有所幫助,如有描述不對(duì)的地方歡迎拍磚。