面試官:小夥子,你給我說一下執行緒池的執行緒複用原理吧
前言
前兩天和粉絲聊天的時候,粉絲問了我一個挺有意思的問題,說他之前在面試的時候被問到執行緒池的執行緒複用原理,當時我跟他簡單的說了一下,沒想到過了幾天又來問我這個問題了,說他最近又被問到了這個問題.......想了想,乾脆寫篇文章把這個東西講清楚吧,滿滿的乾貨都放在下面了
1.什麼是執行緒複用?
線上程池中,通過同一個執行緒去執行不同的任務,這就是執行緒複用。
假設現在有 100 個任務,我們建立一個固定執行緒的執行緒池(FixedThreadPool),核心執行緒數和最大執行緒數都是 3,那麼當這個 100 個任務執行完,都只會使用三個執行緒。
示例:
public class FixedThreadPoolDemo { static ExecutorService executorService = Executors.newFixedThreadPool(3); public static void main(String[] args) { for (int i = 0; i < 100; i++) { executorService.execute(() -> { System.out.println(Thread.currentThread().getName() + "-> 執行"); }); } // 關閉執行緒池 executorService.shutdown(); } }
執行結果:
pool-1-thread-1-> 執行 pool-1-thread-2-> 執行 pool-1-thread-3-> 執行 pool-1-thread-1-> 執行 pool-1-thread-3-> 執行 pool-1-thread-2-> 執行 pool-1-thread-3-> 執行 pool-1-thread-1-> 執行 ...
2.執行緒複用的原理
執行緒池將執行緒和任務進行解耦,執行緒是執行緒,任務是任務,擺脫了之前通過 Thread 建立執行緒時的一個執行緒必須對應一個任務的限制。
線上程池中,同一個執行緒可以從阻塞佇列中不斷獲取新任務來執行,其核心原理在於執行緒池對 Thread 進行了封裝,並不是每次執行任務都會呼叫 Thread.start() 來建立新執行緒,而是讓每個執行緒去執行一個“迴圈任務”,在這個“迴圈任務”中不停的檢查是否有任務需要被執行,如果有則直接執行,也就是呼叫任務中的 run 方法,將 run 方法當成一個普通的方法執行,通過這種方式將只使用固定的執行緒就將所有任務的 run 方法串聯起來。
3.執行緒池執行流程
這部分內容在 Java 執行緒池的各個引數的含義 討論過,這裡我們再複習一次,再從中去了解執行緒複用。
3.1 流程圖
3.2 執行緒建立的流程
當任務提交之後,執行緒池首先會檢查當前執行緒數,如果當前的執行緒數小於核心執行緒數(corePoolSize),比如最開始建立的時候執行緒數為 0,則新建執行緒並執行任務。
當提交的任務不斷增加,建立的執行緒數等於核心執行緒數(corePoolSize),新增的任務會被新增到 workQueue 任務佇列中,等待核心執行緒執行完當前任務後,重新從 workQueue 中獲取任務執行。
假設任務非常多,達到了 workQueue 的最大容量,但是當前執行緒數小於最大執行緒數(maximumPoolSize),執行緒池會在核心執行緒數(corePoolSize)的基礎上繼續建立執行緒來執行任務。
在任務不斷增加的過程中,執行緒池會逐一進行以下 4 個方面的判斷
核心執行緒數(corePoolSize)
任務佇列(workQueue)
最大執行緒數(maximumPoolSize)
拒絕策略
3.3 ThreadPoolExecutor#execute 原始碼分析
java.util.concurrent.ThreadPoolExecutor#execute
public void execute(Runnable command) { // 如果傳入的Runnable的空,就丟擲異常 if (command == null) throw new NullPointerException(); int c = ctl.get(); // 執行緒池中的執行緒比核心執行緒數少 if (workerCountOf(c) < corePoolSize) { // 新建一個核心執行緒執行任務 if (addWorker(command, true)) return; c = ctl.get(); } // 核心執行緒已滿,但是任務佇列未滿,新增到佇列中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 任務成功新增到佇列以後,再次檢查是否需要新增新的執行緒,因為已存在的執行緒可能被銷燬了 if (! isRunning(recheck) && remove(command)) // 如果執行緒池處於非執行狀態,並且把當前的任務從任務佇列中移除成功,則拒絕該任務 reject(command); else if (workerCountOf(recheck) == 0) // 如果之前的執行緒已經被銷燬完,新建一個非核心執行緒 addWorker(null, false); } // 核心執行緒池已滿,佇列已滿,嘗試建立一個非核心新的執行緒 else if (!addWorker(command, false)) // 如果建立新執行緒失敗,說明執行緒池關閉或者執行緒池滿了,拒絕任務 reject(command); }
3.4 逐行分析
//如果傳入的Runnable的空,就丟擲異常 if (command == null) throw new NullPointerException();
execute 方法中通過 if 語句判斷 command ,也就是 Runnable 任務是否等於 null,如果為 null 就丟擲異常。
if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); }
判斷當前執行緒數是否小於核心執行緒數,如果小於核心執行緒數就呼叫 addWorker() 方法增加一個 Worker,這裡的 Worker 就可以理解為一個執行緒。
addWorker 方法的主要作用是線上程池中建立一個執行緒並執行傳入的任務,如果返回 true 代表新增成功,如果返回 false 代表新增失敗。
第一個引數表示傳入的任務
第二個引數是個布林值,如果布林值傳入 true 代表增加執行緒時判斷當前執行緒是否少於 corePoolSize,小於則增加新執行緒(核心執行緒),大於等於則不增加;同理,如果傳入 false 代表增加執行緒時判斷當前執行緒是否少於 maximumPoolSize,小於則增加新執行緒(非核心執行緒),大於等於則不增加,所以這裡的布林值的含義是以核心執行緒數為界限還是以最大執行緒數為界限進行是否新增非核心執行緒的判斷
這一段判斷相關原始碼如下
private boolean addWorker(Runnable firstTask, boolean core) { ... int wc = workerCountOf(c);//當前工作執行緒數 //判斷當前工作執行緒數>=最大執行緒數 或者 >=核心執行緒數(當core = true) if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; ...
最核心的就是 core ? corePoolSize : maximumPoolSize 這個三目運算。
// 核心執行緒已滿,但是任務佇列未滿,新增到佇列中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 任務成功新增到佇列以後,再次檢查是否需要新增新的執行緒,因為已存在的執行緒可能被銷燬了 if (! isRunning(recheck) && remove(command)) // 如果執行緒池處於非執行狀態,並且把當前的任務從任務佇列中移除成功,則拒絕該任務 reject(command); else if (workerCountOf(recheck) == 0) // 如果之前的執行緒已經被銷燬完,新建一個非核心執行緒 addWorker(null, false); }
如果程式碼執行到這裡,說明當前執行緒數大於或等於核心執行緒數或者 addWorker 失敗了,那麼就需要通過
if (isRunning(c) && workQueue.offer(command)) 檢查執行緒池狀態是否為 Running,如果執行緒池狀態是 Running 就通過 workQueue.offer(command) 將任務放入任務佇列中,
任務成功新增到佇列以後,再次檢查執行緒池狀態,如果執行緒池不處於 Running 狀態,說明執行緒池被關閉,那麼就移除剛剛新增到任務佇列中的任務,並執行拒絕策略,程式碼如下:
if (! isRunning(recheck) && remove(command)) // 如果執行緒池處於非執行狀態,並且把當前的任務從任務佇列中移除成功,則拒絕該任務 reject(command);
下面我們再來看後一個 else 分支:
else if (workerCountOf(recheck) == 0) // 如果之前的執行緒已經被銷燬完,新建一個非核心執行緒 addWorker(null, false);
進入這個 else 說明前面判斷到執行緒池狀態為 Running,那麼當任務被新增進來之後就需要防止沒有可執行執行緒的情況發生(比如之前的執行緒被回收了或意外終止了),所以此時如果檢查當前執行緒數為 0,也就是 workerCountOf(recheck) == 0,那就執行 addWorker() 方法新建一個非核心執行緒。
我們再來看最後一部分程式碼:
// 核心執行緒池已滿,佇列已滿,嘗試建立一個非核心新的執行緒 else if (!addWorker(command, false)) // 如果建立新執行緒失敗,說明執行緒池關閉或者執行緒池滿了,拒絕任務 reject(command);
執行到這裡,說明執行緒池不是 Running 狀態,又或者執行緒數 >= 核心執行緒數並且任務佇列已經滿了,根據規則,此時需要新增新執行緒,直到執行緒數達到“最大執行緒數”,所以此時就會再次呼叫 addWorker 方法並將第二個引數傳入 false,傳入 false 代表增加非核心執行緒。
addWorker 方法如果返回 true 代表新增成功,如果返回 false 代表任務新增失敗,說明當前執行緒數已經達到 maximumPoolSize,然後執行拒絕策略 reject 方法。
如果執行到這裡執行緒池的狀態不是 Running,那麼 addWorker 會失敗並返回 false,所以也會執行拒絕策略 reject 方法。
4.執行緒複用原始碼分析
java.util.concurrent.ThreadPoolExecutor#runWorker
省略掉部分和複用無關的程式碼之後,程式碼如下:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // 釋放鎖 設定work的state=0 允許中斷 boolean completedAbruptly = true; try { //一直執行 如果task不為空 或者 從佇列中獲取的task不為空 while (task != null || (task = getTask()) != null) { task.run();//執行task中的run方法 } } completedAbruptly = false; } finally { //1.將 worker 從陣列 workers 裡刪除掉 //2.根據布林值 allowCoreThreadTimeOut 來決定是否補充新的 Worker 進陣列 workers processWorkerExit(w, completedAbruptly); } }
可以看到,實現執行緒複用的邏輯主要在一個不停迴圈的 while 迴圈體中。
通過獲取 Worker 的 firstTask 或者通過 getTask 方法從 workQueue 中獲取待執行的任務
直接通過 task.run() 來執行具體的任務(而不是新建執行緒)
在這裡,我們找到了執行緒複用最終的實現,通過取 Worker 的 firstTask 或者 getTask 方法從 workQueue 中取出了新任務,並直接呼叫 Runnable 的 run 方法來執行任務,也就是如之前所說的,每個執行緒都始終在一個大迴圈中,反覆獲取任務,然後執行任務,從而實現了執行緒的複用。
總結
這篇關於執行緒池的執行緒複用原理的文章就到這裡了,大家看完有什麼不懂的歡迎在下方留言評論,也可以私信問我,我看到了一般都會回覆的!