1. 程式人生 > 實用技巧 >java多執行緒:執行緒池原理、阻塞佇列

java多執行緒:執行緒池原理、阻塞佇列


一、執行緒池定義和使用


jdk 1.5 之後就引入了執行緒池。

1.1 定義


從上面的空間切換看得出來,執行緒是稀缺資源,它的建立與銷燬是一個相對偏重且耗資源的操作,而Java執行緒依賴於核心執行緒,建立執行緒需要進行作業系統狀態切換。為避免資源過度消耗需要設法重用執行緒執行多個任務。執行緒池就是一個執行緒快取,負責對執行緒進行統一分配、調優與監控。(資料庫連線池也是一樣的道理)

什麼時候使用執行緒池?

單個任務處理時間比較短;需要處理的任務數量很大。

執行緒池優勢?

  • 重用存在的執行緒,減少執行緒建立、消亡的開銷,提高效能、提高響應速度。
  • 當任務到達時,任務可以不需要等到執行緒建立就能立即執行。
  • 提高執行緒的可管理性,可統一分配,調優和監控。

1.2 執行緒池在 jdk 已有的實現


  • 在 juc 包下,有一個介面:Executor :
  • Executor 又有兩個子介面:ExecutorService 和 ScheduledExecutorService,常用的介面是 ExecutorService。
  • 同時常用的執行緒池的工具類叫 Executors。

例如:

ExecutorService service = Executors.newCachedThreadPool();

Executor 框架雖然提供瞭如 newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()、newScheduledThreadPool() 等建立執行緒池的方法,但都有其侷限性,不夠靈活。

上面的幾種方式點進去會發現,都是用 ThreadPoolExecutor 進行建立的:

  1. newSingleThreadExecutor 字面意思簡單執行緒執行器
2. newFixedThreadPool 字面意思**固定的執行緒池**,傳參就是執行緒固定數目,適用於執行長期任務的場景。
3. newCachedThreadPool 字面意思**快取執行緒池**,核心執行緒0,最大執行緒非常大,動態建立的特點。
4. newScheduledThreadPool 字面意思**時間安排執行緒池**,指定核心執行緒數。
5. newSingleThreadScheduledExecutor 字面意思**單執行緒安排執行器**,也就是基於只有一個核心執行緒的執行器之外,又可以擴充套件。其中又用 DelegatedExecutorService 委託執行器服務進行了包裝。
可以看到,上面直接用 Executors 工具類預設的一些實現 new 出來的執行緒池都是用的 ThreadPoolExecutor 執行緒執行器這個類進行構造的,不過引數不同,導致了效果的側重點不同。

因此,自己建立執行緒池推薦的方法就是,直接使用 ThreadPoolExecutor 進行個性化的建立:

