1. 程式人生 > 程式設計 >Java併發 CompletableFuture非同步程式設計的實現

Java併發 CompletableFuture非同步程式設計的實現

前面我們不止一次提到,用多執行緒優化效能,其實不過就是將序列操作變成並行操作。如果仔細觀察,你還會發現在序列轉換成並行的過程中,一定會涉及到非同步化,例如下面的示例程式碼,現在是序列的,為了提升效能,我們得把它們並行化。

// 以下兩個方法都是耗時操作
doBizA();
doBizB();


//建立兩個子執行緒去執行就可以了,兩個操作已經被非同步化了。
new Thread(()->doBizA())
 .start();
new Thread(()->doBizB())
 .start(); 

非同步化,是並行方案得以實施的基礎,更深入地講其實就是:利用多執行緒優化效能這個核心方案得以實施的基礎

。Java 在 1.8 版本提供了 CompletableFuture 來支援非同步程式設計。

CompletableFuture 的核心優勢

為了領略 CompletableFuture 非同步程式設計的優勢,這裡我們用 CompletableFuture 重新實現前面曾提及的燒水泡茶程式。首先還是需要先完成分工方案,在下面的程式中,我們分了 3 個任務:任務 1 負責洗水壺、燒開水,任務 2 負責洗茶壺、洗茶杯和拿茶葉,任務 3 負責泡茶。其中任務 3 要等待任務 1 和任務 2 都完成後才能開始。這個分工如下圖所示。


燒水泡茶分工方案

// 任務 1:洗水壺 -> 燒開水
CompletableFuture<Void> f1 = 
 CompletableFuture.runAsync(()->{
 System.out.println("T1: 洗水壺...");
 sleep(1,TimeUnit.SECONDS);

 System.out.println("T1: 燒開水...");
 sleep(15,TimeUnit.SECONDS);
});
// 任務 2:洗茶壺 -> 洗茶杯 -> 拿茶葉
CompletableFuture<String> f2 = 
 CompletableFuture.supplyAsync(()->{
 System.out.println("T2: 洗茶壺...");
 sleep(1,TimeUnit.SECONDS);

 System.out.println("T2: 洗茶杯...");
 sleep(2,TimeUnit.SECONDS);

 System.out.println("T2: 拿茶葉...");
 sleep(1,TimeUnit.SECONDS);
 return " 龍井 ";
});
// 任務 3:任務 1 和任務 2 完成後執行:泡茶
CompletableFuture<String> f3 = 
 f1.thenCombine(f2,(__,tf)->{
  System.out.println("T1: 拿到茶葉:" + tf);
  System.out.println("T1: 泡茶...");
  return " 上茶:" + tf;
 });
// 等待任務 3 執行結果
System.out.println(f3.join());

void sleep(int t,TimeUnit u) {
 try {
  u.sleep(t);
 }catch(InterruptedException e){}
}
// 一次執行結果:
T1: 洗水壺...
T2: 洗茶壺...
T1: 燒開水...
T2: 洗茶杯...
T2: 拿茶葉...
T1: 拿到茶葉: 龍井
T1: 泡茶...
上茶: 龍井

從整體上來看,我們會發現

  • 無需手工維護執行緒,沒有繁瑣的手工維護執行緒的工作,給任務分配執行緒的工作也不需要我們關注;
  • 語義更清晰,例如f3 = f1.thenCombine(f2,()->{}) 能夠清晰地表述“任務 3 要等待任務 1 和任務 2 都完成後才能開始”;
  • 程式碼更簡練並且專注於業務邏輯,幾乎所有程式碼都是業務邏輯相關的。

領略 CompletableFuture 非同步程式設計的優勢之後,下面我們詳細介紹 CompletableFuture 的使用。

建立 CompletableFuture 物件

建立 CompletableFuture 物件主要靠下面程式碼中展示的這 4 個靜態方法,我們先看前兩個。在燒水泡茶的例子中,我們已經使用了runAsync(Runnable runnable)

supplyAsync(Supplier<U> supplier),它們之間的區別是:Runnable 介面的 run() 方法沒有返回值,而 Supplier 介面的 get() 方法是有返回值的。

