1. 程式人生 > 其它 >Java多執行緒系列詳解_05_使用ThreadPoolExecutor自定義建立執行緒池

Java多執行緒系列詳解_05_使用ThreadPoolExecutor自定義建立執行緒池

技術標籤:Java多執行緒佇列java多執行緒

ThreadPoolExecutor詳解

1.構造詳解

ThreadPoolExecutor有四個構造
在這裡插入圖片描述
講解最詳細的如下:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize <
corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
序號名稱型別含義
1corePoolSizeint核心執行緒池大小
2maximumPoolSizeint最大執行緒池大小
3keepAliveTimelong執行緒最大空閒時間
4unitTimeUnit時間單位
5workQueueBlockingQueue執行緒等待佇列
6threadFactoryThreadFactory執行緒建立工廠
7handlerRejectedExecutionHandler拒絕策略

1. int corePoolSize :

  • 核心池的大小,建立了執行緒池後不會建立執行緒,直到有任務來才會執行建立執行緒;
  • prestartAllCoreThreads()或者prestartCoreThread()會初始化執行緒數;
  • 當執行緒池中的執行緒數目達到corePoolSize後,就會把到達的任務放到快取隊列當中;

2. int maximumPoolSize :

程池最大執行緒數, 它表示線上程池中最多能建立多少個執行緒

3. long keepAliveTime :

  • 空閒執行緒超時時長, 只有當執行緒數超過corePoolSize後的執行緒才會有效;
  • allowCoreThreadTimeOut(boolean)會導致執行緒數不超過corePoolSize也會生效

4. TimeUnit unit :

  • 引數keepAliveTime的時間單位
        //天
        TimeUnit days = TimeUnit.DAYS;
        //小時
        TimeUnit hours = TimeUnit.HOURS;
        //分鐘
        TimeUnit minutes = TimeUnit.MINUTES;
        //秒
        TimeUnit seconds = TimeUnit.SECONDS;
        //毫秒
        TimeUnit milliseconds = TimeUnit.MILLISECONDS;
        //微妙
        TimeUnit microseconds = TimeUnit.MICROSECONDS;
        //納秒
        TimeUnit nanoseconds = TimeUnit.NANOSECONDS;

5. BlockingQueue workQueue :

  • 一個阻塞佇列, 用來儲存等待執行的任務

6. ThreadFactory threadFactory :

  • 建立執行緒的工廠

7. RejectedExecutionHandler handler :

  • 拒絕處理任務時的策略

2.引數詳解

1. BlockingQueue workQueue介面常用實現類

BlockingQueue 介面的常用方法:

入隊:
offer(E e):如果佇列沒滿,立即返回true; 如果佇列滿了,立即返回false–>不阻塞
put(E e):如果佇列滿了,一直阻塞,直到佇列不滿了或者執行緒被中斷–>阻塞
offer(E e, long timeout, TimeUnit unit):在隊尾插入一個元素,,如果佇列已滿,則進入等待,直到出現以下三種情況:–>阻塞
被喚醒 / 等待時間超時 / 當前執行緒被中斷

出隊
poll():如果沒有元素,直接返回null;如果有元素,出隊
take():如果佇列空了,一直阻塞,直到佇列不為空或者執行緒被中斷–>阻塞
poll(long timeout, TimeUnit unit):如果佇列不空,出隊;如果佇列已空且已經超時,返回null;如果佇列已空且時間未超時,則進入等待,直到出現以下三種情況:
被喚醒 / 等待時間超時 / 當前執行緒被中斷

BlockingQueue 常見實現類 :

  • ArrayBlockingQueue(不常用)
  • LinkedBlockingQueue(常用)
  • SynchronousQueue(常用)
  • PriorityBlockingQueue(不常用)

1. ArrayBlockingQueue

  • 底層基於陣列實現

  • 初始化需要指定容量 , 不可擴容 , 屬於有界佇列

  • 在佇列全滿時執行入隊將會阻塞,在佇列為空時出隊同樣將會阻塞。

  • 併發阻塞是通過ReentrantLock和Condition來實現的

  • 內部只有一把鎖,意味著同一時刻只有一個執行緒能進行入隊或者出隊的操作。

  • 方法詳解:

    • final Object[] items : 核心陣列 , 儲存元素
    • final ReentrantLock lock : 可重入鎖 , 入隊和出隊使用一個鎖
    • private final Condition notEmpty : 不為空條件 , 出隊使用
    • private final Condition notFull : 不為滿條件 , 入隊使用
    • void put(E e) : 入隊方法, 將元素新增到尾部,如果佇列滿了,則一直阻塞等待有空位再入隊
    • boolean add(E e) : 入隊方法, 將元素新增到尾部,如果佇列滿了,則直接丟擲異常IllegalStateException
    • boolean offer(E e) : 入隊方法,將元素新增到尾部,如果佇列滿了,則直接返回false,而add()則會直接丟擲異常(此方法優先於add())
    • E poll() : 出隊方法,移除並將頂部元素返回,如果佇列為空,則返回null
    • E take() : 出隊方法,移除並將頂部元素返回,如果佇列空了,一直阻塞,直到佇列不為空或者執行緒被中斷
    • E peek() : 出隊方法,將頂部元素返回, 如果佇列為空,則返回null
    • E remove() : 出隊方法,刪除並當前物件,並返回佇列頭部的元素,如果佇列為空,則丟擲一個NoSuchElementException異常

2. LinkedBlockingQueue

