java 線程池

圖左邊是線程池的類結(jié)構(gòu),右邊是放入線程池執(zhí)行的任務(wù)類結(jié)構(gòu)
ExecutorService
ExecutorService定義了線程池的基本的方法,AbstractExecutorService是個抽象類,實現(xiàn)了ExecutorService的部分,主要是實現(xiàn)了幾個submit()方法,并且提供方法將提交進(jìn)來的任務(wù)
封裝成FutureTask,ThreadPoolExecutor則實現(xiàn)線程池的管理ExecutorService定義了線程池的基本的方法,AbstractExecutorService是個抽象類,實現(xiàn)了ExecutorService的部分,主要是實現(xiàn)了幾個submit()方法,并且提供方法將提交進(jìn)來的任務(wù)
FutureTask
FutureTask內(nèi)部引用了一個AQS的實現(xiàn),當(dāng)FutureTask沒有執(zhí)行完畢的時候,F(xiàn)utureTask的get()方法所在線程阻塞在AQS阻塞隊列上,所以get()方法可以認(rèn)為是一個異步變同步的過程。
AbstractExecutorService的submit(Runnable)方法:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Object> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}調(diào)用的是ThreadPoolExecutor類實現(xiàn)的execute(Runnable)方法:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Object> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {//關(guān)鍵點1
if (runState == RUNNING && workQueue.offer(command)) {//關(guān)鍵點2
if (runState != RUNNING || poolSize == 0)//關(guān)鍵點3
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))關(guān)鍵點4
reject(command); // is shutdown or saturated
}
}
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {//關(guān)鍵點1
if (runState == RUNNING && workQueue.offer(command)) {//關(guān)鍵點2
if (runState != RUNNING || poolSize == 0)//關(guān)鍵點3
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))關(guān)鍵點4
reject(command); // is shutdown or saturated
}
}
ThreadPoolExecutor里面有幾個size:poolSize 當(dāng)前線程池的大小,corePoolSize 核心線程池大小,maximumPoolSize最大線程池大小。根據(jù)這幾個size線程池執(zhí)行任務(wù)的方式也會不同。
關(guān)鍵點1.當(dāng)poolSize大于coreSize的時候執(zhí)行關(guān)鍵點2,也就是進(jìn)入等待執(zhí)行的隊列;否則執(zhí)行addIfUnderMaximumPoolSize()方法,該方法創(chuàng)建一個線程,這個線程被加入到工作線程集當(dāng)中,并且第一個要執(zhí)行的就是當(dāng)前這個任務(wù)。
關(guān)鍵點2.poolSize小于coreSize時將任務(wù)加入到等待隊列,如果成功執(zhí)行關(guān)鍵點3,如果ThreadPoolExecutor的狀態(tài)不是runnable或者等待隊列加入失敗的話,執(zhí)行關(guān)鍵點4
關(guān)鍵點3.此時可能ThreadPoolExecutor的狀態(tài)不是runnable或者poolSize為0,可能是執(zhí)行shutDown()引起的,這個需要特殊處理
關(guān)鍵點4.此時poolSize大于coreSize,然后等待隊列滿了,需要直接創(chuàng)建一個線程執(zhí)行任務(wù),并把線程放入到工作線程集中。如果前面執(zhí)行不成功,那么就需要執(zhí)行相應(yīng)的飽和策略了
總結(jié):線程池包含三部分:coreSize大小的核心線程池,等待隊列,以及擴展線程池,核心線程池和擴展線程池其實是在同一個工作線程集當(dāng)中
工作線程
工作線程集中每一個對象用內(nèi)部類Worker表示,實現(xiàn)了Runnable接口,里面引用了線程:
工作線程執(zhí)行第一個任務(wù),然后加入到線程池,不斷執(zhí)行上面的run()方法,基本就是:等待隊列取任務(wù)-->執(zhí)行-->取任務(wù)-->執(zhí)行的循環(huán),中間需要處理外部執(zhí)行shutdown()后的流程,
需要動態(tài)的增大減小工作線程池,核心線程池,擴展線程池。getTask()方法里面有做這些:
在Worker的runTask()方法中最主要還是執(zhí)行FutureTask的run方法:
說到底FutureTask還是用AQS的阻塞實現(xiàn)的。
關(guān)鍵點1.當(dāng)poolSize大于coreSize的時候執(zhí)行關(guān)鍵點2,也就是進(jìn)入等待執(zhí)行的隊列;否則執(zhí)行addIfUnderMaximumPoolSize()方法,該方法創(chuàng)建一個線程,這個線程被加入到工作線程集當(dāng)中,并且第一個要執(zhí)行的就是當(dāng)前這個任務(wù)。
關(guān)鍵點2.poolSize小于coreSize時將任務(wù)加入到等待隊列,如果成功執(zhí)行關(guān)鍵點3,如果ThreadPoolExecutor的狀態(tài)不是runnable或者等待隊列加入失敗的話,執(zhí)行關(guān)鍵點4
關(guān)鍵點3.此時可能ThreadPoolExecutor的狀態(tài)不是runnable或者poolSize為0,可能是執(zhí)行shutDown()引起的,這個需要特殊處理
關(guān)鍵點4.此時poolSize大于coreSize,然后等待隊列滿了,需要直接創(chuàng)建一個線程執(zhí)行任務(wù),并把線程放入到工作線程集中。如果前面執(zhí)行不成功,那么就需要執(zhí)行相應(yīng)的飽和策略了
總結(jié):線程池包含三部分:coreSize大小的核心線程池,等待隊列,以及擴展線程池,核心線程池和擴展線程池其實是在同一個工作線程集當(dāng)中
工作線程
工作線程集中每一個對象用內(nèi)部類Worker表示,實現(xiàn)了Runnable接口,里面引用了線程:
private final class Worker implements Runnable {
/**
* The runLock is acquired and released surrounding each task
* execution. It mainly protects against interrupts that are
* intended to cancel the worker thread from instead
* interrupting the task being run.
*/
private final ReentrantLock runLock = new ReentrantLock();
/**
* Initial task to run before entering run loop. Possibly null.
*/
private Runnable firstTask;
/**
* Per thread completed task counter; accumulated
* into completedTaskCount upon termination.
*/
volatile long completedTasks;
/**
* Thread this worker is running in. Acts as a final field,
* but cannot be set until thread is created.
*/
Thread thread;
Worker(Runnable firstTask) {
//第一個任務(wù)
this.firstTask = firstTask;
}
boolean isActive() {
return runLock.isLocked();
}
/**
* Interrupts thread if not running a task.
*/
void interruptIfIdle() {
final ReentrantLock runLock = this.runLock;
if (runLock.tryLock()) {
try {
if (thread != Thread.currentThread())
thread.interrupt();
} finally {
runLock.unlock();
}
}
}
/**
* Interrupts thread even if running a task.
*/
void interruptNow() {
thread.interrupt();
}
/**
* Runs a single task between before/after methods.
*/
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
runLock.lock();
try {
/*
* Ensure that unless pool is stopping, this thread
* does not have its interrupt set. This requires a
* double-check of state in case the interrupt was
* cleared concurrently with a shutdownNow -- if so,
* the interrupt is re-enabled.
*/
//雙重檢測,對shutdown()執(zhí)行后而且執(zhí)行了interruptIfIdle()方法(在getTask()中)這段代碼會用到
if (runState < STOP &&
Thread.interrupted() &&
runState >= STOP)
thread.interrupt();
/*
* Track execution state to ensure that afterExecute
* is called only if task completed or threw
* exception. Otherwise, the caught runtime exception
* will have been thrown by afterExecute itself, in
* which case we don't want to call it again.
*/
boolean ran = false;
//執(zhí)行前的hook
beforeExecute(thread, task);
try {
//FutureTask的run()方法
task.run();
ran = true;
//執(zhí)行后的hook
afterExecute(task, null);
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
afterExecute(task, ex);
throw ex;
}
} finally {
runLock.unlock();
}
}
/**
* Main run loop
* 由于本runnable在創(chuàng)建線程時是線程構(gòu)造函數(shù)的參數(shù),
* 所以線程運行最終會運行下面的run方法。
* 而且屬性thread的值就是創(chuàng)建的線程
*/
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
//循環(huán)中運行runTask()方法,firstTask不為空,就先運行firstTask
//之后運行g(shù)etTask()方法,該方法就是從等待隊列里面獲取任務(wù)或者阻塞等待任務(wù)。
//由于得到的task可能為空,所以while循環(huán)可能跳出
while (task != null || (task = getTask()) != null) {
//執(zhí)行任務(wù)
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}
}
/**
* The runLock is acquired and released surrounding each task
* execution. It mainly protects against interrupts that are
* intended to cancel the worker thread from instead
* interrupting the task being run.
*/
private final ReentrantLock runLock = new ReentrantLock();
/**
* Initial task to run before entering run loop. Possibly null.
*/
private Runnable firstTask;
/**
* Per thread completed task counter; accumulated
* into completedTaskCount upon termination.
*/
volatile long completedTasks;
/**
* Thread this worker is running in. Acts as a final field,
* but cannot be set until thread is created.
*/
Thread thread;
Worker(Runnable firstTask) {
//第一個任務(wù)
this.firstTask = firstTask;
}
boolean isActive() {
return runLock.isLocked();
}
/**
* Interrupts thread if not running a task.
*/
void interruptIfIdle() {
final ReentrantLock runLock = this.runLock;
if (runLock.tryLock()) {
try {
if (thread != Thread.currentThread())
thread.interrupt();
} finally {
runLock.unlock();
}
}
}
/**
* Interrupts thread even if running a task.
*/
void interruptNow() {
thread.interrupt();
}
/**
* Runs a single task between before/after methods.
*/
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
runLock.lock();
try {
/*
* Ensure that unless pool is stopping, this thread
* does not have its interrupt set. This requires a
* double-check of state in case the interrupt was
* cleared concurrently with a shutdownNow -- if so,
* the interrupt is re-enabled.
*/
//雙重檢測,對shutdown()執(zhí)行后而且執(zhí)行了interruptIfIdle()方法(在getTask()中)這段代碼會用到
if (runState < STOP &&
Thread.interrupted() &&
runState >= STOP)
thread.interrupt();
/*
* Track execution state to ensure that afterExecute
* is called only if task completed or threw
* exception. Otherwise, the caught runtime exception
* will have been thrown by afterExecute itself, in
* which case we don't want to call it again.
*/
boolean ran = false;
//執(zhí)行前的hook
beforeExecute(thread, task);
try {
//FutureTask的run()方法
task.run();
ran = true;
//執(zhí)行后的hook
afterExecute(task, null);
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
afterExecute(task, ex);
throw ex;
}
} finally {
runLock.unlock();
}
}
/**
* Main run loop
* 由于本runnable在創(chuàng)建線程時是線程構(gòu)造函數(shù)的參數(shù),
* 所以線程運行最終會運行下面的run方法。
* 而且屬性thread的值就是創(chuàng)建的線程
*/
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
//循環(huán)中運行runTask()方法,firstTask不為空,就先運行firstTask
//之后運行g(shù)etTask()方法,該方法就是從等待隊列里面獲取任務(wù)或者阻塞等待任務(wù)。
//由于得到的task可能為空,所以while循環(huán)可能跳出
while (task != null || (task = getTask()) != null) {
//執(zhí)行任務(wù)
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}
}
工作線程執(zhí)行第一個任務(wù),然后加入到線程池,不斷執(zhí)行上面的run()方法,基本就是:等待隊列取任務(wù)-->執(zhí)行-->取任務(wù)-->執(zhí)行的循環(huán),中間需要處理外部執(zhí)行shutdown()后的流程,
需要動態(tài)的增大減小工作線程池,核心線程池,擴展線程池。getTask()方法里面有做這些:
Runnable getTask() {
for (;;) {
try {
int state = runState;
//線程池狀態(tài)為STOP或者TERMINATED的話,就不執(zhí)行隊列里面的任務(wù)了
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
//在SHUTDOWN的話還是需要執(zhí)行任務(wù)的
r = workQueue.poll();
//如果poolSize>corePoolSize 說明線程池大于核心線程池,那么隊列
//可能不會有任務(wù),allowCoreThreadTimeOut為true說明核心線程池
//線程timeout以后可以被回收,如果是上面二個條件之一的話,使用poll()方法
//可能超時后返回的就是空的任務(wù)
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
//上面兩條不符合,那么隊列有任務(wù)的概率大,這個會阻塞知道有任務(wù)(反正核心線程池不會被回收的)
r = workQueue.take();
if (r != null)
return r;
//上面可能返回為空,所以能運行到這里
//運行workerCanExit()方法,如果返回true,那么檢測如果
//線程池狀態(tài)為STOP或者TERMINATED的話,就開始中斷空閑的線程
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}
private boolean workerCanExit() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean canExit;
try {
//線程池狀態(tài)為STOP或者TERMINATED或者等待隊列為空或者核心線程可以回收加上工作
//線程池大于核心線程池
canExit = runState >= STOP ||
workQueue.isEmpty() ||
(allowCoreThreadTimeOut &&
poolSize > Math.max(1, corePoolSize));
} finally {
mainLock.unlock();
}
return canExit;
}
getTask()由于各種調(diào)節(jié)功能使得返回的任務(wù)可能為空,run()方法的workerDone()就有機會執(zhí)行:for (;;) {
try {
int state = runState;
//線程池狀態(tài)為STOP或者TERMINATED的話,就不執(zhí)行隊列里面的任務(wù)了
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
//在SHUTDOWN的話還是需要執(zhí)行任務(wù)的
r = workQueue.poll();
//如果poolSize>corePoolSize 說明線程池大于核心線程池,那么隊列
//可能不會有任務(wù),allowCoreThreadTimeOut為true說明核心線程池
//線程timeout以后可以被回收,如果是上面二個條件之一的話,使用poll()方法
//可能超時后返回的就是空的任務(wù)
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
//上面兩條不符合,那么隊列有任務(wù)的概率大,這個會阻塞知道有任務(wù)(反正核心線程池不會被回收的)
r = workQueue.take();
if (r != null)
return r;
//上面可能返回為空,所以能運行到這里
//運行workerCanExit()方法,如果返回true,那么檢測如果
//線程池狀態(tài)為STOP或者TERMINATED的話,就開始中斷空閑的線程
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}
private boolean workerCanExit() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean canExit;
try {
//線程池狀態(tài)為STOP或者TERMINATED或者等待隊列為空或者核心線程可以回收加上工作
//線程池大于核心線程池
canExit = runState >= STOP ||
workQueue.isEmpty() ||
(allowCoreThreadTimeOut &&
poolSize > Math.max(1, corePoolSize));
} finally {
mainLock.unlock();
}
return canExit;
}
void workerDone(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//更新總的完成次數(shù)
completedTaskCount += w.completedTasks;
//執(zhí)行到這里說明核心線程池可以回收而且返回了的任務(wù)為空
//認(rèn)為核心線程池太大,進(jìn)行回收
workers.remove(w);
if (--poolSize == 0)
tryTerminate();
} finally {
mainLock.unlock();
}
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//更新總的完成次數(shù)
completedTaskCount += w.completedTasks;
//執(zhí)行到這里說明核心線程池可以回收而且返回了的任務(wù)為空
//認(rèn)為核心線程池太大,進(jìn)行回收
workers.remove(w);
if (--poolSize == 0)
tryTerminate();
} finally {
mainLock.unlock();
}
}
在Worker的runTask()方法中最主要還是執(zhí)行FutureTask的run方法:
public void run() {
sync.innerRun();
}
private static final int RUNNING = 1;
/** State value representing that task ran */
private static final int RAN = 2;
/** State value representing that task was cancelled */
private static final int CANCELLED = 4;
void innerRun() {
//初始狀態(tài)變了,直接返回
if (!compareAndSetState(0, RUNNING))
return;
try {
runner = Thread.currentThread();
//如果不是RUNNING,那么肯定是RAN或者CANCELLED
//這兩個狀態(tài)都可以釋放鎖了
if (getState() == RUNNING) // recheck after setting thread
//設(shè)置執(zhí)行的結(jié)果
innerSet(callable.call());
else
//這里會執(zhí)行到FutureTask內(nèi)部的實現(xiàn)的tryReleaseShared方法
releaseShared(0); // cancel
} catch (Throwable ex) {
innerSetException(ex);
}
}
void innerSet(V v) {
//循環(huán)只到下面一項產(chǎn)生
for (;;) {
int s = getState();
//另外的線程已經(jīng)執(zhí)行了 直接返回
if (s == RAN)
return;
//任務(wù)取消了,釋放鎖
if (s == CANCELLED) {
// aggressively release to set runner to null,
// in case we are racing with a cancel request
// that will try to interrupt runner
releaseShared(0);
return;
}
//設(shè)置完成標(biāo)志,然后設(shè)置result,釋放鎖,喚醒阻塞在FutureTask.get()上面的線程
if (compareAndSetState(s, RAN)) {
result = v;
releaseShared(0);
//hook
done();
return;
}
}
}
protected boolean tryReleaseShared(int ignore) {
//線程清空
runner = null;
return true;
}
sync.innerRun();
}
private static final int RUNNING = 1;
/** State value representing that task ran */
private static final int RAN = 2;
/** State value representing that task was cancelled */
private static final int CANCELLED = 4;
void innerRun() {
//初始狀態(tài)變了,直接返回
if (!compareAndSetState(0, RUNNING))
return;
try {
runner = Thread.currentThread();
//如果不是RUNNING,那么肯定是RAN或者CANCELLED
//這兩個狀態(tài)都可以釋放鎖了
if (getState() == RUNNING) // recheck after setting thread
//設(shè)置執(zhí)行的結(jié)果
innerSet(callable.call());
else
//這里會執(zhí)行到FutureTask內(nèi)部的實現(xiàn)的tryReleaseShared方法
releaseShared(0); // cancel
} catch (Throwable ex) {
innerSetException(ex);
}
}
void innerSet(V v) {
//循環(huán)只到下面一項產(chǎn)生
for (;;) {
int s = getState();
//另外的線程已經(jīng)執(zhí)行了 直接返回
if (s == RAN)
return;
//任務(wù)取消了,釋放鎖
if (s == CANCELLED) {
// aggressively release to set runner to null,
// in case we are racing with a cancel request
// that will try to interrupt runner
releaseShared(0);
return;
}
//設(shè)置完成標(biāo)志,然后設(shè)置result,釋放鎖,喚醒阻塞在FutureTask.get()上面的線程
if (compareAndSetState(s, RAN)) {
result = v;
releaseShared(0);
//hook
done();
return;
}
}
}
protected boolean tryReleaseShared(int ignore) {
//線程清空
runner = null;
return true;
}
說到底FutureTask還是用AQS的阻塞實現(xiàn)的。
拒絕策略
當(dāng)maximumPoolSize表示的最大線程池滿了以后,加入的任務(wù)會被拒絕,JDK有四種拒絕的策略:
AbortPolicy:拋出異常
CallerRunsPolicy:當(dāng)前的線程直接的執(zhí)行任務(wù)
DiscardOldestPolicy:舍去最老的任務(wù),然后線程池執(zhí)行最新的任務(wù)
DiscardPolicy:直接拒絕任務(wù)
總結(jié):
線程池設(shè)計的初衷還是減少了每個任務(wù)調(diào)用的開銷,可以在執(zhí)行大量異步任務(wù)時提高性能,并且還可以管理資源
提高性能方面主要是去除了大部分任務(wù)調(diào)用時線程的創(chuàng)建和銷毀的開銷
資源管理方面:有核心線程池,等待隊列,擴展線程池等方面,對于不同的方面有不同的策略,并且存在一定的
動態(tài)調(diào)節(jié)線程池的能力,對線程池超負(fù)荷時也有一些拒絕策略
當(dāng)maximumPoolSize表示的最大線程池滿了以后,加入的任務(wù)會被拒絕,JDK有四種拒絕的策略:
AbortPolicy:拋出異常
CallerRunsPolicy:當(dāng)前的線程直接的執(zhí)行任務(wù)
DiscardOldestPolicy:舍去最老的任務(wù),然后線程池執(zhí)行最新的任務(wù)
DiscardPolicy:直接拒絕任務(wù)
總結(jié):
線程池設(shè)計的初衷還是減少了每個任務(wù)調(diào)用的開銷,可以在執(zhí)行大量異步任務(wù)時提高性能,并且還可以管理資源
提高性能方面主要是去除了大部分任務(wù)調(diào)用時線程的創(chuàng)建和銷毀的開銷
資源管理方面:有核心線程池,等待隊列,擴展線程池等方面,對于不同的方面有不同的策略,并且存在一定的
動態(tài)調(diào)節(jié)線程池的能力,對線程池超負(fù)荷時也有一些拒絕策略
posted on 2011-09-20 22:27 nod0620 閱讀(722) 評論(0) 編輯 收藏 所屬分類: 多線程 、java