1. 程式人生 > >執行緒池使用FutureTask時候需要注意的一點事, 獲取執行緒執行結果

執行緒池使用FutureTask時候需要注意的一點事, 獲取執行緒執行結果

8.4 執行緒池使用FutureTask時候需要注意的一點事

執行緒池使用FutureTask的時候如果拒絕策略設定為了 DiscardPolicyDiscardOldestPolicy並且在被拒絕的任務的Future物件上呼叫無參get方法那麼呼叫執行緒會一直被阻塞。

問題復現

下面就通過一個簡單的例子來複現問題:

public class FutureTest {

    //(1)執行緒池單個執行緒,執行緒池佇列元素個數為1
        private final static ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES,
            new ArrayBlockingQueue<Runnable>(1),new ThreadPoolExecutor.DiscardPolicy());

    public static void main(String[] args) throws Exception {

        //(2)新增任務one
        Future futureOne = executorService.submit(new Runnable() {
            @Override
            public void run() {

                System.out.println("start runable one");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        //(3)新增任務two
        Future futureTwo = executorService.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("start runable two");
            }
        });

        //(4)新增任務three
        Future futureThree=null;
        try {
            futureThree = executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("start runable three");
                }
            });
        } catch (Exception e) {
            System.out.println(e.getLocalizedMessage());
        }


        System.out.println("task one " + futureOne.get());//(5)等待任務one執行完畢
        System.out.println("task two " + futureTwo.get());//(6)等待任務two執行完畢
        System.out.println("task three " + (futureThree==null?null:futureThree.get()));// (7)等待任務three執行完畢


        executorService.shutdown();//(8)關閉執行緒池,阻塞直到所有任務執行完畢
    }

執行程式碼結果為:

image.png
  • 程式碼(1)建立了一個單執行緒並且佇列元素個數為1的執行緒池,並且拒絕策略設定為了DiscardPolicy
  • 程式碼(2)向執行緒池提交了一個任務one,那麼這個任務會使用唯一的一個執行緒進行執行,任務在列印 start runable one後會阻塞該執行緒5s.
  • 程式碼(3)向執行緒池提交了一個任務two,這時候會把任務two放入到阻塞佇列
  • 程式碼(4)向執行緒池提交任務three,由於佇列已經滿了則會觸發拒絕策略丟棄任務three,從執行結果看在任務one阻塞的5s內,主執行緒執行到了程式碼(5)等待任務one執行完畢,當任務one執行完畢後代碼(5)返回,主執行緒打印出task one null。任務one執行完成後執行緒池的唯一執行緒會去佇列裡面取出任務two並執行所以輸出start runable two然後程式碼(6)會返回,這時候主執行緒輸出task two null,然後執行程式碼(7)等待任務three執行完畢,從執行結果看程式碼(7)會一直阻塞不會返回,至此問題產生,如果把拒絕策略修改為DiscardOldestPolicy也會存在有一個任務的get方法一直阻塞只是現在是任務two被阻塞。但是如果拒絕策略設定為預設的AbortPolicy則會正常返回,並且會輸出如下結果:
start runable one
Task [email protected] rejected from [email protected][Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]
task one null
start runable two
task two null
task three null

問題分析

要分析這個問題需要看下執行緒池的submit方法裡面做了什麼,submit方法程式碼如下:

    public Future<?> submit(Runnable task) {
        ...
        //(1)裝飾Runnable為Future物件
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        //(6)返回future物件
        return ftask;
    }

        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

 public void execute(Runnable command) {
         ...
        //(2) 如果執行緒個數訊息核心執行緒數則新增處理執行緒處理
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //(3)如果當前執行緒個數已經達到核心執行緒數則任務放入佇列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //(4)嘗試新增處理執行緒進行處理
        else if (!addWorker(command, false))
            reject(command);//(5)新增失敗則呼叫拒絕策略
    }

根據程式碼可以總結如下:

  • 程式碼(1)裝飾Runnable為FutureTask物件,然後呼叫執行緒池的execute方法
  • 程式碼(2) 如果執行緒個數訊息核心執行緒數則新增處理執行緒處理
  • 程式碼(3)如果當前執行緒個數已經達到核心執行緒數則任務放入佇列
  • 程式碼(4)嘗試新增處理執行緒進行處理,失敗則進行程式碼(5),否者直接使用新執行緒處理
  • 程式碼(5)執行具體拒絕策略。

