1. 程式人生 > 程式設計 >CompletableFuture非同步程式設計

CompletableFuture非同步程式設計

CompletableFuture 有什麼用

  • CompletableFuture是用來描述多執行緒任務的時序關係的:序列關係,並行關係,聚合關係。
  • CompletableFuture 是Java 8 新增加的Api,該類實現,Future和CompletionStage兩個介面,提供了非常強大的Future的擴充套件功能,可以幫助我們簡化非同步程式設計的複雜性,提供了函式語言程式設計的能力,可以通過回撥的方式處理計算結果,並且提供了轉換和組合CompletableFuture的方法。
建立CompletableFuture物件

方式一:使用預設執行緒池

/**
 * 建立一個不帶返回值得任務。
 */
CompletableFuture<Void> f1 = CompletableFuture.runAsync(new Runnable() {
     @Override
     public void run() {
      //業務邏輯          
     }
 });
 /**
  * 建立一個帶返回值的任務。
  */
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(new Supplier<String>() {
    @Override
    public String get() {
        //業務邏輯
        return null;
    }
});複製程式碼

方式二:使用自定義執行緒池(建議使用)

 //建立執行緒池
ExecutorService    executor =    Executors.newFixedThreadPool(10); 
        
//建立一個不帶返回值得任務。
CompletableFuture<Void> f1 = CompletableFuture.runAsync(new Runnable() {
    @Override
    public void run() {

    }
},executor);
//建立一個帶返回值的任務。
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(new Supplier<String>() {
    @Override
    public String get() {
        //業務邏輯
        return null;
    }
},executor);    複製程式碼

  1. 預設情況下CompletableFuture會使用公共的ForkJoinPool執行緒池,這個執行緒池預設建立的執行緒數是CPU的 核數(也可以通過JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism來設定ForkJoinPool 執行緒池的執行緒數)。如果所有CompletableFuture共享一個執行緒池,那麼一旦有任務執行一些很慢的I/O操 作,就會導致執行緒池中所有執行緒都阻塞在I/O操作上,從而造成執行緒飢餓,進而影響整個系統的效能。所以,強烈建議你要根據不同的業務型別建立不同的執行緒池,以避免互相干擾。
  2. 建立完CompletableFuture物件之後,會自動地非同步執行runnable.run()方法或者supplier.get()方法。因為CompletableFuture類實現了Future介面,所以這兩個問題你都可以通過Future介面來解決。另外,CompletableFuture類還實現了CompletionStage介面。

常用API

  • public T get():獲取計算結果, 該方法為阻塞方法會等待計算結果完成。
  • public T get(long timeout,TimeUnit unit):有時間限制的阻塞方法
  • public T getNow(T valueIfAbsent)立即獲取方法結果,如果沒有計算結束則返回傳的值
  • public T join()和 get() 方法類似也是主動阻塞執行緒,等待計算結果。和get() 方法有細微的差別。
  • public boolean complete(T value):立即完成計算,並把結果設定為傳的值,返回是否設定成功
  • public boolean completeExceptionally(Throwable ex):立即完成計算,並丟擲異常

理解CompletionStage介面

CompletionStage介面可以清晰地描述任務之間的這種時序關係,時序關係:序列,並行,匯聚。

序列

執行緒與執行緒之間的執行順序是序列的。

