CompletableFuture非同步程式設計
阿新 • • 發佈:2019-12-31
CompletableFuture 有什麼用
- CompletableFuture是用來描述多執行緒任務的時序關係的:序列關係,並行關係,聚合關係。
- CompletableFuture 是Java 8 新增加的Api,該類實現,Future和CompletionStage兩個介面,提供了非常強大的Future的擴充套件功能,可以幫助我們簡化非同步程式設計的複雜性,提供了函式語言程式設計的能力,可以通過回撥的方式處理計算結果,並且提供了轉換和組合CompletableFuture的方法。
建立CompletableFuture物件
方式一:使用預設執行緒池
/**
* 建立一個不帶返回值得任務。
*/
CompletableFuture<Void> f1 = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
//業務邏輯
}
});
/**
* 建立一個帶返回值的任務。
*/
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
//業務邏輯
return null;
}
});複製程式碼
方式二:使用自定義執行緒池(建議使用)
//建立執行緒池
ExecutorService executor = Executors.newFixedThreadPool(10);
//建立一個不帶返回值得任務。
CompletableFuture<Void> f1 = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
}
},executor);
//建立一個帶返回值的任務。
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
//業務邏輯
return null;
}
},executor); 複製程式碼
- 預設情況下CompletableFuture會使用公共的ForkJoinPool執行緒池,這個執行緒池預設建立的執行緒數是CPU的 核數(也可以通過JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism來設定ForkJoinPool 執行緒池的執行緒數)。如果所有CompletableFuture共享一個執行緒池,那麼一旦有任務執行一些很慢的I/O操 作,就會導致執行緒池中所有執行緒都阻塞在I/O操作上,從而造成執行緒飢餓,進而影響整個系統的效能。所以,強烈建議你要根據不同的業務型別建立不同的執行緒池,以避免互相干擾。
- 建立完CompletableFuture物件之後,會自動地非同步執行runnable.run()方法或者supplier.get()方法。因為CompletableFuture類實現了Future介面,所以這兩個問題你都可以通過Future介面來解決。另外,CompletableFuture類還實現了CompletionStage介面。
常用API
- public T get():獲取計算結果, 該方法為阻塞方法會等待計算結果完成。
- public T get(long timeout,TimeUnit unit):有時間限制的阻塞方法
- public T getNow(T valueIfAbsent)立即獲取方法結果,如果沒有計算結束則返回傳的值
- public T join()和 get() 方法類似也是主動阻塞執行緒,等待計算結果。和get() 方法有細微的差別。
- public boolean complete(T value):立即完成計算,並把結果設定為傳的值,返回是否設定成功
- public boolean completeExceptionally(Throwable ex):立即完成計算,並丟擲異常
理解CompletionStage介面
CompletionStage介面可以清晰地描述任務之間的這種時序關係,時序關係:序列,並行,匯聚。
序列
執行緒與執行緒之間的執行順序是序列的。
//Async代表的是非同步執行fn
CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);複製程式碼
- CompletionStage介面裡面描述序列關係,主要是thenApply、thenAccept、thenRun和thenCompose這四 個系列的介面。
- thenApply系列函式裡引數fn的型別是介面Function
,這個介面裡與CompletionStage相關的方法是 R apply(T t),這個方法既能接收引數也支援返回值,所以thenApply系列方法返回的 是CompletionStage ,>。
- thenApply系列函式裡引數fn的型別是介面Function
- 而thenAccept系列方法裡引數consumer的型別是介面Consumer
,這個介面裡與CompletionStage相關 的方法是 void accept(T t),這個方法雖然支援引數,但卻不支援回值,所以thenAccept系列方法返回 的是CompletionStage 。 - thenRun系列方法裡action的引數是Runnable,所以action既不能接收引數也不支援返回值,所以thenRun 系列方法返回的也是CompletionStage
。 - 這些方法裡面Async代表的是非同步執行fn、consumer或者action。其中,需要你注意的是thenCompose系列 方法,這個系列的方法會新創建出一個子流程,最終結果和thenApply系列是相同的。
- thenRun系列方法裡action的引數是Runnable,所以action既不能接收引數也不支援返回值,所以thenRun 系列方法返回的也是CompletionStage
演示序列
//supplyAsync()啟動一個非同步 流程
CompletableFuture<String> f0 = CompletableFuture.supplyAsync(
() -> "Hello World") //①
.thenApply(s -> s + "girl") //②
.thenApply(String::toUpperCase);//③
System.out.println(f0.join());
//輸出結果 HELLO WORLD girl複製程式碼
雖然這是一個非同步流程,但任務①②③卻是 序列執行的,②依賴①的執行結果,③依賴②的執行結果。
AND匯聚
CompletionStage介面裡面描述AND匯聚關係,主要是thenCombine、thenAcceptBoth和runAfterBoth系列的介面,這些介面的區別也是源自fn、consumer、action這三個核心引數不同。
CompletionStage<R> thenCombine(other,fn);
CompletionStage<R> thenCombineAsync(other,fn);
CompletionStage<Void> thenAcceptBoth(other,consumer); CompletionStage<Void> thenAcceptBothAsync(other,consumer); CompletionStage<Void> runAfterBoth(other,action);
CompletionStage<Void> runAfterBothAsync(other,action);
複製程式碼
演示:
// 啟動一個非同步流程f1
CompletableFuture<Void> f1 = CompletableFuture.runAsync(()->{
System.out.println("非同步任務-1");
});
// 啟動一個非同步流程f2
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{
String s = "非同步任務-2";
System.out.println(s);
return s;
});
//啟動一個匯聚流程f3
CompletableFuture<String> f3 = f1.thenCombine(f2,(a,tf)->{
return tf;//tf是f2的值
});
//等待任務3執行結果
System.out.println(f3.join());複製程式碼
OR匯聚
CompletionStage介面裡面描述OR匯聚關係,主要是applyToEither、acceptEither和runAfterEither系列的 介面,這些介面的區別也是源自fn、consumer、action這三個核心引數不同。
CompletionStage applyToEither(other,fn);
CompletionStage applyToEitherAsync(other,fn);
CompletionStage acceptEither(other,consumer);
CompletionStage acceptEitherAsync(other,consumer);
CompletionStage runAfterEither(other,action);
CompletionStage runAfterEitherAsync(other,action);
複製程式碼
功能演示:
// 啟動一個非同步流程f1
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(()->{
int t = new Random().nextInt(10);
System.out.println("f1-t = "+t);
sleep(t,TimeUnit.SECONDS);
return String.valueOf(t);
});
// 啟動一個非同步流程f2
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{
int t = new Random().nextInt(10);
System.out.println("f2-t = "+t);
sleep(t,TimeUnit.SECONDS);
return String.valueOf(t);
});
//將f1和f2的值彙總到f3
CompletableFuture<String> f3 = f1.applyToEither(f2,s ->
Integer.parseInt(f2.join())+Integer.parseInt(s)+""
);
System.out.println(f3.join());複製程式碼
碼字不易如果對你有幫助請給個關注
愛技術愛生活 QQ群: 894109590