從原始碼看執行緒池執行任務的流程及原理
阿新 • • 發佈:2020-07-13
1. 執行緒池的必要性與核心引數
頻繁建立、銷燬執行緒的開銷過大,所以建立執行緒頻率高的場景一般都選擇使用執行緒池,例如tomcat與客戶端通訊時處理髮來的請求。建立執行緒池的引數如下
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,RejectedExecutionHandler handler)
- corePoolSize:池中核心執行緒數量,執行緒數量不大於corePoolSize時執行緒不會因為空閒時間超過閾值而被銷燬。
- maximumPoolSize:最大執行緒數,當執行緒數量大於corePoolSize而小於maximumPoolSize時,如果一個執行緒空閒時間超過閾值,則會被回收。
- keepAliveTime:非核心執行緒空閒的最大時間,超過這個值就會被回收。
- unit:keepAliveTime引數的時間單位。
- workQueue:用來暫存任務的佇列。
- threadFactory:由於提交到執行緒池的是任務,需要包裝成執行緒來執行,threadFactory就是用來生產執行緒的。
- handler:當執行緒池由於各種原因不接受一個任務時,使用這個物件的拒絕方法拒絕,不同的實現類的拒絕策略不同,直接拋異常/重試/無視
根據不同的引數搭配,建立的執行緒池適應不同的場景,Executors類可以生成幾種典型的執行緒池:
- 固定執行緒數的執行緒池:corePoolSize、maximumPoolSize設定成一樣,並使用無界的阻塞佇列。這樣執行緒池數量從0增加到corePoolSize後就一直保持這個數量,再有任務來時直接加入阻塞佇列,反正阻塞佇列是無界限的。這種執行緒池適合併發量較大且波動不大的場景,但是要注意佇列過大佔用資源的問題。
- 單執行緒執行緒池:固定執行緒數執行緒池的特殊場景,corePoolSize、maximumPoolSize都設為1即可。適合併發量穩定保持在低水平的場景,也要注意阻塞佇列過大的問題。
- 直接提交執行緒池:corePoolSize設為0,maximumPoolSize設為最大值,阻塞佇列是一個直接提交的佇列,對於新來的任務每次都建立執行緒處理。適合併發量很難確定的場景。
2. 執行任務的流程
- 接收到任務後的主流程
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//獲取執行緒池狀態及當前執行緒數記錄,高3位記錄狀態,低29位記錄執行緒數
int c = ctl.get();
//如果執行緒數小於核心執行緒閾值
if (workerCountOf(c) < corePoolSize) {
//直接包裝成Worker執行(true表示將任務以核心執行緒身份包裝)
if (addWorker(command, true))
return;
c = ctl.get(); //如果上一步失敗,則獲取最新的狀態
}
//如果執行緒池還在執行,則嘗試將任務加入阻塞佇列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get(); //加入成功後再檢查執行緒池狀態
//如果加入後執行緒池沒有在執行,將任務從佇列中移除
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果前面加入工作佇列失敗,則以非核心執行緒身份包裝任務(false標誌)
else if (!addWorker(command, false))
reject(command); //如果上一步也失敗,則拒絕這個任務
}
總結:(1)來了新任務,先檢視核心執行緒數是否已經達到,未達到則直接以核心執行緒的方式新建執行緒執行任務;(2)如果執行緒數已經達到核心閾值則將任務加入阻塞佇列;(3)如果加入佇列失敗則以非核心的方式建立執行緒執行任務;(4)還是失敗則拒絕這個任務
- 新建執行緒執行任務:addWorker(Runnable firstTask, boolean core)的操作
private boolean addWorker(Runnable firstTask, boolean core) {
/**
此處省略一段程式碼,大致工作是檢查當前執行緒數量是否小於閾值(否就返回false),
CAS的檢查執行緒池狀態和CAS的將執行緒數+1。
*/
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//根據任務包裝Worker,Worker物件有一個執行緒物件也在此時根據Worker物件生成,Worker實現了Runnable介面
w = new Worker(firstTask);
//獲取Worker裡生成的執行緒物件
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//獲取執行緒狀態
int rs = runStateOf(ctl.get());
//檢查執行緒池狀態的合法性
if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 如果此刻執行緒就已經活躍,那麼就是出錯了
throw new IllegalThreadStateException();
workers.add(w); //將生成的Worker物件放到Set集合
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true; //記錄Worker已經成功新增到集合
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); //如果Worker物件已經成功新增,則啟動這個物件的執行緒
workerStarted = true; //記錄任務已經開始執行
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w); //如果最終發現任務沒有標誌為開始,則呼叫響應處理方法
}
return workerStarted;
}
這裡要注意Worker本身實現了Ruunable介面有一個run方法,Worker物件持有提交來的task和一個執行緒物件,這個執行緒物件根據Worker物件生成所以執行的是Worker的run方法,而Worker的run方法裡又是呼叫了task的run方法。其實就是一個代理模式:Worker物件代理了提交的task。
所以現在知道了t.start()後會執行Worker的run方法,下面看Worker的run方法:
public void run() {
runWorker(this); //呼叫另一個runWorker方法,並將自己傳入
}
- 真正執行任務的方法:runWorker(Worker w)
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; //首先拿到提交的那個task
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//核心操作就是迴圈的getTask(),從阻塞佇列裡拿task來執行
while (task != null || (task = getTask()) != null) {
w.lock();
//檢查執行緒池狀態
if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())
wt.interrupt(); //不合法就中斷執行此任務的執行緒
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); //執行真實角色的run方法
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
可以看到,使用Worker的run方法作為執行緒的執行方法代理task的run方法,主要操作是在執行完task的run方法後再getTask()去佇列裡獲取新任務繼續執行。