【小家java】Java8新特性之---CompletableFuture的系統講解和例項演示(使用CompletableFuture構建非同步應用)
相關閱讀
【小家java】java5新特性(簡述十大新特性) 重要一躍
【小家java】java6新特性(簡述十大新特性) 雞肋升級
【小家java】java7新特性(簡述八大新特性) 不溫不火
【小家java】java8新特性(簡述十大新特性) 飽受讚譽
【小家java】java9新特性(簡述十大新特性) 褒貶不一
【小家java】java10新特性(簡述十大新特性) 小步迭代
【小家java】java11新特性(簡述八大新特性) 首個重磅LTS版本
非同步
傳統單執行緒環境下,呼叫函式是同步的,必須等待程式返回結果後,才可進行其他處理。因此為了提高系統整體的併發效能,引入了非同步執行~
jdk中已經內建future模式的實現。Future是Java5新增的類,用來描述一個非同步計算的結果。可以用isDone方法來檢查計算是否完成,或者使用get阻塞住呼叫執行緒,直至計算完成返回結果,也可以用cancel方法來停止任務的執行。
Futrue非同步模式存在的問題
Future以及相關使用方法提供了非同步執行任務的能力,但對於結果的獲取卻是不方便,只能通過阻塞或輪詢的方式得到任務結果。
阻塞的方式與我們理解的非同步程式設計其實是相違背的,而輪詢又會耗無謂的CPU資源。而且還不能及時得到計算結果,為什麼不能用觀察者設計模式當計算結果完成及時通知監聽者呢?
很多語言像Node.js,採用回撥的方式實現非同步程式設計。Java的一些框架像Netty,自己擴充套件Java的Future介面,提供了addListener等多個擴充套件方法。
guava裡面也提供了通用的擴充套件Future: ListenableFuture\SettableFuture以及輔助類Futures等,方便非同步程式設計
作為正統Java類庫,是不是應該加點什麼特性,可以加強一下自身庫的功能?
JDK8引入中重磅類庫:CompletableFuture
Java8裡面新增加了一個包含50個方法左右的類:CompletableFuture. 提供了非常強大的Future的擴充套件功能,可以幫助簡化非同步程式設計的複雜性,提供了函數語言程式設計能力,可以通過回撥的方式計算處理結果,並且提供了轉換和組織CompletableFuture的方法。
JDK1.8才新加入的一個實現類CompletableFuture,實現了Future, CompletionStage兩個介面。
CompletableFuture實現了CompletionStage介面的如下策略:
- 為了完成當前的CompletableFuture介面或者其他完成方法的回撥函式的執行緒,提供了非非同步的完成操作。
- 沒有顯式入參Executor的所有async方法都使用ForkJoinPool.commonPool()為了簡化監視、除錯和跟蹤,所有生成的非同步任務都是標記介面AsynchronousCompletionTask的例項。
- 所有的CompletionStage方法都是獨立於其他共有方法實現的,因此一個方法的行為不會受到子類中其他方法的覆蓋
CompletableFuture實現了Futurre介面的如下策略:
- CompletableFuture無法直接控制完成,所以cancel操作被視為是另一種異常完成形式。方法isCompletedExceptionally可以用來確定一個CompletableFuture是否以任何異常的方式完成。
- 以一個CompletionException為例,方法get()和get(long,TimeUnit)丟擲一個ExecutionException,對應CompletionException。為了在大多數上下文中簡化用法,這個類還定義了方法join()和getNow,而不是直接在這些情況中直接丟擲CompletionException。
CompletableFuture中4個非同步執行任務靜態方法:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
其中supplyAsync用於有返回值的任務,runAsync則用於沒有返回值的任務。Executor引數可以手動指定執行緒池,否則預設ForkJoinPool.commonPool()系統級公共執行緒池,
注意:這些執行緒都是Daemon執行緒,主執行緒結束Daemon執行緒不結束,只有JVM關閉時,生命週期終止
主動完成計算
CompletableFuture 類實現了CompletionStage和Future介面,所以還是可以像以前一樣通過阻塞或輪詢的方式獲得結果。儘管這種方式不推薦使用。
如下四個方法都可以獲取結果:
public T get() //Futrue的方法 阻塞
public T get(long timeout, TimeUnit unit) //Futrue的方法 阻塞
// 新提供的方法
public T getNow(T valueIfAbsent) //getNow有點特殊,如果結果已經計算完則返回結果或拋異常,否則返回給定的valueIfAbsent的值(此方法有點反人類有木有)
public T join() // 返回計算的結果或丟擲一個uncheckd異常。 推薦使用
上面4個方法,推薦使用join,還有帶超時時間的get方法
CompletableFuture並非一定要交給執行緒池執行才能實現非同步,你可以像下面這樣實現非同步執行:
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture();
//自己開個執行緒去執行 執行完把結果告訴completableFuture即可
new Thread(() -> {
// 模擬執行耗時任務
System.out.println("task doing...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 告訴completableFuture任務已經完成 並且把結果告訴completableFuture
completableFuture.complete("ok"); //這裡把你信任的結果set進去後,所有阻塞的get()方法都能立馬蘇醒,獲得到結果
}).start();
// 獲取任務結果,如果沒有完成會一直阻塞等待
System.out.println("準備列印結果...");
String result = completableFuture.get();
System.out.println("計算結果:" + result);
}
輸出:
準備列印結果...
task doing...
計算結果:ok
如果沒有意外,上面發的程式碼工作得很正常。但是,如果任務執行過程中產生了異常會怎樣呢?如下:只加一句1/0的程式碼
//自己開個執行緒去執行 執行完把結果告訴completableFuture即可
new Thread(() -> {
// 模擬執行耗時任務
System.out.println("task doing...");
try {
Thread.sleep(3000);
System.out.println(1 / 0);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 告訴completableFuture任務已經完成 並且把結果告訴completableFuture
completableFuture.complete("ok");
}).start();
這種情況下會得到一個相當糟糕的結果:異常會被限制在執行任務的執行緒的範圍內,最終會殺死該守護執行緒,而主執行緒,將永遠永遠阻塞了。
怎麼解決呢?
- 使用get(long timeout, TimeUnit unit)代替get()方法,它使用一個超時引數來避免發生這樣的情況。這是一種值得推薦的做法,我們應該儘量在你的程式碼中新增超時判斷的邏輯,避免發生類似的問題。
使用這種方法至少能防止程式永久地等待下去,超時發生時,程式會得到通知發生了TimeoutException 。不過,也因為如此,你不能確定執行任務的執行緒內到底發生了什麼問題(因此自己要做好權衡)。
- 更好的解決方案是:為了能獲取任務執行緒內發生的異常,你需要使用
CompletableFuture的completeExceptionally方法將導致CompletableFuture內發生問題的異常丟擲。這樣,當執行任務發生異常時,呼叫get()方法的執行緒將會收到一個 ExecutionException異常,該異常接收了一個包含失敗原因的Exception 引數。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture();
//自己開個執行緒去執行 執行完把結果告訴completableFuture即可
new Thread(() -> {
// 模擬執行耗時任務
System.out.println("task doing...");
try {
Thread.sleep(3000);
System.out.println(1 / 0);
//} catch (InterruptedException e) {
} catch (Exception e) {
// 告訴completableFuture任務發生異常了
completableFuture.completeExceptionally(e);
e.printStackTrace();
}
// 告訴completableFuture任務已經完成 並且把結果告訴completableFuture
completableFuture.complete("ok");
}).start();
// 獲取任務結果,如果沒有完成會一直阻塞等待
System.out.println("準備列印結果...");
String result = completableFuture.get();
System.out.println("計算結果:" + result);
}
這樣子,如果內部發生了異常,呼叫get方法的時候就能得到這個Exception,進而能拿到拋異常的原因了。
使用案例
在Java8中,CompletableFuture提供了非常強大的Future的擴充套件功能,可以幫助我們簡化非同步程式設計的複雜性,並且提供了函數語言程式設計的能力,可以通過回撥的方式處理計算結果,也提供了轉換和組合 CompletableFuture 的方法。
它可能代表一個明確完成的Future,也有可能代表一個完成階段( CompletionStage ),它支援在計算完成以後觸發一些函式或執行某些動作。
建立CompletableFuture
四個靜態方法(如上),一個空建構函式
whenComplete計算結果完成時的處理
當CompletableFuture的計算結果完成,或者丟擲異常的時候,我們可以執行特定的Action。主要是下面的方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
可以看到Action的型別是BiConsumer<? super T,? super Throwable>,它可以處理正常的計算結果,或者異常情況。
方法不以Async結尾,意味著Action使用相同的執行緒執行,而Async可能會使用其它的執行緒去執行(如果使用相同的執行緒池,也可能會被同一個執行緒選中執行)。
注意這幾個方法都會返回CompletableFuture。
CompletableFuture.supplyAsync(() -> 100)
.thenApplyAsync(i -> i * 10)
.thenApply(i -> i.toString())
.whenComplete((r, e) -> System.out.println(r + "_____" + e)); //1000_____null
//若有異常
CompletableFuture.supplyAsync(() -> 1 / 0)
.thenApplyAsync(i -> i * 10)
.thenApply(i -> i.toString())
.whenComplete((r, e) -> System.out.println(r + "_____" + e)); //null_____java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
handle、 thenApply相當於回撥函式(callback) 當然也有轉換的作用
public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(asyncPool, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
使用方式如下:
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> 100)
.thenApplyAsync(i -> i * 10)
.thenApply(i -> i.toString())
.whenComplete((r, e) -> System.out.println(r + "_____" + e)); //1000_____null
//若有異常
CompletableFuture.supplyAsync(() -> 1 / 0)
.thenApplyAsync(i -> i * 10)
.thenApply(i -> i.toString())
.whenComplete((r, e) -> System.out.println(r + "_____" + e)); //null_____java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
//上面效果 或者下面這麼寫也行(但上面那麼寫 連同異常都可以處理) 全部匿名方式 效率高 程式碼也優雅
//CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> 100)
// .thenApplyAsync(i -> i * 10)
// .thenApply(i -> i.toString());
//System.out.println(f.get()); //"1000"
}
我們會發現,結合Java8的流式處理,簡直絕配。程式碼看起來特別的優雅,關鍵還效率高,連異常都一下子給我們抓住了,簡直完美。
thenApply與handle方法的區別在於handle方法會處理正常計算值和異常,因此它可以遮蔽異常,避免異常繼續丟擲。而thenApply方法只是用來處理正常值,因此一旦有異常就會丟擲。
thenAccept與thenRun(純消費(執行Action))
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(asyncPool, action);
}
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor) {
return uniAcceptStage(screenExecutor(executor), action);
}
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}
- 可以看到,thenAccept和thenRun都是無返回值的。如果說thenApply是不停的輸入輸出的進行生產,那麼thenAccept和thenRun就是在進行消耗。它們是整個計算的最後兩個階段。
- 同樣是執行指定的動作,同樣是消耗,二者也有區別:
thenAccept接收上一階段的輸出作為本階段的輸入
thenRun根本不關心前一階段的輸出,根本不不關心前一階段的計算結果,因為它不需要輸入引數(thenRun使用的是Runnable,若你只是單純的消費,不需要啟用執行緒時,就用thenAccept更合適)
上面的方法是當計算完成的時候,會生成新的計算結果(thenApply
, handle
),或者返回同樣的計算結果whenComplete
。CompletableFuture還提供了一種處理結果的方法,只對結果執行Action,而不返回新的計算值,因此計算值為Void:
public static void main(String[] args) {
CompletableFuture<Void> f = CompletableFuture.supplyAsync(() -> 100)
.thenAccept(x -> System.out.println(x)); //100
//如果此句話get不呼叫 也是能夠輸出100的 上面也會有輸出的
System.out.println(f.join()); //null 返回null,所以thenAccept是木有返回值的
//thenRun的案例演示
CompletableFuture<Void> f2 = CompletableFuture.supplyAsync(() -> 100)
.thenRun(() -> System.out.println("不需要入參")); //不需要入參
System.out.println(f2.join()); //null 返回null,所以thenAccept是木有返回值的
}
thenAcceptBoth以及相關方法提供了類似的功能,當兩個CompletionStage都正常完成計算的時候,就會執行提供的action,它用來組合另外一個非同步的結果。
runAfterBoth是當兩個CompletionStage都正常完成計算的時候,執行一個Runnable,這個Runnable並不使用計算的結果。
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
例子:
public static void main(String[] args) {
CompletableFuture<Void> f = CompletableFuture.supplyAsync(() -> 100)
// 第二個消費者:x,y顯然是可以把前面幾個的結果都拿到,然後再做處理
.thenAcceptBoth(CompletableFuture.completedFuture(10), (x, y) -> System.out.println(x * y)); //1000
System.out.println(f.join()); //null
}
thenCombine、thenCompose整合兩個計算結果
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V