LinkedBlockingQueue中維持兩把鎖,一把鎖用於入隊,一把鎖用於出隊,這也就意味著,同一時刻,只能有一個執行緒執行入隊,其餘執行入隊的執行緒將會被阻塞;同時,可以有另一個執行緒執行出隊,其餘執行出隊的執行緒將會被阻塞。換句話說,雖然入隊和出隊兩個操作同時均只能有一個執行緒操作,但是可以一個入隊執行緒和一個出隊執行緒共同執行,也就意味著可能同時有兩個執行緒在操作佇列,那麼為了維持執行緒安全,LinkedBlockingQueue使用一個AtomicInterger型別的變量表示當前佇列中含有的元素個數,所以可以確保兩個執行緒之間操作底層佇列是執行緒安全的。

  • LinkedBlockingQueue不允許元素為null。

  • 同一時刻,只能有一個執行緒執行入隊操作,因為putLock在將元素插入到佇列尾部時加鎖了

  • 如果佇列滿了,那麼將會呼叫notFull的await()方法將該執行緒加入到Condition等待佇列中。await()方法就會釋放執行緒佔有的鎖,這將導致之前由於被鎖阻塞的入隊執行緒將會獲取到鎖,執行到while迴圈處,不過可能因為由於佇列仍舊是滿的,也被加入到條件佇列中。

  • 一旦一個出隊執行緒取走了一個元素,並通知了入隊等待佇列中可以釋放執行緒了,那麼第一個加入到Condition佇列中的將會被釋放,那麼該執行緒將會重新獲得put鎖,繼而執行enqueue()方法,將節點插入到佇列的尾部

  • 然後得到插入一個節點之前的元素個數,如果佇列中還有空間可以插入,那麼就通知notFull條件的等待佇列中的執行緒。

  • 通知出隊執行緒佇列為空了,因為插入一個元素之前的個數為0,而插入一個之後佇列中的元素就從無變成了有,就可以通知因佇列為空而阻塞的出隊執行緒了。

  • 當佇列為空時,就加入到notEmpty(的條件等待佇列中,當佇列不為空時就取走一個元素,當取完發現還有元素可取時,再通知一下自己的夥伴(等待在條件佇列中的執行緒);最後,如果佇列從滿到非滿,通知一下put執行緒。

  • LinkedBlockingQueue是允許兩個執行緒同時在兩端進行入隊或出隊的操作的,但一端同時只能有一個執行緒進行操作,這是通過兩把鎖來區分的;

  • 為了維持底部資料的統一,引入了AtomicInteger的一個count變數,表示佇列中元素的個數。count只能在兩個地方變化,一個是入隊的方法(可以+1),另一個是出隊的方法(可以-1),而AtomicInteger是原子安全的,所以也就確保了底層佇列的資料同步。

3. SynchronousQueue

SynchronousQueue–基於CAS

總結:

ArrayBlockingQueue:

  • 一個物件陣列+一把鎖+兩個條件
  • 入隊與出隊都用同一把鎖
  • 在只有入隊高併發或出隊高併發的情況下,因為運算元組,且不需要擴容,效能很高
  • 採用了陣列,必須指定大小,即容量有限

LinkedBlockingQueue:

  • 一個單向連結串列+兩把鎖+兩個條件
  • 兩把鎖,一把用於入隊,一把用於出隊,有效的避免了入隊與出隊時使用一把鎖帶來的競爭。
  • 在入隊與出隊都高併發的情況下,效能比ArrayBlockingQueue高很多
  • 採用了連結串列,最大容量為整數最大值,可看做容量無限

2.RejectedExecutionHandler handler介面常用實現類

  • AbortPolicy:直接丟擲異常。
  • CallerRunsPolicy:只用呼叫者所線上程來執行任務。
  • DiscardOldestPolicy:丟棄佇列裡最近的一個任務,並執行當前任務。
  • DiscardPolicy:不處理,丟棄掉。
  • 也可以根據應用場景需要來實現RejectedExecutionHandler介面自定義策略。如記錄日誌或持久化不能處理的任務。(常用)

3.自定義執行緒池測試

    /**
     * 建立執行緒池測試方法
     */
    private static void createThread() {
        // 定義核心執行緒活躍數
        int corePoolSize = 2;
        // 定義最大執行緒活躍數
        int maximumPoolSize = 4;
        // 定義等待時間
        long keepAliveTime = 10;
        // 定義等待時間單位
        TimeUnit unit = TimeUnit.SECONDS;
        // 定義使用佇列
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
        // 定義執行緒工廠
        ThreadFactory threadFactory = (Runnable r) -> {
            Thread t = new Thread(r, "my-thread-" + CreateThreadByThreadPoolExecutor.mThreadNum.getAndIncrement());
            System.out.println(t.getName() + " has been created");
            return t;
        };
        // 定義拒絕處理任務時的策略
        RejectedExecutionHandler handler = (Runnable r, ThreadPoolExecutor e) -> {
            // 做日誌持久化記錄
            System.err.printf("%s is rejected \n", r.toString());
        };
        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
                keepAliveTime, unit, workQueue, threadFactory, handler);
        executor.prestartAllCoreThreads(); // 預啟動所有核心執行緒

        for (int i = 1; i <= 10; i++) {
            executor.execute(() -> {
                System.out.printf("%s is running \n", Thread.currentThread().getName());
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    log.error(e.getMessage());
                }
            });
        }
    }

結果如下 :

  • 執行緒1-4先佔滿了核心執行緒和最大執行緒數量
  • 執行緒5-6進入等待佇列
  • 執行緒7-10被直接忽略拒絕執行
  • 等1-4執行緒中有執行緒執行完後通知4、5執行緒繼續執行