自定義FutureTask實現
阿新 • • 發佈:2019-08-12
FutureTask
FutureTask是Future的實現,用來非同步任務的獲取結果,可以啟動和取消非同步任務,查詢非同步任務是否計算結束以及獲取最終的非同步任務的結果。通過get()方法來獲取非同步任務的結果,但是會阻塞當前執行緒直至非同步任務執行結束。一旦任務執行結束,任務不能重新啟動或取消,除非呼叫runAndReset()方法。
程式碼示例:
public class ThreadTest { public static void main(String[] args) throws Exception { Callable<String> myCallable = new MyCallableThread(); FutureTask<String> futureTask = new FutureTask<>(myCallable); Thread myCallableThread = new Thread(futureTask); myCallableThread.setName("MyThread-implements-Callable-test"); myCallableThread.start(); System.out.println("Run by Thread:" + futureTask.get()); //通過執行緒池執行 ExecutorService executorService = Executors.newCachedThreadPool(); executorService.submit(futureTask); executorService.shutdown(); System.out.println("Run by ExecutorService:" + futureTask.get()); } } class MyCallableThread implements Callable<String> { @Override public String call() throws Exception { return Thread.currentThread().getName(); } }
實現一個自己的FutureTask
根據FutureTask核心原理,要實現一個FutureTask必須滿足以下方面:
- 需要泛型定義用以返回結果型別
- 需要一個callable物件,在構造方法中傳入
- 需要實現runnable介面,在run方法中實現具體結果計算
- 需要一個公開的get方法來獲取結果
- 如果執行緒沒有執行完,則呼叫get方法的執行緒需要進入等待佇列
- 需要一個欄位記錄執行緒執行的狀態
- 需要一個等待佇列儲存等待結果的執行緒
程式碼示例:
/** * 1. 泛型定義 * 2. 構造方法 callable * 3. 實現了runnable * 4. get方法返回callable執行結果 * 5. get方法有阻塞的效果(未執行結束的話) */ public class MyFutureTask<T> implements Runnable { // 程式執行的結果 private T result; // 要執行的任務 private Callable<T> callable; // 任務執行的狀態 private volatile int state = NEW; // 任務執行的狀態值 private static final int NEW = 0; private static final int RUNNING = 1; private static final int FINISHED = 2; // 獲取結果的執行緒等待佇列 LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>(100); // 執行當前FutureTask的執行緒,用CAS進行爭搶 AtomicReference<Thread> runner = new AtomicReference<>(); public MyFutureTask(Callable<T> task) { this.callable = task; } @Override public void run() { // 判斷當前物件的狀態,如果是New且搶鎖成功就執行 if (state != NEW || !runner.compareAndSet(null, Thread.currentThread())) return; state = RUNNING; try { result = callable.call(); } catch (Exception e) { e.printStackTrace(); } finally { state = FINISHED; } // 方法執行完,喚醒所有執行緒 while (true) { Thread waiterThread = waiters.poll(); if (waiterThread == null) break; LockSupport.unpark(waiterThread); } } public T get() { // 如果狀態不是FINISHED,則進入等待佇列 if (state != FINISHED) { waiters.offer(Thread.currentThread()); } while (state != FINISHED) { LockSupport.park(); } return result; } } // MyFutureTask 測試 public class FutureTaskTest { public static void main(String[] args) { Callable<String> myCallable = new MyCallableThread(); MyFutureTask<String> futureTask = new MyFutureTask<>(myCallable); Thread myCallableThread = new Thread(futureTask); myCallableThread.setName("MyFutureTask-test"); myCallableThread.start(); System.out.println("Run by Thread:" + futureTask.get()); } } class MyCallableThread implements Callable<String> { @Override public String call() throws Exception { return Thread.currentThread().getName(); } }