//Async代表的是非同步執行fn
CompletionStage<R>    thenApply(fn); 
CompletionStage<R>    thenApplyAsync(fn); 
CompletionStage<Void>    thenAccept(consumer); 
CompletionStage<Void>    thenAcceptAsync(consumer); 
CompletionStage<Void>    thenRun(action); 
CompletionStage<Void>    thenRunAsync(action); 
CompletionStage<R>    thenCompose(fn); 
CompletionStage<R>    thenComposeAsync(fn);複製程式碼

  • CompletionStage介面裡面描述序列關係,主要是thenApply、thenAccept、thenRun和thenCompose這四 個系列的介面。
    • thenApply系列函式裡引數fn的型別是介面Function,這個介面裡與CompletionStage相關的方法是 R apply(T t),這個方法既能接收引數也支援返回值,所以thenApply系列方法返回的 是CompletionStage,>
  • 而thenAccept系列方法裡引數consumer的型別是介面Consumer,這個介面裡與CompletionStage相關 的方法是 void accept(T t),這個方法雖然支援引數,但卻不支援回值,所以thenAccept系列方法返回 的是CompletionStage
    • thenRun系列方法裡action的引數是Runnable,所以action既不能接收引數也不支援返回值,所以thenRun 系列方法返回的也是CompletionStage
    • 這些方法裡面Async代表的是非同步執行fn、consumer或者action。其中,需要你注意的是thenCompose系列 方法,這個系列的方法會新創建出一個子流程,最終結果和thenApply系列是相同的。

演示序列

//supplyAsync()啟動一個非同步 流程
CompletableFuture<String> f0 = CompletableFuture.supplyAsync(
    () -> "Hello World")                        //①        
    .thenApply(s ->    s + "girl")        //②
    .thenApply(String::toUpperCase);//③
System.out.println(f0.join());
//輸出結果 HELLO WORLD    girl複製程式碼

雖然這是一個非同步流程,但任務①②③卻是 序列執行的,②依賴①的執行結果,③依賴②的執行結果。

AND匯聚

CompletionStage介面裡面描述AND匯聚關係,主要是thenCombine、thenAcceptBoth和runAfterBoth系列的介面,這些介面的區別也是源自fn、consumer、action這三個核心引數不同。

CompletionStage<R>    thenCombine(other,fn); 
CompletionStage<R>    thenCombineAsync(other,fn); 
CompletionStage<Void>    thenAcceptBoth(other,consumer); CompletionStage<Void>    thenAcceptBothAsync(other,consumer); CompletionStage<Void>    runAfterBoth(other,action); 
CompletionStage<Void>    runAfterBothAsync(other,action);
複製程式碼

演示:

         // 啟動一個非同步流程f1
        CompletableFuture<Void> f1 = CompletableFuture.runAsync(()->{
            System.out.println("非同步任務-1");

        });
        // 啟動一個非同步流程f2
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{
            String s = "非同步任務-2";
            System.out.println(s);
            return    s;

        });
        //啟動一個匯聚流程f3
        CompletableFuture<String> f3 = f1.thenCombine(f2,(a,tf)->{
            return tf;//tf是f2的值
        });

        //等待任務3執行結果
        System.out.println(f3.join());複製程式碼

OR匯聚

CompletionStage介面裡面描述OR匯聚關係,主要是applyToEither、acceptEither和runAfterEither系列的 介面,這些介面的區別也是源自fn、consumer、action這三個核心引數不同。

CompletionStage    applyToEither(other,fn);
CompletionStage    applyToEitherAsync(other,fn);
CompletionStage    acceptEither(other,consumer);
CompletionStage    acceptEitherAsync(other,consumer);
CompletionStage    runAfterEither(other,action); 
CompletionStage    runAfterEitherAsync(other,action);
複製程式碼

功能演示:

         // 啟動一個非同步流程f1
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(()->{
            int    t    =     new Random().nextInt(10);
            System.out.println("f1-t = "+t);
            sleep(t,TimeUnit.SECONDS);
            return    String.valueOf(t);
        });
     // 啟動一個非同步流程f2
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{
            int    t    =     new Random().nextInt(10);
            System.out.println("f2-t = "+t);
            sleep(t,TimeUnit.SECONDS);
            return    String.valueOf(t);
        });
        //將f1和f2的值彙總到f3
        CompletableFuture<String> f3 = f1.applyToEither(f2,s    ->
                    Integer.parseInt(f2.join())+Integer.parseInt(s)+""
        );
        System.out.println(f3.join());複製程式碼

碼字不易如果對你有幫助請給個關注

愛技術愛生活 QQ群: 894109590