JavaSE 執行緒池詳解
1. 執行緒池的優勢
總體來說,執行緒池有如下優勢:
(1)降低資源的消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。
(2)提高響應速度。當任務到達時,任務可以不需要等到執行緒建立就能立即執行。
(3)提高現成的可管理性。執行緒是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一的分配,調優和監控。
2. 執行緒池的使用
執行緒池真正的實現類是 ThreadPoolExecutor
,其構造方法有如下 4 種:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
可以看到,其需要如下幾個引數:
-
corePoolSize(必需):核心執行緒數。預設情況下,核心執行緒會一直存活,但是當將 allowCoreThreadTimeout 設定為 true 時,核心執行緒也會超時回收。
-
maximumPoolSize(必需):執行緒池所能容納的最大執行緒數。當活躍執行緒數達到該數值後,後續的新任務將會阻塞。
-
keepAliveTime(必需):執行緒閒置超時時長。如果超過該時長,非核心執行緒就會被回收。如果將 allowCoreThreadTimeout 設定為true時,核心執行緒也會超時回收。
-
unit(必需):指定 keepAliveTime 引數的時間單位。常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。
-
workQueue(必需):任務佇列。通過執行緒池的 execute() 方法提交的 Runnable 物件將其儲存在該引數中。其採用阻塞佇列實現。
-
threadFactory(可選):執行緒工廠。用於指定為執行緒池建立新執行緒的方式。
-
handler(可選):拒絕策略。當達到最大執行緒數時需要執行的飽和策略。
執行緒池的使用流程如下:
// 建立執行緒池 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory); // 向執行緒池提交任務 threadPool.execute(new Runnable() { @Override public void run() { ... // 執行緒執行的任務 } }); // 關閉執行緒池 threadPool.shutdown(); // 設定執行緒池的狀態為 SHUTDOWN,然後中斷所有沒有正在執行任務的執行緒 threadPool.shutdownNow(); // 設定執行緒池的狀態為 STOP,然後嘗試停止所有的正在執行或暫停任務的執行緒,並返回等待執行任務的列表
3. 執行緒池的工作原理
具體流程圖如下所示:
上圖具體介紹了,當一個執行緒池收到一個任務後的具體操作,接下來將會再對任務佇列、執行緒工廠和拒絕策略進行更多的說明。
4. 執行緒池的引數
4.1 任務佇列(workQueue)
任務佇列是基於阻塞佇列實現的,即採用生產者消費者模式,在 Java 中需要實現 BlockingQueue 介面。但 Java 已經為我們提供了 7 種阻塞佇列 的實現:
-
ArrayBlockingQueue:一個由陣列結構組成的有界阻塞佇列(陣列結構可配合指標實現一個環形佇列)
-
LinkedBlockingQueue:一個由連結串列結構組成的有界阻塞佇列,在未指明容量時,容量預設為 Integer.MAX_VALUE
-
PriorityBlockingQueue:一個支援優先順序排序的無界阻塞佇列,對元素沒有要求,可以實現 Comparable 介面也可以提供 Comparator 來對佇列中的元素進行比較。跟時間沒有任何關係,僅僅是按照優先順序取任務。
-
DelayQueue:類似於 PriorityBlockingQueue ,是二叉堆實現的無界優先順序阻塞佇列。要求元素都實現 Delayed 介面,通過執行時延從佇列中提取任務,時間沒到任務取不出來。
-
SynchronousQueue:一個不儲存元素的阻塞佇列,消費者執行緒呼叫 take() 方法的時候就會發生阻塞,直到有一個生產者生產了一個元素,消費者執行緒就可以拿到這個元素並返回;生產者執行緒呼叫 put() 方法的時候也會發生阻塞,直到有一個消費者執行緒消費了一個元素,生產者才返回。
-
LinkedBlockingDeque:使用雙向佇列實現的有界雙端阻塞佇列。雙端意味著可以像普通佇列一樣 FIFO(先進先出),也可以像棧一樣 FILO(先進後出)。
-
LinkedTransferQueue:它是ConcurrentLinkedQueue、LinkedBlockingQueue和SynchronousQueue的結合體,但是把它用在 ThreadPoolExecutor 中,和 LinkedBlockingQueue 行為一致,但是是無界的阻塞佇列。
注意有界佇列和無界佇列的區別:
-
如果使用有界佇列,當佇列飽和時並超過最大執行緒數時就會執行拒絕策略;
-
而如果使用無界佇列,因為任務佇列永遠都可以新增任務,所以設定maximumPoolSize沒有任何意義。
4.2 執行緒工廠(threadFactory)
執行緒工廠指定建立執行緒的方式,需要實現 ThreadFactory 介面,並實現 newThread(Runnable r) 方法。該引數可以不用指定,Executors 框架已經為我們實現了一個預設的執行緒工廠:
/**
* The default thread factory.
*/
private static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
4.3 拒絕策略(handler)
當執行緒池的執行緒數達到最大執行緒數時,需要執行拒絕策略。拒絕策略需要實現 RejectedExecutionHandler 介面,並實現 rejectedExecution(Runnable r, ThreadPoolExecutor executor) 方法。不過 Executors 框架已經為我們實現了 4 種拒絕策略:
-
AbortPolicy(預設):丟棄任務並丟擲 RejectedExecutionException 異常。
-
CallerRunsPolicy:由呼叫執行緒處理該任務。
-
DiscardPolicy:丟棄任務,但是不丟擲異常。可以配合這種模式進行自定義的處理方式。
-
DiscardOldestPolicy:丟棄佇列最早的未處理任務,然後重新嘗試執行任務。
5. 功能執行緒池
上述的就是執行緒池的組成,其實 Executors 已經為我們封裝好了 4 種常見的功能執行緒池,如下:
-
定長執行緒池(FixedThreadPool)
-
定時執行緒池(ScheduledThreadPool)
-
可快取執行緒池(CachedThreadPool)
-
單執行緒化執行緒池(SingleThreadExecutor)
5.1 定長執行緒池(FixedThreadPool)
建立方法的原始碼:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
-
特點:只有核心執行緒,執行緒數量固定,執行完立即回收,任務佇列為連結串列結構的有界佇列。
-
應用場景:控制執行緒最大併發數。
使用例項:
// 1. 建立定長執行緒池物件 & 設定執行緒池核心執行緒數量固定為3
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
// 2. 建立好Runnable類執行緒物件 & 需執行的任務
Runnable task =new Runnable(){
public void run() {
System.out.println("Running...");
}
};
// 3. 向執行緒池提交任務
fixedThreadPool.execute(task);
5.2 定時執行緒池(ScheduledThreadPool)
建立方法的原始碼:
private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
-
特點:核心執行緒數量固定,非核心執行緒數量無限,執行完閒置 10ms 後回收,任務佇列為延時阻塞佇列。
-
應用場景:執行定時或週期性的任務。
使用例項:
// 1. 建立 定時執行緒池物件 & 設定執行緒池核心執行緒數量固定為5
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
// 2. 建立好Runnable類執行緒物件 & 需執行的任務
Runnable task =new Runnable(){
public void run() {
System.out.println("Running...");
}
};
// 3. 向執行緒池提交任務
scheduledThreadPool.schedule(task, 1, TimeUnit.SECONDS); // 延遲1s後執行任務
scheduledThreadPool.scheduleAtFixedRate(task,10,1000,TimeUnit.MILLISECONDS);// 延遲10ms後、每隔1000ms執行任務
5.3 可快取執行緒池(CachedThreadPool)
建立方法的原始碼:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
-
特點:無核心執行緒,非核心執行緒數量無線,執行完閒置 60s 後回收,任務佇列為不儲存元素的阻塞佇列(SynchronousQueue)
-
應用場景:執行大量、耗時少的任務。
使用例項:
// 1. 建立可快取執行緒池物件
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 2. 建立好Runnable類執行緒物件 & 需執行的任務
Runnable task =new Runnable(){
public void run() {
System.out.println("Running...");
}
};
// 3. 向執行緒池提交任務
cachedThreadPool.execute(task);
5.4 單執行緒化執行緒池(SingleThreadExecutor)
建立方法的原始碼:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
-
特點:只有 1 個核心執行緒,無非核心執行緒,執行完立即回收,任務佇列為連結串列結構的有界佇列。
-
應用場景:不適合併發但可能引起 IO阻塞性 及 影響 UI執行緒響應的操作,如資料庫操作、檔案操作等。
使用示例:
// 1. 建立單執行緒化執行緒池
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
// 2. 建立好Runnable類執行緒物件 & 需執行的任務
Runnable task =new Runnable(){
public void run() {
System.out.println("Running...");
}
};
// 3. 向執行緒池提交任務
singleThreadExecutor.execute(task);
5.5 對比
6. 總結
Executors的4個功能執行緒池雖然方便,但現在已經不建議使用了,而是建議直接通過使用ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明確執行緒池的執行規則,規避資源耗盡的風險。
其實Executors的4個功能執行緒有如下弊端:
-
FixedThreadPool和SingleThreadExecutor:主要問題是堆積的請求處理佇列均採用LinkedBlockingQueue,可能會耗費非常大的記憶體,甚至OOM。
-
CachedThreadPool和ScheduledThreadPool:主要問題是執行緒數最大數是Integer.MAX_VALUE,可能會建立數量非常多的執行緒,甚至OOM。
本文轉載自:原文連結