1. 程式人生 > 實用技巧 >JavaSE 執行緒池詳解

JavaSE 執行緒池詳解

1. 執行緒池的優勢

總體來說,執行緒池有如下優勢:

(1)降低資源的消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。

(2)提高響應速度。當任務到達時,任務可以不需要等到執行緒建立就能立即執行。

(3)提高現成的可管理性。執行緒是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一的分配,調優和監控。

2. 執行緒池的使用

執行緒池真正的實現類是 ThreadPoolExecutor,其構造方法有如下 4 種:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
 
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}
 
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}
 
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

可以看到,其需要如下幾個引數:

  • corePoolSize(必需):核心執行緒數。預設情況下,核心執行緒會一直存活,但是當將 allowCoreThreadTimeout 設定為 true 時,核心執行緒也會超時回收。

  • maximumPoolSize(必需):執行緒池所能容納的最大執行緒數。當活躍執行緒數達到該數值後,後續的新任務將會阻塞。

  • keepAliveTime(必需):執行緒閒置超時時長。如果超過該時長,非核心執行緒就會被回收。如果將 allowCoreThreadTimeout 設定為true時,核心執行緒也會超時回收。

  • unit(必需):指定 keepAliveTime 引數的時間單位。常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。

  • workQueue(必需):任務佇列。通過執行緒池的 execute() 方法提交的 Runnable 物件將其儲存在該引數中。其採用阻塞佇列實現。

  • threadFactory(可選):執行緒工廠。用於指定為執行緒池建立新執行緒的方式。

  • handler(可選):拒絕策略。當達到最大執行緒數時需要執行的飽和策略。

執行緒池的使用流程如下:

// 建立執行緒池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,
                                             MAXIMUM_POOL_SIZE,
                                             KEEP_ALIVE,
                                             TimeUnit.SECONDS,
                                             sPoolWorkQueue,
                                             sThreadFactory);
// 向執行緒池提交任務
threadPool.execute(new Runnable() {
    @Override
    public void run() {
        ... // 執行緒執行的任務
    }
});
// 關閉執行緒池
threadPool.shutdown(); // 設定執行緒池的狀態為 SHUTDOWN,然後中斷所有沒有正在執行任務的執行緒
threadPool.shutdownNow(); // 設定執行緒池的狀態為 STOP,然後嘗試停止所有的正在執行或暫停任務的執行緒,並返回等待執行任務的列表

3. 執行緒池的工作原理

具體流程圖如下所示:

上圖具體介紹了,當一個執行緒池收到一個任務後的具體操作,接下來將會再對任務佇列、執行緒工廠和拒絕策略進行更多的說明。

4. 執行緒池的引數

4.1 任務佇列(workQueue)

任務佇列是基於阻塞佇列實現的,即採用生產者消費者模式,在 Java 中需要實現 BlockingQueue 介面。但 Java 已經為我們提供了 7 種阻塞佇列 的實現:

  1. ArrayBlockingQueue:一個由陣列結構組成的有界阻塞佇列(陣列結構可配合指標實現一個環形佇列)

  2. LinkedBlockingQueue:一個由連結串列結構組成的有界阻塞佇列,在未指明容量時,容量預設為 Integer.MAX_VALUE

  3. PriorityBlockingQueue:一個支援優先順序排序的無界阻塞佇列,對元素沒有要求,可以實現 Comparable 介面也可以提供 Comparator 來對佇列中的元素進行比較。跟時間沒有任何關係,僅僅是按照優先順序取任務。

  4. DelayQueue:類似於 PriorityBlockingQueue ,是二叉堆實現的無界優先順序阻塞佇列。要求元素都實現 Delayed 介面,通過執行時延從佇列中提取任務,時間沒到任務取不出來。

  5. SynchronousQueue:一個不儲存元素的阻塞佇列,消費者執行緒呼叫 take() 方法的時候就會發生阻塞,直到有一個生產者生產了一個元素,消費者執行緒就可以拿到這個元素並返回;生產者執行緒呼叫 put() 方法的時候也會發生阻塞,直到有一個消費者執行緒消費了一個元素,生產者才返回。

  6. LinkedBlockingDeque:使用雙向佇列實現的有界雙端阻塞佇列。雙端意味著可以像普通佇列一樣 FIFO(先進先出),也可以像棧一樣 FILO(先進後出)。

  7. LinkedTransferQueue:它是ConcurrentLinkedQueue、LinkedBlockingQueue和SynchronousQueue的結合體,但是把它用在 ThreadPoolExecutor 中,和 LinkedBlockingQueue 行為一致,但是是無界的阻塞佇列。

注意有界佇列和無界佇列的區別:

  • 如果使用有界佇列,當佇列飽和時並超過最大執行緒數時就會執行拒絕策略;

  • 而如果使用無界佇列,因為任務佇列永遠都可以新增任務,所以設定maximumPoolSize沒有任何意義。

4.2 執行緒工廠(threadFactory)

