Java多執行緒系列詳解_05_使用ThreadPoolExecutor自定義建立執行緒池
ThreadPoolExecutor詳解
1.構造詳解
ThreadPoolExecutor有四個構造
講解最詳細的如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
序號 | 名稱 | 型別 | 含義 |
---|---|---|---|
1 | corePoolSize | int | 核心執行緒池大小 |
2 | maximumPoolSize | int | 最大執行緒池大小 |
3 | keepAliveTime | long | 執行緒最大空閒時間 |
4 | unit | TimeUnit | 時間單位 |
5 | workQueue | BlockingQueue | 執行緒等待佇列 |
6 | threadFactory | ThreadFactory | 執行緒建立工廠 |
7 | handler | RejectedExecutionHandler | 拒絕策略 |
1. int corePoolSize :
- 核心池的大小,建立了執行緒池後不會建立執行緒,直到有任務來才會執行建立執行緒;
- prestartAllCoreThreads()或者prestartCoreThread()會初始化執行緒數;
- 當執行緒池中的執行緒數目達到corePoolSize後,就會把到達的任務放到快取隊列當中;
2. int maximumPoolSize :
程池最大執行緒數, 它表示線上程池中最多能建立多少個執行緒
3. long keepAliveTime :
- 空閒執行緒超時時長, 只有當執行緒數超過corePoolSize後的執行緒才會有效;
- allowCoreThreadTimeOut(boolean)會導致執行緒數不超過corePoolSize也會生效
4. TimeUnit unit :
- 引數keepAliveTime的時間單位
//天
TimeUnit days = TimeUnit.DAYS;
//小時
TimeUnit hours = TimeUnit.HOURS;
//分鐘
TimeUnit minutes = TimeUnit.MINUTES;
//秒
TimeUnit seconds = TimeUnit.SECONDS;
//毫秒
TimeUnit milliseconds = TimeUnit.MILLISECONDS;
//微妙
TimeUnit microseconds = TimeUnit.MICROSECONDS;
//納秒
TimeUnit nanoseconds = TimeUnit.NANOSECONDS;
5. BlockingQueue workQueue :
- 一個阻塞佇列, 用來儲存等待執行的任務
6. ThreadFactory threadFactory :
- 建立執行緒的工廠
7. RejectedExecutionHandler handler :
- 拒絕處理任務時的策略
2.引數詳解
1. BlockingQueue workQueue介面常用實現類
BlockingQueue 介面的常用方法:
入隊:
offer(E e):如果佇列沒滿,立即返回true; 如果佇列滿了,立即返回false–>不阻塞
put(E e):如果佇列滿了,一直阻塞,直到佇列不滿了或者執行緒被中斷–>阻塞
offer(E e, long timeout, TimeUnit unit):在隊尾插入一個元素,,如果佇列已滿,則進入等待,直到出現以下三種情況:–>阻塞
被喚醒 / 等待時間超時 / 當前執行緒被中斷
出隊
poll():如果沒有元素,直接返回null;如果有元素,出隊
take():如果佇列空了,一直阻塞,直到佇列不為空或者執行緒被中斷–>阻塞
poll(long timeout, TimeUnit unit):如果佇列不空,出隊;如果佇列已空且已經超時,返回null;如果佇列已空且時間未超時,則進入等待,直到出現以下三種情況:
被喚醒 / 等待時間超時 / 當前執行緒被中斷
BlockingQueue 常見實現類 :
- ArrayBlockingQueue(不常用)
- LinkedBlockingQueue(常用)
- SynchronousQueue(常用)
- PriorityBlockingQueue(不常用)
1. ArrayBlockingQueue
-
底層基於陣列實現
-
初始化需要指定容量 , 不可擴容 , 屬於有界佇列
-
在佇列全滿時執行入隊將會阻塞,在佇列為空時出隊同樣將會阻塞。
-
併發阻塞是通過ReentrantLock和Condition來實現的
-
內部只有一把鎖,意味著同一時刻只有一個執行緒能進行入隊或者出隊的操作。
-
方法詳解:
final Object[] items
: 核心陣列 , 儲存元素final ReentrantLock lock
: 可重入鎖 , 入隊和出隊使用一個鎖private final Condition notEmpty
: 不為空條件 , 出隊使用private final Condition notFull
: 不為滿條件 , 入隊使用void put(E e)
: 入隊方法, 將元素新增到尾部,如果佇列滿了,則一直阻塞等待有空位再入隊boolean add(E e)
: 入隊方法, 將元素新增到尾部,如果佇列滿了,則直接丟擲異常IllegalStateException
boolean offer(E e)
: 入隊方法,將元素新增到尾部,如果佇列滿了,則直接返回false,而add()則會直接丟擲異常(此方法優先於add())E poll()
: 出隊方法,移除並將頂部元素返回,如果佇列為空,則返回nullE take()
: 出隊方法,移除並將頂部元素返回,如果佇列空了,一直阻塞,直到佇列不為空或者執行緒被中斷E peek()
: 出隊方法,將頂部元素返回, 如果佇列為空,則返回nullE remove()
: 出隊方法,刪除並當前物件,並返回佇列頭部的元素,如果佇列為空,則丟擲一個NoSuchElementException
異常
2. LinkedBlockingQueue
LinkedBlockingQueue中維持兩把鎖,一把鎖用於入隊,一把鎖用於出隊,這也就意味著,同一時刻,只能有一個執行緒執行入隊,其餘執行入隊的執行緒將會被阻塞;同時,可以有另一個執行緒執行出隊,其餘執行出隊的執行緒將會被阻塞。換句話說,雖然入隊和出隊兩個操作同時均只能有一個執行緒操作,但是可以一個入隊執行緒和一個出隊執行緒共同執行,也就意味著可能同時有兩個執行緒在操作佇列,那麼為了維持執行緒安全,LinkedBlockingQueue使用一個AtomicInterger型別的變量表示當前佇列中含有的元素個數,所以可以確保兩個執行緒之間操作底層佇列是執行緒安全的。
-
LinkedBlockingQueue不允許元素為null。
-
同一時刻,只能有一個執行緒執行入隊操作,因為putLock在將元素插入到佇列尾部時加鎖了
-
如果佇列滿了,那麼將會呼叫notFull的await()方法將該執行緒加入到Condition等待佇列中。await()方法就會釋放執行緒佔有的鎖,這將導致之前由於被鎖阻塞的入隊執行緒將會獲取到鎖,執行到while迴圈處,不過可能因為由於佇列仍舊是滿的,也被加入到條件佇列中。
-
一旦一個出隊執行緒取走了一個元素,並通知了入隊等待佇列中可以釋放執行緒了,那麼第一個加入到Condition佇列中的將會被釋放,那麼該執行緒將會重新獲得put鎖,繼而執行enqueue()方法,將節點插入到佇列的尾部
-
然後得到插入一個節點之前的元素個數,如果佇列中還有空間可以插入,那麼就通知notFull條件的等待佇列中的執行緒。
-
通知出隊執行緒佇列為空了,因為插入一個元素之前的個數為0,而插入一個之後佇列中的元素就從無變成了有,就可以通知因佇列為空而阻塞的出隊執行緒了。
-
當佇列為空時,就加入到notEmpty(的條件等待佇列中,當佇列不為空時就取走一個元素,當取完發現還有元素可取時,再通知一下自己的夥伴(等待在條件佇列中的執行緒);最後,如果佇列從滿到非滿,通知一下put執行緒。
-
LinkedBlockingQueue是允許兩個執行緒同時在兩端進行入隊或出隊的操作的,但一端同時只能有一個執行緒進行操作,這是通過兩把鎖來區分的;
-
為了維持底部資料的統一,引入了AtomicInteger的一個count變數,表示佇列中元素的個數。count只能在兩個地方變化,一個是入隊的方法(可以+1),另一個是出隊的方法(可以-1),而AtomicInteger是原子安全的,所以也就確保了底層佇列的資料同步。
3. SynchronousQueue
SynchronousQueue–基於CAS
總結:
ArrayBlockingQueue:
- 一個物件陣列+一把鎖+兩個條件
- 入隊與出隊都用同一把鎖
- 在只有入隊高併發或出隊高併發的情況下,因為運算元組,且不需要擴容,效能很高
- 採用了陣列,必須指定大小,即容量有限
LinkedBlockingQueue:
- 一個單向連結串列+兩把鎖+兩個條件
- 兩把鎖,一把用於入隊,一把用於出隊,有效的避免了入隊與出隊時使用一把鎖帶來的競爭。
- 在入隊與出隊都高併發的情況下,效能比ArrayBlockingQueue高很多
- 採用了連結串列,最大容量為整數最大值,可看做容量無限
2.RejectedExecutionHandler handler介面常用實現類
- AbortPolicy:直接丟擲異常。
- CallerRunsPolicy:只用呼叫者所線上程來執行任務。
- DiscardOldestPolicy:丟棄佇列裡最近的一個任務,並執行當前任務。
- DiscardPolicy:不處理,丟棄掉。
- 也可以根據應用場景需要來實現RejectedExecutionHandler介面自定義策略。如記錄日誌或持久化不能處理的任務。(常用)
3.自定義執行緒池測試
/**
* 建立執行緒池測試方法
*/
private static void createThread() {
// 定義核心執行緒活躍數
int corePoolSize = 2;
// 定義最大執行緒活躍數
int maximumPoolSize = 4;
// 定義等待時間
long keepAliveTime = 10;
// 定義等待時間單位
TimeUnit unit = TimeUnit.SECONDS;
// 定義使用佇列
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
// 定義執行緒工廠
ThreadFactory threadFactory = (Runnable r) -> {
Thread t = new Thread(r, "my-thread-" + CreateThreadByThreadPoolExecutor.mThreadNum.getAndIncrement());
System.out.println(t.getName() + " has been created");
return t;
};
// 定義拒絕處理任務時的策略
RejectedExecutionHandler handler = (Runnable r, ThreadPoolExecutor e) -> {
// 做日誌持久化記錄
System.err.printf("%s is rejected \n", r.toString());
};
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
keepAliveTime, unit, workQueue, threadFactory, handler);
executor.prestartAllCoreThreads(); // 預啟動所有核心執行緒
for (int i = 1; i <= 10; i++) {
executor.execute(() -> {
System.out.printf("%s is running \n", Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
log.error(e.getMessage());
}
});
}
}
結果如下 :
- 執行緒1-4先佔滿了核心執行緒和最大執行緒數量
- 執行緒5-6進入等待佇列
- 執行緒7-10被直接忽略拒絕執行
- 等1-4執行緒中有執行緒執行完後通知4、5執行緒繼續執行