構造方法種的引數有 7 個:

  • corePoolSize:執行緒池維護執行緒的最少數量 (core : 核心
  • maximumPoolSize:執行緒池維護執行緒的最大數量,顯然必須>=1
  • keepAliveTime:執行緒池維護的多餘的執行緒所允許的空閒時間,最長可以空閒多久,時間到了,如果超過 corePoolSize 的執行緒一直空閒,他們就會被銷燬。
  • unit:執行緒池維護執行緒所允許的空閒時間的單位
  • workQueue:執行緒池所使用的緩衝佇列,已經提交但是沒有執行的任務會放進這裡
  • threadFactory:生成執行緒池種工作執行緒的執行緒工廠,一般使用預設
  • handler:執行緒池對拒絕任務的處理策略,當佇列滿且工作執行緒已經達到maximumPoolSize。

阿里的 java 開發手冊,強制要求,通過 ThreadPoolExecutor 來自定義,不能使用內建的,避免資源耗盡。這個很好理解,1 的型別就只有一個核心執行緒和最大現場,2 沒有擴充套件性,3、4、5的最大執行緒數太大,記憶體會爆炸。

1.3 執行緒池使用方法


這裡我們用固定執行緒池來測試,傳入核心執行緒數為 5,最大數量自然就也是 5,

public static void main(String[] args) {
    ExecutorService threadPool = Executors.newFixedThreadPool(5);
    try {
        //模擬10個顧客辦理業務
        for (int i = 0; i < 10; i++){
            //execute 執行方法,傳入引數為實現了 Runnable 介面的類
            threadPool.execute(()->{
                System.out.println(Thread.currentThread().getName()+"號執行緒辦理業務");
            });
        }
    } catch (Exception e){
        e.printStackTrace();
    } finally {
        threadPool.shutdown();
    }
}

其中,execute 方法就是將任務提交的方法,我們用 lambda 表示式給 execute 方法傳入了引數,實際上相當於一個完整的實現了 Runnable 介面的類。

執行結果:

可以看到,我們迴圈了 10 次,執行任務,但是執行緒只用到了 1-5 ,其中有多次複用

再比如,我們按照各種型別的執行緒池,自己定義一個執行緒池,核心執行緒數 2, 最大執行緒數 5,阻塞佇列長度為 3:

public static void main(String[] args) {
    ExecutorService threadPool = new ThreadPoolExecutor(
            2,
            5,
            2L,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(3),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );
    
    try {
        //模擬10個顧客辦理業務
        for (int i = 0; i < 10; i++){
            //execute 執行方法,傳入引數為實現了 Runnable 介面的類
            threadPool.execute(()->{
                System.out.println(Thread.currentThread().getName()+"號執行緒辦理業務");
            });
        }
    } catch (Exception e){
        e.printStackTrace();
    } finally {
        threadPool.shutdown();
    }
}

同樣 10 個執行緒,執行起來:

可以看到,執行了 8 個任務後,就丟擲了異常,說明執行了拒絕策略

上面兩個示例,我們的任務本身都是沒有返回值的,如果建立的任務本身需要有返回值就需要實現 Callable 介面,然後搭配FutureTask 來傳入任務,那麼執行緒池就應該呼叫 submit 方法而不是 execute。


二、執行緒池底層原理


2.1 執行緒池執行邏輯


處理的流程核心就 execute() 方法,他接收一個實現了 Runnable 介面的任務,決定對這個任務的處理策略。

下圖是一個比較形象的策略流程:

可能的情況有四種,也就是圖中的1234:

  1. 如果執行緒池中的執行緒數量少於corePoolSize,就建立新的核心執行緒來執行新新增的任務
  2. 如果執行緒池中的執行緒數量大於等於corePoolSize,但佇列workQueue未滿,則將新新增的任務放到佇列workQueue中
  3. 如果執行緒池中的執行緒數量大於等於corePoolSize,且佇列workQueue已滿,但執行緒池中的執行緒數量小於maximumPoolSize,則會建立新的非核心執行緒來處理被新增的任務
  4. 如果執行緒池中的執行緒數量等於了maximumPoolSize,就用RejectedExecutionHandler來執行拒絕策略。會丟擲異常,一般的拒絕策略是RejectedExecutionException

注意,執行的順序,在 java 裡有一個不合理的地方:
在池裡安排任務的時候,我們的核心執行緒,佇列,非核心執行緒裡面排的任務順序應該是 1 2 3;

但是真正實現上,如果三個都滿了,開始執行的時候,依次執行的順序卻是 核心執行緒,非核心執行緒,佇列。也就是執行順序會變成 1 3 2

2.2 拒絕策略


有些時候,我們並不希望拒絕策略是直接丟擲異常,那麼 jdk 裡面提供的預設拒絕策略有 4 種,他們體現在程式碼中就是 ThreadPoolExecutor 的四個靜態內部類:

2.2.1 CallerRunsPolicy:呼叫者執行策略。

這種策略不會拋棄任務,也不丟擲異常,而是將某些任務回退給呼叫者,從而降低新任務的流量。

實現非常簡單,那就是如果說 e 這個執行緒池已經 shutdown 了,那麼就什麼也不幹,也就是這個任務直接丟了;否則,r.run() ,相當於呼叫這個方法的執行緒裡直接執行了這個 Runnable 任務。

此時我們可以把 1.3 裡的程式碼修改一下,只修改策略為
CallerRunsPolicy:

可以看到,有些任務會在 main 執行緒裡處理。

2.2.2 AbortPolicy:終止策略。

拋異常。前面已經試過了,這個是預設的拒絕策略。

2.2.3 DiscardPolicy:丟棄任務。

可以看到,原始碼裡就是是什麼也不做。如果場景中允許任務丟失,這個是最好的策略。

2.2.4 DiscardOldestPolicy:拋棄佇列中等待最久的任務。

拋棄佇列中等待最久的任務,然後把當前的任務加入佇列中,嘗試再次提交當前任務。

原始碼裡也就是利用佇列操作,進行一次出隊操作,然後重新呼叫 execute 方法。

2.3 執行緒池的五種狀態


一個正常的執行緒的生命週期區別開,這個是執行緒池裡執行緒的狀態。

  1. Running,能接受新任務以及處理已新增的任務;
  2. Shutdown,不接受新任務,可以處理已經新增的任務,也就是不能再呼叫execute或者submit了;
  3. Stop,不接受新任務,不處理已經新增的任務,並且中斷正在處理的任務;
  4. Tidying,所有的任務已經終止,CTL記錄的任務數量為0,CTL負責記錄執行緒池的執行狀態與活動執行緒數量;
  5. Terminated,執行緒池徹底終止,則執行緒池轉變為terminated的狀態。

如圖所示,從running狀態轉換為 shutdown,呼叫 shutdown()方法;如果呼叫shutdownNow()方法,就直接會變成stop。

terminated()是鉤子函式,預設是什麼也不做的,我們可以重寫,然後決定結束之前要做一些別的處理邏輯。這個鉤子函式,就是模板模式的方法。

三、阻塞佇列


執行緒池裡的 BlockingQueue,阻塞佇列,事實上在消費者生產者問題裡的管程法實現,我們的策略也是類似阻塞佇列的,用它來做一個快取池的作用。

阻塞佇列:任意時刻,不管併發有多高,永遠保證只有一個執行緒能夠進行佇列的入隊或出隊操作。也就意味著他是能夠保證執行緒安全的。

另外,阻塞佇列分為有界和無界佇列,理論上來說一個是佇列的size有固定,另一個是無界的。對於有界佇列來說,如果佇列存滿,只能出隊了,入隊操作就只能阻塞。

在 juc 包裡,阻塞佇列的實現有很多:

  1. ArrayBlockingQueue:有界阻塞佇列;
  2. LinkedBlockingQueue:連結串列結構(大小預設值為Integer.MAX_VALUE)的阻塞佇列;
  3. PriorityBlockingQueue:支援優先順序排序的無界阻塞佇列;
  4. DelayQueue:使用優先順序佇列實現的延遲無界阻塞佇列;
  5. SynchronousQueue:不儲存元素的阻塞佇列,相當於只有一個元素;
  6. LinkedTransferQueue:連結串列組成的無界阻塞佇列;
  7. LinkedBlockingDeque:連結串列組成的雙向阻塞佇列。

對於 BlockingQueue 來說,核心操作主要有幾類:插入、刪除、查詢。

其中的四種異常策略:

  • 拋異常:如果阻塞佇列滿,再往佇列裡 add 插入元素會拋 IllegalStateException:Queue full,如果阻塞佇列空,再 remove 就會拋 NoSuchElementException。
  • 特殊值:offer 方法:成功 true,失敗 false,poll 方法,成功就返回元素,沒有就返回 null。
  • 阻塞:阻塞佇列滿的時候,生產者執行緒繼續 put 元素,佇列就會阻塞直到可以 put 資料或者響應中斷然後退出,阻塞佇列空的時候,消費者執行緒繼續 take 元素,佇列就會一直阻塞直到有元素可以 take。
  • 超時退出:阻塞佇列滿的時候,會阻塞生產者執行緒且超時退出,空的時候會阻塞消費者執行緒且超時退出。

那麼使用的時候,增刪的方法按對應的同一組使用比較合理。(其實這個策略的設計對應的在單執行緒集合裡也有,那就是Deque介面的實現類 LinkedList 使用的時候,不同的增刪方法策略不同)