1. 程式人生 > 實用技巧 >【併發程式設計】- 執行緒池篇

【併發程式設計】- 執行緒池篇

  • 簡介

  • 合理地使用執行緒池能夠帶來3個好處:
    • 降低資源消耗:通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。
    • 提高響應速度:當任務到達時,任務可以不需要等到執行緒建立就能立即執行。
    • 提高執行緒的可管理性:執行緒是稀缺資源,如果無限制地建立,不僅會消耗系統資源,還會降低系統的穩定 性,使用執行緒池可以進行統一分配、調優和監控。

  • 實現原理

  • 流程

    • 執行緒池判斷核心執行緒池是否都在執行任務。如果不是,則建立一個新的工作執行緒來執行任務。如果核心執行緒池裡的執行緒都在執行任務,則進行下個流程。
    • 執行緒池判斷工作佇列是否已經滿。如果工作佇列沒有滿,則將新提交的任務儲存在這個工作佇列列。如果工作佇列滿了,則進行下個流程。
    • 執行緒池判斷執行緒池的執行緒是否都處於工作狀態。如果沒有,則建立一個新的工作執行緒來執行任務。如果已經滿了,則交給飽和策略來處理這個任務。

執行緒池的主要處理流程

ThreadPoolExecutor執行execute()方法的示意圖


  • 使用

  • 實現

  • 執行緒池中的執行緒執行任務分兩種情況

    • execute()方法中建立一個執行緒時,會讓這個執行緒執行當前任務。
    • 這個執行緒執行完上圖中1的任務後,會反覆從BlockingQueue獲取任務來執行。
  • ThreadPoolExecutor的引數說明

    • corePoolSize(執行緒池的基本大小):當提交一個任務到執行緒池時,執行緒池會建立一個執行緒來執行任務,即使其他空閒的基本執行緒能夠執行新任務也會建立執行緒,等到需要執行的任務數大於執行緒池基本大小時就不再建立。如果呼叫了執行緒池的prestartAllCoreThreads()
      方法,執行緒池會提前建立並啟動所有基本執行緒。
  • runnableTaskQueue(任務佇列):用於儲存等待執行的任務的阻塞佇列。可以選擇以下幾個阻塞佇列。

    • ArrayBlockingQueue:是一個基於陣列結構的有界阻塞佇列,此佇列按FIFO(先進先出)原則對元素進行排序。
    • LinkedBlockingQueue:一個基於連結串列結構的阻塞佇列,此佇列按FIFO排序元素,吞吐量通常要高於ArrayBlockingQueue(ArrayBlockingQueue內部共用一個鎖,LinkedBlockingQueue內部使用兩個鎖,新增和刪除元素不是互斥的)。靜態工廠方法Executors.newFixedThreadPool()
      使用了這個佇列。
    • SynchronousQueue:一個不儲存元素的阻塞佇列。每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於Linked-BlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個佇列。
    • PriorityBlockingQueue:一個具有優先順序的無限阻塞佇列。
  • maximumPoolSize(執行緒池最大數量):執行緒池允許建立的最大執行緒數。如果佇列滿了,並且已建立的執行緒數小於最大執行緒數,則執行緒池會再建立新的執行緒執行任務。值得注意的是,如果使用了無界的任務佇列這個引數就沒什麼效果。

  • ThreadFactory:用於設定建立執行緒的工廠,可以通過執行緒工廠給每個創建出來的執行緒設定更有意義的名字。

  • RejectedExecutionHandler(飽和策略):當佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。JDK 1.5中Java執行緒池框架提供了以下4種策略。

    • AbortPolicy:直接丟擲異常。(預設)
    • CallerRunsPolicy:只用呼叫者所線上程來執行任務。
    • DiscardOldestPolicy:丟棄佇列裡最近的一個任務,並執行當前任務。
    • DiscardPolicy:不處理,丟棄掉。
  • keepAliveTime(執行緒活動保持時間):執行緒池的工作執行緒空閒後,保持存活的時間。所以,如果任務很多,並且每個任務執行的時間比較短,可以調大時間,提高執行緒的利用率。

  • TimeUnit(執行緒活動保持時間的單位):可選的單位有天(DAYS)、小時(HOURS)、分鐘(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS,千分之一微秒)。

  • 向執行緒池提交任務

  • execute():用於提交不需要返回值的任務,所以無法判斷任務是否被執行緒池執行成功。

    threadsPool.execute(new Runnable() {
        @Override
        public void run() {
            // TODO Auto-generated method stub
        }
    });
  • submit():用於提交需要返回值的任務。執行緒池會返回一個future型別的物件,通過這個future物件可以判斷任務是否執行成功,並且可以通過futureget()方法來獲取返回值,get()方法會阻塞當前執行緒直到任務完成,而使用get(long timeout,TimeUnit unit)方法則會阻塞當前執行緒一段時間後立即返回,這時候有可能任務沒有執行完。
Future<Object> future = executor.submit(harReturnValuetask);
                try {
                        Object s = future.get();
                } catch (InterruptedException e) {
                        // 處理中斷異常
                } catch (ExecutionException e) {
                        // 處理無法執行任務異常
                } finally {
                        // 關閉執行緒池
                        executor.shutdown();
                }
  • 關閉執行緒池

  • 可以通過呼叫執行緒池的shutdownshutdownNow方法來關閉執行緒池。
    • 原理是遍歷執行緒池中的工作執行緒,然後逐個呼叫執行緒的interrupt方法來中斷執行緒,所以無法響應中斷的任務可能永遠無法終止。

shutdownshutdownNow區別:

  • shutdownNow首先將執行緒池的狀態設定成STOP,然後嘗試停止所有的正在執行或暫停任務的執行緒,並返回等待執行任務的列表
  • shutdown只是將執行緒池的狀態設定成SHUTDOWN狀態,然後中斷所有沒有正在執行任務的執行緒。
  • 那麼怎麼合理配置執行緒池?
    • CPU密集型任務:配置儘可能小的執行緒,如配置Ncpu+1個執行緒的執行緒池。
    • IO密集型任務:配置儘可能多的執行緒,如2*Ncpu。
    • 混合型的任務:如果可以拆分,將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐量將高於序列執行的吞吐量。如果這兩個任務執行時間相差太大,則沒必要進行分解。
    • 依賴資料庫連線池的任務:因為執行緒提交SQL後需要等待資料庫返回結果,等待的時間越長,則CPU空閒時間就越長,那麼執行緒數應該設定得越大,這樣才能更好地利用CPU。

建議使用有界佇列:有界佇列能增加系統的穩定性和預警能力。


  • 監控

  • 假如大量使用執行緒池,則有必要對執行緒池進行監控,使用的屬性:
    • taskCount:執行緒池需要執行的任務數量。
    • completedTaskCount:執行緒池在執行過程中已完成的任務數量,小於或等於taskCount。
    • largestPoolSize:執行緒池裡曾經建立過的最大執行緒數量。通過這個資料可以知道執行緒池是否曾經滿過。如該數值等於執行緒池的最大大小,則表示執行緒池曾經滿過。
    • getPoolSize:執行緒池的執行緒數量。如果執行緒池不銷燬的話,執行緒池裡的執行緒不會自動銷燬,所以這個大小隻增不減。
    • getActiveCount:獲取活動的執行緒數。