[本文是我對Java Concurrency In Practice C08的歸納和總結(jié). ?轉(zhuǎn)載請注明作者和出處, ?如有謬誤, 歡迎在評論中指正. ]
Executors的靜態(tài)方法newCachedThreadPool, newFixedThreadPool, newScheduledThreadPool所返回的線程池都是ThreadPoolExecutor對象或者其子類對象. ThreadPoolExecutor提供了多種配置, 可以根據(jù)實際定制合適的線程池.
?
線程的創(chuàng)建和銷毀
ThreadPoolExecutor構(gòu)造函數(shù)中的corePoolSize, maximumPoolSize, keepAliveTime參數(shù)與線程的創(chuàng)建和銷毀相關(guān).?
corePoolSize指定ThreadPoolExecutor中持有的核心線程數(shù), 除非task隊列已滿, ThreadPoolExecutor不會創(chuàng)建超過核心線程數(shù)的線程(corePoolSize為0時是一種特殊情況, 此時就算task隊列沒有飽和, 向線程池第一次提交task時仍然會創(chuàng)建新的線程), 核心線程一旦創(chuàng)建就不會銷毀, 除非設(shè)置了allowCoreThreadTimeOut(true), 或者關(guān)閉線程池.
maximumPoolSize指定線程池中持有的最大線程數(shù). 對于超過核心線程數(shù)的線程, 如果在指定的超時時間內(nèi)沒有使用到, 就會被銷毀.
keepAliveTime指定超時時間.
Executors類的靜態(tài)方法創(chuàng)建線程池的源碼:
public static ExecutorService newCachedThreadPool() { // 核心線程數(shù)為0, 最大線程數(shù)為Integer.MAX_VALUE, 超時時間為60s return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads) { // 核心線程數(shù)和最大線程數(shù)都為調(diào)用方指定的值nThreads, 超時時間為0 return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { // 核心線程數(shù)由調(diào)用方指定, 最大線程數(shù)為Integer.MAX_VALUE, 超時時間為0 return new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue()); }?
?
task隊列
線程池內(nèi)部持有一個task隊列, 當task的提交速度超過task的執(zhí)行速度時, task將被緩存在task隊列中等待有線程可用時再執(zhí)行. ThreadPoolExecutor在創(chuàng)建時可以為其指定task隊列, 開發(fā)者一般有三種選擇: 有界隊列, 無界隊列以及同步隊列. Executors.newFixedThreadPool和Executors.newScheduledThreadPool返回的ThreadPoolExecutor對象使用的是無界隊列, 而Executors.newCashedThreadPool返回的ThreadPoolExecutor對象使用的是同步隊列.
為線程數(shù)不多的線程池指定一個容量大的隊列(或者無界隊列), 有助于減少線程間切換, CPU等方面的消耗, 代價是可能會造成吞吐量下降. 如果使用的是有界隊列, 隊列可能會被填滿, 此時將根據(jù)指定的飽和策略進行處理(見之后的講述).
對于線程數(shù)很大的線程池, 可以使用同步隊列. 同步隊列(SynchronousQueue)其實不能算是一種隊列, 因為同步隊列沒有緩存的作用. 使用同步隊列時, task被提交時, 直接由線程池中的線程接手. 如果此時線程池中沒有可用的線程, 線程池將創(chuàng)建新的線程接手. 如果線程池無法創(chuàng)建新的線程(比如線程數(shù)已到達maximumPoolSize), 則根據(jù)指定的飽和策略進行處理(同樣見之后的講述).
?
飽和策略
如果線程池使用的是有界隊列, 那么當有界隊列滿時繼續(xù)提交task時飽和策略會被觸發(fā).
如果線程池使用的是同步隊列, 那么當線程池無法創(chuàng)建新的線程接手task時飽和策略會被觸發(fā).
如果線程池被關(guān)閉后, 仍然向其提交task時, 飽和策略也會被觸發(fā).
ThreadPoolExecutor.setRejectedExecutionHandler方法用于設(shè)定飽和策略. 該方法接受一個RejectedExecutionHandler對象作為參數(shù). RejectedExecutionHandler只定義了一個方法:rejectedExecution(Runnable r, ThreadPoolExecutor executor). rejectedExecution方法在飽和策略被觸發(fā)時由系統(tǒng)回調(diào).
ThreadPoolExecutor類中預(yù)定義了多個RejectedExecutionHandler的實現(xiàn)類: AbortPolicy, CallerRunsPolicy, DiscardPolicy, 和DiscardOldestPolicy.
AbortPolicy是默認的飽和策略, 其rejectedExecution方法為:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException(); }?
可見默認情況下, 觸發(fā)飽和策略時將拋出RejectedExecutionException異常.
CallerRunsPolicy. 飽和時將在提交task的線程中執(zhí)行task, 而不是由線程池中的線程執(zhí)行:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
使用CallerRunsPolicy的例子:
class LifecycleWebServer { // MAX_THREAD_COUNT和MAX_QUEUE_COUNT的值根據(jù)系統(tǒng)的實際情況確定 private static final int MAX_THREAD_COUNT = 100; private static final int MAX_QUEUE_COUNT = 1000; // 使用有界隊列作為task隊列, 當有界隊列滿時, 將觸發(fā)飽和策略 private final ThreadPoolExecutor exec = new ThreadPoolExecutor(0, MAX_THREAD_COUNT, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(MAX_QUEUE_COUNT)); public void start() throws IOException { // 設(shè)置飽和策略為CallerRunsPolicy exec.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); ServerSocket socket = new ServerSocket(80); while (!exec.isShutdown()) { try { final Socket conn = socket.accept(); exec.execute(new Runnable() { public void run() { handleRequest(conn); } }); } catch (RejectedExecutionException e) { if (!exec.isShutdown()) log("task submission rejected", e); } } } public void stop() { exec.shutdown(); } void handleRequest(Socket connection) { Request req = readRequest(connection); if (isShutdownRequest(req)) stop(); else dispatchRequest(req); } public static void main(String[] args) { LifecycleWebServer server = new LifecycleWebServer(); try { // 在main線程中啟動server server.start(); } catch (IOException e) { e.printStackTrace(); } } }?
LifecycleWebServer中的線程池使用CallerRunsPolicy作為其飽和策略. 如果線程池飽和時main線程仍然向線程池提交task, 那么task將在main中執(zhí)行. main線程執(zhí)行task是需要一定時間的, 這樣就給了線程池喘息的機會, 而且main線程在執(zhí)行task的時間內(nèi)無法接受socket連接, 因此socket連接請求將緩存在tcp層. 如果server過載持續(xù)的時間較長, 使得tcp層的緩存不夠, 那么tcp緩存將根據(jù)其策略丟棄部分請求. 如此一來, 整個系統(tǒng)的過載壓力逐步向外擴散: 線程池-線程池中的隊列-main線程-tcp層-client. 這樣的系統(tǒng)在發(fā)生過載時是比較優(yōu)雅的: 既不會因為過多的請求而導致系統(tǒng)資源耗盡, 也不會一發(fā)生過載時就拒絕服務(wù), 只有發(fā)生長時間系統(tǒng)過載時才會出現(xiàn)客戶端無法連接的情況.
DiscardPolicy. 該策略將最新提交的task丟棄:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 丟棄, 不做任何處理 }?
DiscardOldestPolicy. 該策略丟棄隊列中處于對頭的task, 且試著再次提交最新的task:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }?
DiscardOldestPolicy與PriorityBlockingQueue結(jié)合使用時可能會造成不好的結(jié)果, 因為PriorityBlockingQueue中位于對頭的task是優(yōu)先級最高的task, 發(fā)生飽和時反而首先丟棄優(yōu)先級高的task可能不符合需求.
ThreadPoolExecutor沒有提供飽和時阻塞的策略, 不過開發(fā)者可以結(jié)合Semaphore實現(xiàn):
public class BoundedExecutor { private final Executor exec; private final Semaphore semaphore; public BoundedExecutor(Executor exec, int bound) { this.exec = exec; // 設(shè)定信號量permit的上限 this.semaphore = new Semaphore(bound); } public void submitTask(final Runnable command) throws InterruptedException { // 提交task前先申請permit, 如果無法申請到permit, 調(diào)用submitTask的線程將被阻塞, 直到有permit可用 semaphore.acquire(); try { exec.execute(new Runnable() { public void run() { try { command.run(); } finally { // 提交成功了, 運行task后釋放permit semaphore.release(); } } }); } catch (RejectedExecutionException e) { // 如果沒有提交成功, 也需要釋放permit semaphore.release(); } } }
?
ThreadFactory
在創(chuàng)建ThreadPoolExecutor時還可以為其指定ThreadFactory, 當線程池需要創(chuàng)建新的線程時會調(diào)用ThreadFactory的newThread方法. 默認的ThreadFactory創(chuàng)建的線程是nonDaemon, 線程優(yōu)先級為NORM_PRIORITY的線程, 并且為其指定了可識別的線程名稱:
public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; }?
開發(fā)者可以根據(jù)自身需要為ThreadPoolExecutor指定自定義的ThreadFactory. 例如:
public class MyThreadFactory implements ThreadFactory { private final String poolName; public MyThreadFactory(String poolName) { this.poolName = poolName; } public Thread newThread(Runnable runnable) { return new MyAppThread(runnable, poolName); } } public class MyAppThread extends Thread { public static final String DEFAULT_NAME = "MyAppThread"; private static volatile boolean debugLifecycle = false; private static final AtomicInteger created = new AtomicInteger(); private static final AtomicInteger alive = new AtomicInteger(); private static final Logger log = Logger.getAnonymousLogger(); public MyAppThread(Runnable r) { this(r, DEFAULT_NAME); } public MyAppThread(Runnable runnable, String name) { // 為自定義的Thread類指定線程名稱 super(runnable, name + "-" + created.incrementAndGet()); // 設(shè)置UncaughtExceptionHandler. UncaughtExceptionHandler的uncaughtException方法將在線程運行中拋出未捕獲異常時由系統(tǒng)調(diào)用 setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { public void uncaughtException(Thread t, Throwable e) { log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e); } }); } public void run() { // Copy debug flag to ensure consistent value throughout. boolean debug = debugLifecycle; if (debug) log.log(Level.FINE, "Created " + getName()); try { alive.incrementAndGet(); super.run(); } finally { alive.decrementAndGet(); if (debug) log.log(Level.FINE, "Exiting " + getName()); } } public static int getThreadsCreated() { return created.get(); } public static int getThreadsAlive() { return alive.get(); } public static boolean getDebug() { return debugLifecycle; } public static void setDebug(boolean b) { debugLifecycle = b; } }
?
擴展ThreadPoolExecutor
ThreadPoolExecutor類提供了多個"鉤子"方法, 以供其子類實現(xiàn), 比如beforeExecute, afterExecute, terminated等. 所謂"鉤子"是指基類預(yù)留的, 但是沒有提供具體實現(xiàn)的方法, 其方法體為空. 子類可以根據(jù)需要為"鉤子"提供具體實現(xiàn).
beforeExecute和afterExecute方法分別在執(zhí)行task前后調(diào)用:
private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { if (runState < STOP && Thread.interrupted() && runState >= STOP) thread.interrupt(); boolean ran = false; beforeExecute(thread, task); try { task.run(); ran = true; afterExecute(task, null); ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } } finally { runLock.unlock(); } }?
beforeExecute和afterExecute方法可以用于記錄日志, 統(tǒng)計數(shù)據(jù)等操作.
terminated方法在線程池被關(guān)閉后調(diào)用. terminated方法可以用于釋放線程池申請的資源.
?