1. 程式人生 > 實用技巧 >從原始碼看執行緒池執行任務的流程及原理

從原始碼看執行緒池執行任務的流程及原理

1. 執行緒池的必要性與核心引數

頻繁建立、銷燬執行緒的開銷過大,所以建立執行緒頻率高的場景一般都選擇使用執行緒池,例如tomcat與客戶端通訊時處理髮來的請求。建立執行緒池的引數如下

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
                            TimeUnit unit,BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory,RejectedExecutionHandler handler)
  • corePoolSize:池中核心執行緒數量,執行緒數量不大於corePoolSize時執行緒不會因為空閒時間超過閾值而被銷燬。
  • maximumPoolSize:最大執行緒數,當執行緒數量大於corePoolSize而小於maximumPoolSize時,如果一個執行緒空閒時間超過閾值,則會被回收。
  • keepAliveTime:非核心執行緒空閒的最大時間,超過這個值就會被回收。
  • unit:keepAliveTime引數的時間單位。
  • workQueue:用來暫存任務的佇列。
  • threadFactory:由於提交到執行緒池的是任務,需要包裝成執行緒來執行,threadFactory就是用來生產執行緒的。
  • handler:當執行緒池由於各種原因不接受一個任務時,使用這個物件的拒絕方法拒絕,不同的實現類的拒絕策略不同,直接拋異常/重試/無視

根據不同的引數搭配,建立的執行緒池適應不同的場景,Executors類可以生成幾種典型的執行緒池:

  • 固定執行緒數的執行緒池:corePoolSize、maximumPoolSize設定成一樣,並使用無界的阻塞佇列。這樣執行緒池數量從0增加到corePoolSize後就一直保持這個數量,再有任務來時直接加入阻塞佇列,反正阻塞佇列是無界限的。這種執行緒池適合併發量較大且波動不大的場景,但是要注意佇列過大佔用資源的問題。
  • 單執行緒執行緒池:固定執行緒數執行緒池的特殊場景,corePoolSize、maximumPoolSize都設為1即可。適合併發量穩定保持在低水平的場景,也要注意阻塞佇列過大的問題。
  • 直接提交執行緒池:corePoolSize設為0,maximumPoolSize設為最大值,阻塞佇列是一個直接提交的佇列,對於新來的任務每次都建立執行緒處理。適合併發量很難確定的場景。
2. 執行任務的流程
  • 接收到任務後的主流程
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //獲取執行緒池狀態及當前執行緒數記錄,高3位記錄狀態,低29位記錄執行緒數
        int c = ctl.get();   
        //如果執行緒數小於核心執行緒閾值
        if (workerCountOf(c) < corePoolSize) {
            //直接包裝成Worker執行(true表示將任務以核心執行緒身份包裝)
            if (addWorker(command, true))   
                return;
            c = ctl.get();   //如果上一步失敗,則獲取最新的狀態
        }
        //如果執行緒池還在執行,則嘗試將任務加入阻塞佇列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();   //加入成功後再檢查執行緒池狀態
            //如果加入後執行緒池沒有在執行,將任務從佇列中移除
            if (! isRunning(recheck) && remove(command))  
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果前面加入工作佇列失敗,則以非核心執行緒身份包裝任務(false標誌)
        else if (!addWorker(command, false))
            reject(command);  //如果上一步也失敗,則拒絕這個任務
    }

總結:(1)來了新任務,先檢視核心執行緒數是否已經達到,未達到則直接以核心執行緒的方式新建執行緒執行任務;(2)如果執行緒數已經達到核心閾值則將任務加入阻塞佇列;(3)如果加入佇列失敗則以非核心的方式建立執行緒執行任務;(4)還是失敗則拒絕這個任務

  • 新建執行緒執行任務:addWorker(Runnable firstTask, boolean core)的操作
private boolean addWorker(Runnable firstTask, boolean core) {
        /**
        此處省略一段程式碼,大致工作是檢查當前執行緒數量是否小於閾值(否就返回false),
        CAS的檢查執行緒池狀態和CAS的將執行緒數+1。
        */
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
 //根據任務包裝Worker,Worker物件有一個執行緒物件也在此時根據Worker物件生成,Worker實現了Runnable介面
            w = new Worker(firstTask);   
            //獲取Worker裡生成的執行緒物件
            final Thread t = w.thread;   
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    //獲取執行緒狀態
                    int rs = runStateOf(ctl.get());
                    //檢查執行緒池狀態的合法性
                    if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // 如果此刻執行緒就已經活躍,那麼就是出錯了
                            throw new IllegalThreadStateException();
                        workers.add(w);      //將生成的Worker物件放到Set集合
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;  //記錄Worker已經成功新增到集合
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();     //如果Worker物件已經成功新增,則啟動這個物件的執行緒
                    workerStarted = true;  //記錄任務已經開始執行
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);   //如果最終發現任務沒有標誌為開始,則呼叫響應處理方法
        }
        return workerStarted;
    }

這裡要注意Worker本身實現了Ruunable介面有一個run方法,Worker物件持有提交來的task和一個執行緒物件,這個執行緒物件根據Worker物件生成所以執行的是Worker的run方法,而Worker的run方法裡又是呼叫了task的run方法。其實就是一個代理模式:Worker物件代理了提交的task。

所以現在知道了t.start()後會執行Worker的run方法,下面看Worker的run方法:

 public void run() {
            runWorker(this);  //呼叫另一個runWorker方法,並將自己傳入
        }
  • 真正執行任務的方法:runWorker(Worker w)
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;  //首先拿到提交的那個task
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //核心操作就是迴圈的getTask(),從阻塞佇列裡拿task來執行
            while (task != null || (task = getTask()) != null) {
                w.lock();
                //檢查執行緒池狀態
                if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())
                    wt.interrupt();  //不合法就中斷執行此任務的執行緒
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();    //執行真實角色的run方法
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

可以看到,使用Worker的run方法作為執行緒的執行方法代理task的run方法,主要操作是在執行完task的run方法後再getTask()去佇列裡獲取新任務繼續執行。