執行緒工廠指定建立執行緒的方式,需要實現 ThreadFactory 介面,並實現 newThread(Runnable r) 方法。該引數可以不用指定,Executors 框架已經為我們實現了一個預設的執行緒工廠:

/**
 * The default thread factory.
 */
private static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
 
    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }
 
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

4.3 拒絕策略(handler)

當執行緒池的執行緒數達到最大執行緒數時,需要執行拒絕策略。拒絕策略需要實現 RejectedExecutionHandler 介面,並實現 rejectedExecution(Runnable r, ThreadPoolExecutor executor) 方法。不過 Executors 框架已經為我們實現了 4 種拒絕策略:

  1. AbortPolicy(預設):丟棄任務並丟擲 RejectedExecutionException 異常。

  2. CallerRunsPolicy:由呼叫執行緒處理該任務。

  3. DiscardPolicy:丟棄任務,但是不丟擲異常。可以配合這種模式進行自定義的處理方式。

  4. DiscardOldestPolicy:丟棄佇列最早的未處理任務,然後重新嘗試執行任務。

5. 功能執行緒池

上述的就是執行緒池的組成,其實 Executors 已經為我們封裝好了 4 種常見的功能執行緒池,如下:

  • 定長執行緒池(FixedThreadPool)

  • 定時執行緒池(ScheduledThreadPool)

  • 可快取執行緒池(CachedThreadPool)

  • 單執行緒化執行緒池(SingleThreadExecutor)

5.1 定長執行緒池(FixedThreadPool)

建立方法的原始碼:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}
  • 特點:只有核心執行緒,執行緒數量固定,執行完立即回收,任務佇列為連結串列結構的有界佇列。

  • 應用場景:控制執行緒最大併發數。

使用例項:

// 1. 建立定長執行緒池物件 & 設定執行緒池核心執行緒數量固定為3
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
// 2. 建立好Runnable類執行緒物件 & 需執行的任務
Runnable task =new Runnable(){
  public void run() {
     System.out.println("Running...");
  }
};
// 3. 向執行緒池提交任務
fixedThreadPool.execute(task);

5.2 定時執行緒池(ScheduledThreadPool)

建立方法的原始碼:

private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
 
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue());
}
 
public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue(), threadFactory);
}
  • 特點:核心執行緒數量固定,非核心執行緒數量無限,執行完閒置 10ms 後回收,任務佇列為延時阻塞佇列。

  • 應用場景:執行定時或週期性的任務。

使用例項:

// 1. 建立 定時執行緒池物件 & 設定執行緒池核心執行緒數量固定為5
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
// 2. 建立好Runnable類執行緒物件 & 需執行的任務
Runnable task =new Runnable(){
  public void run() {
     System.out.println("Running...");
  }
};
// 3. 向執行緒池提交任務
scheduledThreadPool.schedule(task, 1, TimeUnit.SECONDS); // 延遲1s後執行任務
scheduledThreadPool.scheduleAtFixedRate(task,10,1000,TimeUnit.MILLISECONDS);// 延遲10ms後、每隔1000ms執行任務

5.3 可快取執行緒池(CachedThreadPool)
建立方法的原始碼:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}
  • 特點:無核心執行緒,非核心執行緒數量無線,執行完閒置 60s 後回收,任務佇列為不儲存元素的阻塞佇列(SynchronousQueue)

  • 應用場景:執行大量、耗時少的任務。

使用例項:

// 1. 建立可快取執行緒池物件
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 2. 建立好Runnable類執行緒物件 & 需執行的任務
Runnable task =new Runnable(){
  public void run() {
     System.out.println("Running...");
  }
};
// 3. 向執行緒池提交任務
cachedThreadPool.execute(task);

5.4 單執行緒化執行緒池(SingleThreadExecutor)

建立方法的原始碼:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}
  • 特點:只有 1 個核心執行緒,無非核心執行緒,執行完立即回收,任務佇列為連結串列結構的有界佇列。

  • 應用場景:不適合併發但可能引起 IO阻塞性 及 影響 UI執行緒響應的操作,如資料庫操作、檔案操作等。

使用示例:

// 1. 建立單執行緒化執行緒池
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
// 2. 建立好Runnable類執行緒物件 & 需執行的任務
Runnable task =new Runnable(){
  public void run() {
     System.out.println("Running...");
  }
};
// 3. 向執行緒池提交任務
singleThreadExecutor.execute(task);

5.5 對比

6. 總結

Executors的4個功能執行緒池雖然方便,但現在已經不建議使用了,而是建議直接通過使用ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明確執行緒池的執行規則,規避資源耗盡的風險。

其實Executors的4個功能執行緒有如下弊端:

  • FixedThreadPool和SingleThreadExecutor:主要問題是堆積的請求處理佇列均採用LinkedBlockingQueue,可能會耗費非常大的記憶體,甚至OOM。

  • CachedThreadPool和ScheduledThreadPool:主要問題是執行緒數最大數是Integer.MAX_VALUE,可能會建立數量非常多的執行緒,甚至OOM。

本文轉載自:原文連結