[本文PDF地址:http://www.aygfsteel.com/Files/xylz/Inside.Java.Concurrency_31.ThreadPool.part4_RejectedPolicy.pdf]
上一節(jié)中提到關(guān)閉線程池過程中需要對(duì)新提交的任務(wù)進(jìn)行處理。這個(gè)是java.util.concurrent.RejectedExecutionHandler處理的邏輯。
在沒有分析線程池原理之前先來分析下為什么有任務(wù)拒絕的情況發(fā)生。
這里先假設(shè)一個(gè)前提:線程池有一個(gè)任務(wù)隊(duì)列,用于緩存所有待處理的任務(wù),正在處理的任務(wù)將從任務(wù)隊(duì)列中移除。因此在任務(wù)隊(duì)列長(zhǎng)度有限的情況下就會(huì)出現(xiàn)新任務(wù)的拒絕處理問題,需要有一種策略來處理應(yīng)該加入任務(wù)隊(duì)列卻因?yàn)殛?duì)列已滿無法加入的情況。另外在線程池關(guān)閉的時(shí)候也需要對(duì)任務(wù)加入隊(duì)列操作進(jìn)行額外的協(xié)調(diào)處理。
RejectedExecutionHandler提供了四種方式來處理任務(wù)拒絕策略。
這四種策略是獨(dú)立無關(guān)的,是對(duì)任務(wù)拒絕處理的四中表現(xiàn)形式。最簡(jiǎn)單的方式就是直接丟棄任務(wù)。但是卻有兩種方式,到底是該丟棄哪一個(gè)任務(wù),比如可以丟棄當(dāng)前將要加入隊(duì)列的任務(wù)本身(DiscardPolicy)或者丟棄任務(wù)隊(duì)列中最舊任務(wù)(DiscardOldestPolicy)。丟棄最舊任務(wù)也不是簡(jiǎn)單的丟棄最舊的任務(wù),而是有一些額外的處理。除了丟棄任務(wù)還可以直接拋出一個(gè)異常(RejectedExecutionException),這是比較簡(jiǎn)單的方式。拋出異常的方式(AbortPolicy)盡管實(shí)現(xiàn)方式比較簡(jiǎn)單,但是由于拋出一個(gè)RuntimeException,因此會(huì)中斷調(diào)用者的處理過程。除了拋出異常以外還可以不進(jìn)入線程池執(zhí)行,在這種方式(CallerRunsPolicy)中任務(wù)將有調(diào)用者線程去執(zhí)行。
上面是一些理論知識(shí),下面結(jié)合一些例子進(jìn)行分析討論。
import java.lang.reflect.Field;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy;
public class ExecutorServiceDemo {
static void log(String msg) {
System.out.println(System.currentTimeMillis() + " -> " + msg);
}
static int getThreadPoolRunState(ThreadPoolExecutor pool) throws Exception {
Field f = ThreadPoolExecutor.class.getDeclaredField("runState");
f.setAccessible(true);
int v = f.getInt(pool);
return v;
}
public static void main(String[] args) throws Exception {
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(1));
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
for (int i = 0; i < 10; i++) {
final int index = i;
pool.submit(new Runnable() {
public void run() {
log("run task:" + index + " -> " + Thread.currentThread().getName());
try {
Thread.sleep(1000L);
} catch (Exception e) {
e.printStackTrace();
}
log("run over:" + index + " -> " + Thread.currentThread().getName());
}
});
}
log("before sleep");
Thread.sleep(4000L);
log("before shutdown()");
pool.shutdown();
log("after shutdown(),pool.isTerminated=" + pool.isTerminated());
pool.awaitTermination(1000L, TimeUnit.SECONDS);
log("now,pool.isTerminated=" + pool.isTerminated() + ", state="
+ getThreadPoolRunState(pool));
}
}
第一種方式直接丟棄(DiscardPolicy)的輸出結(jié)果是:
1294494050696 -> before sleep
1294494051697 -> run over:0 -> pool-1-thread-1
1294494051697 -> run task:1
1294494052697 -> run over:1 -> pool-1-thread-1
1294494054697 -> before shutdown()
1294494054697 -> after shutdown(),pool.isTerminated=false
1294494054698 -> now,pool.isTerminated=true, state=3
對(duì)于上面的結(jié)果需要補(bǔ)充幾點(diǎn)。
- 線程池設(shè)定線程大小為1,因此輸出的線程就只有一個(gè)”pool-1-thread-1”,至于為什么是這個(gè)名稱,以后會(huì)分析。
- 任務(wù)隊(duì)列的大小為1,因此可以輸出一個(gè)任務(wù)執(zhí)行結(jié)果。但是由于線程本身可以帶有一個(gè)任務(wù),因此實(shí)際上一共執(zhí)行了兩個(gè)任務(wù)(task0和task1)。
- shutdown()一個(gè)線程并不能理解是線程運(yùn)行狀態(tài)位terminated,可能需要稍微等待一點(diǎn)時(shí)間。盡管這里等待時(shí)間參數(shù)是1000秒,但是實(shí)際上從輸出時(shí)間來看僅僅等了約1ms。
- 直接丟棄任務(wù)是丟棄將要進(jìn)入線程池本身的任務(wù),所以當(dāng)運(yùn)行task0是,task1進(jìn)入任務(wù)隊(duì)列,task2~task9都被直接丟棄了,沒有運(yùn)行。
如果把策略換成丟棄最舊任務(wù)(DiscardOldestPolicy),結(jié)果會(huì)稍有不同。
1294494484622 -> before sleep
1294494485622 -> run over:0 -> pool-1-thread-1
1294494485622 -> run task:9
1294494486622 -> run over:9 -> pool-1-thread-1
1294494488622 -> before shutdown()
1294494488622 -> after shutdown(),pool.isTerminated=false
1294494488623 -> now,pool.isTerminated=true, state=3
這里依然只是執(zhí)行兩個(gè)任務(wù),但是換成了任務(wù)task0和task9。實(shí)際上task1~task8還是進(jìn)入了任務(wù)隊(duì)列,只不過被task9擠出去了。
對(duì)于異常策略(AbortPolicy)就比較簡(jiǎn)單,這回調(diào)用線程的任務(wù)執(zhí)行。
對(duì)于調(diào)用線程執(zhí)行方式(CallerRunsPolicy),輸出的結(jié)果就有意思了。
1294496076266 -> run task:0 -> pool-1-thread-1
1294496077266 -> run over:0 -> pool-1-thread-1
1294496077266 -> run task:1 -> pool-1-thread-1
1294496077266 -> run over:2 -> main
1294496077266 -> run task:4 -> main
1294496078267 -> run over:4 -> main
1294496078267 -> run task:5 -> main
1294496078267 -> run over:1 -> pool-1-thread-1
1294496078267 -> run task:3 -> pool-1-thread-1
1294496079267 -> run over:3 -> pool-1-thread-1
1294496079267 -> run over:5 -> main
1294496079267 -> run task:7 -> main
1294496079267 -> run task:6 -> pool-1-thread-1
1294496080267 -> run over:7 -> main
1294496080267 -> run task:9 -> main
1294496080267 -> run over:6 -> pool-1-thread-1
1294496080267 -> run task:8 -> pool-1-thread-1
1294496081268 -> run over:9 -> main
1294496081268 -> before sleep
1294496081268 -> run over:8 -> pool-1-thread-1
1294496085268 -> before shutdown()
1294496085268 -> after shutdown(),pool.isTerminated=false
1294496085269 -> now,pool.isTerminated=true, state=3
由于啟動(dòng)線程有稍微的延時(shí),因此一種可能的執(zhí)行順序是這樣的。
- 首先pool-1-thread-1線程執(zhí)行task0,同時(shí)將task1加入任務(wù)隊(duì)列(submit(task1))。
- 對(duì)于task2,由于任務(wù)隊(duì)列已經(jīng)滿了,因此有調(diào)用線程main執(zhí)行(execute(task2))。
- 在mian等待task2任務(wù)執(zhí)行完畢,對(duì)于任務(wù)task3,由于此時(shí)任務(wù)隊(duì)列已經(jīng)空了,因此task3將進(jìn)入任務(wù)隊(duì)列。
- 此時(shí)main線程是空閑的,因此對(duì)于task4將由main線程執(zhí)行。此時(shí)pool-1-thread-1線程可能在執(zhí)行任務(wù)task1。任務(wù)隊(duì)列中依然有任務(wù)task3。
- 因此main線程執(zhí)行完畢task4后就立即執(zhí)行task5。
- 很顯然task1執(zhí)行完畢,task3被線程池執(zhí)行,因此task6進(jìn)入任務(wù)隊(duì)列。此時(shí)task7被main線程執(zhí)行。
- task6開始執(zhí)行時(shí),task8進(jìn)入任務(wù)隊(duì)列。main線程開始執(zhí)行task9。
- 然后線程池執(zhí)行線程task8結(jié)束。
- 整個(gè)任務(wù)隊(duì)列執(zhí)行完畢,線程池完畢。
如果有興趣可以看看ThreadPoolExecutor中四種RejectedExecutionHandler的源碼,都非常簡(jiǎn)單。