執行緒池執行原理分析
要想分析透徹整個執行緒池執行的邏輯,是個龐雜的工程,牽扯到執行緒池生命週期管理,佇列管理,拒絕策略,調配邏輯等等.這裡只是從一個Runnable任務釋出到執行緒池中以後,執行緒池內部的執行邏輯角度去嘗試分析.
先貼出整理的執行緒池操作流程圖 , 然後開始追原始碼:
執行緒數量控制策略
ThreadPoolExecutor是執行緒池的實現類,無論是自定義執行緒池,還是使用系統提供的執行緒池,都會使用到這個類.通過類的execute(Runnable command)方法來執行Runnable任務.
那麼一旦將一個Runnable任務execute()以後,到底發生了什麼? 直接看程式碼
/**
* 將該Runnable任務加入執行緒池並在未來某個時刻執行
* 該任務可能執行在一個新的執行緒 或 一個已存在的執行緒池中的執行緒
* 如果該任務提交失敗,可能是因為執行緒池已關閉,或者已達到執行緒池佇列和執行緒數已滿.
* 該Runnable將交給RejectedExecutionHandler處理,丟擲RejectedExecutionException
*/
public void execute(Runnable command) {
if (command == null){
//如果沒傳入Runnable任務,則丟擲空指標異常
throw new NullPointerException();
}
int c = ctl.get();
//當前執行緒數 小於 核心執行緒數
if (workerCountOf(c) < corePoolSize) {
//直接開啟新的執行緒,並將Runnable傳入作為第一個要執行的任務,成功返回true,否則返回false
if (addWorker(command, true)){
return;
}
c = ctl.get();
}
//c < SHUTDOWN代表執行緒池處於RUNNING狀態 + 將Runnable新增到任務佇列,如果新增成功返回true失敗返回false
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//成功加入佇列後,再次檢查是否需要新增新執行緒(因為已存在的執行緒可能在上次檢查後銷燬了,或者執行緒池在進入本方法後關閉了)
if (! isRunning(recheck) && remove(command)){
//如果執行緒池處於非RUNNING狀態 並且 將該Runnable從任務佇列中移除成功,則拒絕執行此任務
//交給RejectedExecutionHandler呼叫rejectedExecution方法,拒絕執行此任務
reject(command);
}else if (workerCountOf(recheck) == 0){
//如果執行緒池執行緒數量為0,則建立一條新執行緒,去執行
addWorker(null, false);
}
}else if (!addWorker(command, false))
//如果執行緒池處於非RUNNING狀態 或 將Runnable新增到佇列失敗(佇列已滿導致),則執行預設的拒絕策略
reject(command);
}
整理流程如下:
1. 如果執行緒池中的執行緒數量少於corePoolSize(核心執行緒數量),那麼會直接開啟一個新的核心執行緒來執行任務,即使此時有空閒執行緒存在.
2. 如果執行緒池中執行緒數量大於等於corePoolSize(核心執行緒數量),那麼任務會被插入到任務佇列中排隊,等待被執行.此時並不新增新的執行緒.
3. 如果在步驟2中由於任務佇列已滿導致無法將新任務進行排隊,這個時候有兩種情況:
- 執行緒數量 [未] 達到maximumPoolSize(執行緒池最大執行緒數) , 立刻啟動一個非核心執行緒來執行任務.
- 執行緒數量 [已] 達到maximumPoolSize(執行緒池最大執行緒數) , 拒絕執行此任務.ThreadPoolExecutor會通過RejectedExecutionHandler,丟擲RejectExecutionException異常.
以上就是一旦將一個Runnable任務execute()以後,執行的一系列邏輯,理解起來並不難,下面再對其中呼叫的一些方法做一些追查,就更方便理解其中的執行邏輯.
執行緒數量及執行緒池狀態管理
我們發現在execute()方法中頻繁的執行這句c = ctl.get();
程式碼,那麼這ctl是什麼,get()方法獲取到的是什麼,獲取到的c又用來做什麼?
上原始碼:
//建立AtomicInteger物件
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; //32-3 = 29
//最大執行緒容量
private static final int CAPACITY = (1 << COUNT_BITS) - 1; //將1的二進位制向右位移29位,再減1
//執行狀態儲存在int值的高3位 (所有數值左移29位)
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
//執行狀態
private static int runStateOf(int c) { return c & ~CAPACITY; }
//執行緒數量
private static int workerCountOf(int c) { return c & CAPACITY; }
//是否正在執行
private static boolean isRunning(int c) { return c < SHUTDOWN;}
以上程式碼中的資訊整理如下:
- clt是一個AtomicInteger物件,(提供原子操作進行Integer的使用,適用於高併發場景.該AtomicInteger的value可以自動重新整理,確保在高併發環境下的唯一性.),而ctl.get()獲取的就是該value值.
- 執行緒池用一個AtomicInteger來儲存 [執行緒數量] 和 [執行緒池狀態] ,一個int數值一共有32位,高3位用於儲存執行狀態,低29位用於儲存執行緒數量
- 系統預設的執行緒容量就是(2^29)-1 , 大約5億條執行緒-_-!
所以由此得知 :
頻繁的呼叫c = ctl.get();
是為了獲取該AtomicInteger的最新值,進而通過位運算獲取執行緒池的最新執行狀態,執行緒數量.
[執行緒池狀態]:
- RUNNING: 接收新任務,並執行佇列中的任務
- SHUTDOWN: 不接收新任務,但是執行佇列中的任務
- STOP: 不接收新任務,不執行佇列中的任務,中斷正在執行中的任務
- TIDYING: 所有的任務都已結束,執行緒數量為0,處於該狀態的執行緒池即將呼叫terminated()方法
- TERMINATED: terminated()方法執行完成
新執行緒的建立
在execute()方法中獲知通過addWorker()方法來新增新執行緒,那麼到底是如何新增和管理的?
開始追原始碼,一看究竟.
/**
* 往執行緒池中新增Worker物件
* @param firstTask 執行緒中第一個要執行的任務
* @param core 是否為核心執行緒
* @return 新增是否成功
*/
private boolean addWorker(Runnable firstTask, boolean core) {
//這裡有兩層[死迴圈],外迴圈:不停的判斷執行緒池的狀態
retry: for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//一系列判斷條件:執行緒池關閉,Runnable為空,佇列為空,則直接return false,代表Runnable新增失敗
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())){
return false;
}
//內迴圈:不停的檢查執行緒容量
for (;;) {
int wc = workerCountOf(c);
//超過執行緒數限制,則return false
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)){
return false;
}
//★ 新增執行緒成功,則直接跳出兩層迴圈,繼續往下執行.
//注意:這裡只是把執行緒數成功新增到了AtomicInteger記錄的執行緒池數量中,真正的Runnable新增,在下面的程式碼中進行
if (compareAndIncrementWorkerCount(c)){
break retry;
}
//再次判斷執行緒池最新狀態,如果狀態改變了(內迴圈和外迴圈記錄的狀態不符),則重新開始外層死迴圈
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs){
continue retry;
}
}
}
//結束迴圈之後,開始真正的建立執行緒.
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//建立一個Worker物件,並將Runnable當做引數傳入
w = new Worker(firstTask);
//從worker物件中取出執行緒
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//拿到鎖
mainLock.lock();
try {
//再次檢查執行緒池最新狀態
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
//檢查準備執行Runnable的Thread的狀態,如果該Thread已處於啟動狀態,則丟擲狀態異常(因為目前還沒啟動呢)
if (t.isAlive()){
throw new IllegalThreadStateException();
}
//將新建立的worker,新增到worker集合
workers.add(w);
...
workerAdded = true;
}
} finally {
//釋放鎖
mainLock.unlock();
}
if (workerAdded) {
//★Thread開始啟動
t.start();
workerStarted = true;
}
}
} finally {
//新增worker失敗
if (! workerStarted){
addWorkerFailed(w);
}
}
return workerStarted;
}
總結:
1. 先判斷執行緒池狀態和執行緒池中執行緒的容量,如果滿足執行緒新增的條件,則先把AtomicInteger中記錄的執行緒數量+1.然後再進行執行緒新增的工作.
2. 建立worker物件,並將Runnable作為引數傳遞進去,並從worker中取出Thread物件,進行一系列條件判斷後.開啟Thread的start()方法,執行緒開始執行.所以worker物件中必然包含了一個Thread和一個要被執行的Runnable.
那麼接下來繼續追原始碼,印證下第二點的推斷,看看Worker到底幹了什麼.
Worker類
//ThreadPoolExecutor的內部finial類
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
//當前worker要執行任務所用的執行緒(如果建立失敗,則可能是null)
final Thread thread;
//第一個要執行的任務(可能是null)
Runnable firstTask;
//當前執行緒執行完的任務總數
volatile long completedTasks;
//通過構造傳入Runnable任務
Worker(Runnable firstTask) {
...
this.firstTask = firstTask;
//通過ThreadFactory()建立新執行緒
this.thread = getThreadFactory().newThread(this);
}
//呼叫外部類runWorker()方法
public void run() {
runWorker(this);
}
...
}
worker類中的內部實現也印證了我們的推斷:
- 每個worker,都是一條執行緒,同時裡面包含了一個firstTask,即初始化時要被首先執行的任務.
- 最終執行任務的,是runWorker()方法
執行緒的複用
繼續追runWorker()方法的原始碼
//ThreadPoolExecutor的final類,該方法由內部類Worker的run()方法呼叫
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//取出Worker物件中的Runnable任務
Runnable task = w.firstTask;
boolean completedAbruptly = true;
...
try {
//★注意這個while迴圈,在這裡實現了 [執行緒複用]
while (task != null || (task = getTask()) != null) {
//上鎖
w.lock();
//檢查Thread狀態的程式碼
...
try {
...
try {
//執行Worker中的Runnable任務
task.run();
} catch (...) {
...catch各種異常
}
} finally {
//置空任務(這樣下次迴圈開始時,task依然為null,需要再通過getTask()取) + 記錄該Worker完成任務數量 + 解鎖
task = null;
w.completedTasks++;
w.unlock();
}
}
//該執行緒已經從佇列中取不到任務了,改變標記
completedAbruptly = false;
} finally {
//執行緒移除
processWorkerExit(w, completedAbruptly);
}
}
通過上面的原始碼,發現通過一個while迴圈,不斷的getTask()取任務出來執行,以這種方式實現了執行緒的複用.
執行緒複用邏輯整理如下:
1. 如果task不為空,則開始執行task
2. 如果task為空,則通過getTask()再去取任務,並賦值給task,如果取到的Runnable不為空,則執行該任務
3. 執行完畢後,通過while迴圈繼續getTask()取任務
4. 如果getTask()取到的任務依然是空,那麼整個runWorker()方法執行完畢
上面只是從getTask()方法名和其返回值來猜測此方法的作用,下面就繼續追原始碼,來證實和研究getTask()到底是怎麼取任務的,從哪取,怎麼取.
getTask()
private Runnable getTask() {
...
for (;;) {
...
// 如果執行緒池已關閉 或 任務佇列為空,則AtomicInteger中記錄的執行緒數量-1,並return null,結束本方法
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//獲取當前執行緒池中的匯流排程數
int wc = workerCountOf(c);
//allowCoreThreadTimeOut引數是使用者自行設定的(預設false),用來設定:是否允許核心執行緒有超時策略
//條件1:核心執行緒超時 條件2:當前執行緒數 > 核心執行緒數,滿足任何一個條件則timed標記為true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//超過最大執行緒數 或 超時 或 任務佇列為空... 執行緒數量-1 + return null
...
try {
//根據timed標記,使用不同的方式(限時等待 or 阻塞)從BlockingQueue<Runnable> workQueue 佇列中取任務
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null){
//如果取到了,就將Runnable返回
return r;
}
//如果沒取到,則重新for迴圈
...
}
}
}
將以上原始碼中的資訊整理如下:
- 執行緒池使用BlockingQueue來管理整個執行緒池中的Runnable任務,變數workQueue存放的都是待執行的任務
- BlockingQueue是個阻塞佇列,BlockingQueue.take()方法如果得到的是空,則進入等待狀態,直到BlockingQueue有新的物件被加入時,才可以正常將Runnable取出並返回,執行緒開始正常運轉,正常執行Runnable任務。
/**
* 先進先出的阻塞佇列
*/
public interface BlockingQueue<E> extends Queue<E> {
/**
* 檢索並移除佇列的頂部元素,如果該元素不可用則等待,直至元素可用
* Retrieves and removes the head of this queue, waiting if necessary
* until an element becomes available.
*
* @return the head of this queue
* @throws InterruptedException if interrupted while waiting
*/
E take()
...
}
讓我們整理一下上面幾段原始碼的邏輯順序:
1. execute()方法執行之後,進行一系列的邏輯判斷來控制執行緒池中的執行緒數量,並通過addWorker()方法建立新執行緒
2. 一旦Worker裡的Thread開始start()之後,執行的其實是Worker裡的run()方法,run()方法呼叫runWorker(Worker w)方法.
3. 在runWorker()方法裡面通過getTask()方法不停的取workQueue佇列中的任務來執行,如果取到了就執行,如果沒取到就等待.
結論:
- 一旦一個執行緒開啟之後,會一直執行下去,直至任務佇列中的任務執行完畢,達成了執行緒的複用
- 以Runnable佇列為目標的worker雖然是序列操作,但是由於可以通過addWorker()新增多個worker,並且多個worker取的是同一個BlockingQueue中的Runnable,所以就實現了並行處理.
執行緒的移除
在runWorker()方法中有如下程式碼:
final void runWorker(Worker w) {
boolean completedAbruptly = true;
...
try {
while (getTask()...) {
...
處理任務
}
//該執行緒已經從佇列中取不到任務了,改變標記,該標記表示:該執行緒是否因使用者因素導致的異常而終止
completedAbruptly = false;
} finally {
//執行緒移除
processWorkerExit(w, completedAbruptly);
}
}
processWorkerExit這裡用來將worker從worker集合中移除,步驟如下:
1. 先移除傳入的Worker(執行緒)
2. 判斷執行緒池裡的最少執行緒數,如果最少執行緒數為0條,但是佇列裡依然有任務未執行完畢.那麼必須確保執行緒池中至少有1條執行緒.(將最小執行緒數置為1)
3. 如果當前執行緒數 > 最小執行緒數,本方法結束,不再往下執行
4. 否則新增一條新執行緒,來替代當前執行緒,繼續去執行佇列中的任務.
/**
* @param w the worker 執行緒
* @param completedAbruptly 該執行緒是否因使用者因素導致的異常而終止
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
...
try {
//記錄該執行緒完成任務的總數
completedTaskCount += w.completedTasks;
//從worker集合中移除本worker(執行緒)
workers.remove(w);
}
...
//如果在runWoker()中正常執行任務完畢,這裡completedAbruptly傳入的就是false
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果執行緒池裡最少執行緒數為0,但是此時任務佇列裡依然還有任務
if (min == 0 && ! workQueue.isEmpty()){
//那麼必須保留一條執行緒,所以將最小值設定為1
min = 1;
}
//如果當前執行緒數>= 最小執行緒數,則直接return
if (workerCountOf(c) >= min){
return;
}
}
//否則新增一條新執行緒,來替代當前執行緒,繼續去執行佇列中的任務.
addWorker(null, false);
}
這次原始碼分析就先到這裡,一路從execute()開始,走到執行緒移除.其實執行緒池裡面涉及到的問題很多,以後有時間再慢慢研究.