前兩個方法和後兩個方法的區別在於:後兩個方法可以指定執行緒池引數。

預設情況下 CompletableFuture 會使用公共的 ForkJoinPool 執行緒池,這個執行緒池預設建立的執行緒數是 CPU 的核數(也可以通過 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 來設定 ForkJoinPool 執行緒池的執行緒數)。如果所有 CompletableFuture 共享一個執行緒池,那麼一旦有任務執行一些很慢的 I/O 操作,就會導致執行緒池中所有執行緒都阻塞在 I/O 操作上,從而造成執行緒飢餓,進而影響整個系統的效能。所以,強烈建議你要根據不同的業務型別建立不同的執行緒池,以避免互相干擾

// 使用預設執行緒池
static CompletableFuture<Void> 
 runAsync(Runnable runnable)
static <U> CompletableFuture<U> 
 supplyAsync(Supplier<U> supplier)
// 可以指定執行緒池 
static CompletableFuture<Void> 
 runAsync(Runnable runnable,Executor executor)
static <U> CompletableFuture<U> 
 supplyAsync(Supplier<U> supplier,Executor executor) 

建立完 CompletableFuture 物件之後,會自動地非同步執行 runnable.run() 方法或者 supplier.get() 方法,對於一個非同步操作,你需要關注兩個問題:一個是非同步操作什麼時候結束,另一個是如何獲取非同步操作的執行結果。因為 CompletableFuture 類實現了 Future 介面,所以這兩個問題你都可以通過 Future 介面來解決。另外,CompletableFuture 類還實現了 CompletionStage 介面,這個介面內容實在是太豐富了,在 1.8 版本里有 40 個方法,這些方法我們該如何理解呢?

理解 CompletionStage 介面

可以站在分工的角度類比一下工作流。任務是有時序關係的,比如有序列關係、並行關係、匯聚關係等。這樣說可能有點抽象,這裡還舉前面燒水泡茶的例子,其中洗水壺和燒開水就是序列關係,洗水壺、燒開水和洗茶壺、洗茶杯這兩組任務之間就是並行關係,而燒開水、拿茶葉和泡茶就是匯聚關係。


序列關係


並行關係


匯聚關係

CompletionStage 介面可以清晰地描述任務之間的這種時序關係,例如前面提到的
f3 = f1.thenCombine(f2,()->{}) 描述的就是一種匯聚關係。燒水泡茶程式中的匯聚關係是一種 AND 聚合關係,這裡的 AND 指的是所有依賴的任務(燒開水和拿茶葉)都完成後才開始執行當前任務(泡茶)。既然有 AND 聚合關係,那就一定還有 OR 聚合關係,所謂 OR 指的是依賴的任務只要有一個完成就可以執行當前任務。

最後就是異常,CompletionStage 介面也可以方便地描述異常處理。

下面我們就來一一介紹,CompletionStage 介面如何描述序列關係、AND 聚合關係、OR 聚合關係以及異常處理。

1. 描述序列關係

CompletionStage 接口裡面描述序列關係,主要是 thenApply、thenAccept、thenRun 和 thenCompose 這四個系列的介面。

thenApply 系列函式裡引數 fn 的型別是介面 Function<T,R>,這個接口裡與 CompletionStage 相關的方法是R apply(T t),這個方法既能接收引數也支援返回值,所以 thenApply 系列方法返回的是CompletionStage<R>

而 thenAccept 系列方法裡引數 consumer 的型別是介面Consumer<T>,這個接口裡與 CompletionStage 相關的方法是void accept(T t),這個方法雖然支援引數,但卻不支援回值,所以 thenAccept 系列方法返回的是CompletionStage<Void>

thenRun 系列方法裡 action 的引數是 Runnable,所以 action 既不能接收引數也不支援返回值,所以 thenRun 系列方法返回的也是CompletionStage<Void>

這些方法裡面 Async 代表的是非同步執行 fn、consumer 或者 action。其中,需要你注意的是 thenCompose 系列方法,這個系列的方法會新創建出一個子流程,最終結果和 thenApply 系列是相同的。

CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);