所以要分析上面例子中問題所在只需要看步驟(5)對被拒絕任務的影響,這裡先看下拒絕策略DiscardPolicy的程式碼:

    public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

可知拒絕策略rejectedExecution方法裡面什麼都沒做,所以程式碼(4)呼叫submit後會返回一個future物件,這裡有必要在重新說future是有狀態的,future的狀態列舉值如下:

    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

在步驟(1)的時候使用newTaskFor方法轉換Runnable任務為FutureTask,而FutureTask的建構函式裡面設定的狀態就是New。

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

所以使用DiscardPolicy策略提交後返回了一個狀態為NEW的future物件。
那麼我們下面就需要看下當呼叫future的無參get方法時候當future變為什麼狀態時候才會返回那,那就需看下FutureTask的get()方法程式碼:

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        //當狀態值<=COMPLETING時候需要等待,否者呼叫report返回
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

   private V report(int s) throws ExecutionException {
   Object x = outcome;
   //狀態值為NORMAL正常返回
   if (s == NORMAL)
       return (V)x;
   //狀態值大於等於CANCELLED則拋異常
   if (s >= CANCELLED)
       throw new CancellationException();
   throw new ExecutionException((Throwable)x);
}

也就是說當future的狀態>COMPLETING時候呼叫get方法才會返回,而明顯DiscardPolicy策略在拒絕元素的時候並沒有設定該future的狀態,後面也沒有其他機會可以設定該future的狀態,所以future的狀態一直是NEW,所以一直不會返回,同理DiscardOldestPolicy策略也是這樣的問題,最老的任務被淘汰時候沒有設定被淘汰任務對於future的狀態。

那麼預設的AbortPolicy策略為啥沒問題那?其實AbortPolicy策略時候步驟(5)直接會丟擲RejectedExecutionException異常,也就是submit方法並沒有返回future物件,這時候futureThree是null。

所以當使用Future的時候,儘量使用帶超時時間的get方法,這樣即使使用了DiscardPolicy拒絕策略也不至於一直等待,等待超時時間到了會自動返回的,如果非要使用不帶引數的get方法則可以重寫DiscardPolicy的拒絕策略在執行策略時候設定該Future的狀態大於COMPLETING即可,但是檢視FutureTask提供的方法發現只有cancel方法是public的並且可以設定FutureTask的狀態大於COMPLETING,重寫拒絕策略具體程式碼可以如下:

public class MyRejectedExecutionHandler implements RejectedExecutionHandler{

    @Override
    public void rejectedExecution(Runnable runable, ThreadPoolExecutor e) {

             if(null != runable && runable instanceof FutureTask){
                 ((FutureTask) runable).cancel(true);
             }

    }

}

使用這個策略時候由於從report方法知道在cancel的任務上呼叫get()方法會丟擲異常所以程式碼(7)需要使用try-catch捕獲異常程式碼(7)修改為如下:

        try{       
 System.out.println("task three " + (futureThree==null?null:futureThree.get()));// (7)等待任務three執行完畢
 }catch(Exception e){ System.out.println(e.getLocalizedMessage()); } 

執行結果為:

image.png

當然這相比正常情況下多了一個異常捕獲,其實最好的情況是重寫拒絕策略時候設定FutureTask的狀態為NORMAL,但是這需要重寫FutureTask方法了,因為FutureTask並沒有提供介面進行設定。

總結

本文通過案例介紹了執行緒池中使用FutureTask時候當拒絕策略為DiscardPolicyDiscardOldestPolicy的時候在被拒絕的任務的FutureTask物件上呼叫get()方法會導致呼叫執行緒一直阻塞,所以在日常開發中儘量使用帶超時引數的get方法以避免執行緒一直阻塞,另外通過重寫這些拒絕策略設定拒絕任務的狀態也可以達到想要的效果。

轉載地址 : http://ifeve.com/%e7%ba%bf%e7%a8%8b%e6%b1%a0%e4%bd%bf%e7%94%a8futuretask%e6%97%b6%e5%80%99%e9%9c%80%e8%a6%81%e6%b3%a8%e6%84%8f%e7%9a%84%e4%b8%80%e7%82%b9%e4%ba%8b/