通過下面的示例程式碼,你可以看一下 thenApply() 方法是如何使用的。首先通過 supplyAsync() 啟動一個非同步流程,之後是兩個序列操作,整體看起來還是挺簡單的。不過,雖然這是一個非同步流程,但任務①②③卻是序列執行的,②依賴①的執行結果,③依賴②的執行結果。

CompletableFuture<String> f0 = 
 CompletableFuture.supplyAsync(
  () -> "Hello World")   //①
 .thenApply(s -> s + " QQ") //②
 .thenApply(String::toUpperCase);//③

System.out.println(f0.join());
// 輸出結果
HELLO WORLD QQ

2. 描述 AND 匯聚關係

CompletionStage 接口裡面描述 AND 匯聚關係,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的介面,這些介面的區別也是源自 fn、consumer、action 這三個核心引數不同。

CompletionStage<R> thenCombine(other,fn);
CompletionStage<R> thenCombineAsync(other,fn);
CompletionStage<Void> thenAcceptBoth(other,consumer);
CompletionStage<Void> thenAcceptBothAsync(other,consumer);
CompletionStage<Void> runAfterBoth(other,action);
CompletionStage<Void> runAfterBothAsync(other,action);

3. 描述 OR 匯聚關係

CompletionStage 接口裡面描述 OR 匯聚關係,主要是 applyToEither、acceptEither 和 runAfterEither 系列的介面,這些介面的區別也是源自 fn、consumer、action 這三個核心引數不同。

CompletionStage applyToEither(other,fn);
CompletionStage applyToEitherAsync(other,fn);
CompletionStage acceptEither(other,consumer);
CompletionStage acceptEitherAsync(other,consumer);
CompletionStage runAfterEither(other,action);
CompletionStage runAfterEitherAsync(other,action);
CompletableFuture<String> f1 = 
 CompletableFuture.supplyAsync(()->{
  int t = getRandom(5,10);
  sleep(t,TimeUnit.SECONDS);
  return String.valueOf(t);
});

CompletableFuture<String> f2 = 
 CompletableFuture.supplyAsync(()->{
  int t = getRandom(5,TimeUnit.SECONDS);
  return String.valueOf(t);
});

CompletableFuture<String> f3 = 
 f1.applyToEither(f2,s -> s);

System.out.println(f3.join());

4. 異常處理

雖然上面我們提到的 fn、consumer、action 它們的核心方法都不允許丟擲可檢查異常,但是卻無法限制它們丟擲執行時異常 ,例如下面的程式碼,執行

CompletableFuture<Integer> 
 f0 = CompletableFuture.
  .supplyAsync(()->(7/0))
  .thenApply(r->r*10);
System.out.println(f0.join());

CompletionStage 介面給我們提供的方案非常簡單,比 try{}catch{}還要簡單,下面是相關的方法,使用這些方法進行異常處理和序列操作是一樣的,都支援鏈式程式設計方式。

CompletionStage exceptionally(fn);
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);

下面的示例程式碼展示瞭如何使用 exceptionally() 方法來處理異常,exceptionally() 的使用非常類似於 try{}catch{}中的 catch{},但是由於支援鏈式程式設計方式,所以相對更簡單。

whenComplete() 和 handle() 系列方法就類似於 try{}finally{}中的 finally{},無論是否發生異常都會執行 whenComplete() 中的回撥函式 consumer 和 handle() 中的回撥函式 fn。

whenComplete() 和 handle() 的區別在於 whenComplete() 不支援返回結果,而 handle() 是支援返回結果的。

CompletableFuture<Integer> 
 f0 = CompletableFuture
  .supplyAsync(()->7/0))
  .thenApply(r->r*10)
  .exceptionally(e->0);
System.out.println(f0.join());

總結

不過最近幾年,伴隨著 ReactiveX 的發展(Java 語言的實現版本是 RxJava),回撥地獄已經被完美解決了,Java 語言也開始官方支援非同步程式設計:在 1.8 版本提供了 CompletableFuture,在 Java 9 版本則提供了更加完備的 Flow API,非同步程式設計目前已經完全工業化。

CompletableFuture 已經能夠滿足簡單的非同步程式設計需求,如果你對非同步程式設計感興趣,可以重點關注 RxJava 這個專案,利用 RxJava,即便在 Java 1.6 版本也能享受非同步程式設計的樂趣。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。