Flux 和 Mono 、reactor實戰 (史上最全)
1. 前言
最近寫關於響應式程式設計的東西有點多,很多同學反映對Flux和Mono這兩個Reactor中的概念有點懵逼。但是目前Java響應式程式設計中我們對這兩個物件的接觸又最多,諸如Spring WebFlux、RSocket、R2DBC。我開始也對這兩個物件頭疼,所以今天我們就簡單來探討一下它們。
響應式程式設計概述
背景知識
為了應對高併發伺服器端開發場景,在2009 年,微軟提出了一個更優雅地實現非同步程式設計的方式——Reactive Programming,我們稱之為響應式程式設計。隨後,Netflix 和LightBend 公司提供了RxJava 和Akka Stream 等技術,使得Java 平臺也有了能夠實現響應式程式設計的框架。
在2017 年9 月28 日,Spring 5 正式釋出。Spring 5 釋出最大的意義在於,它將響應式程式設計技術的普及向前推進了一大步。而同時,作為在背後支援Spring 5 響應式程式設計的框架Spring Reactor,也進入了里程碑式的3.1.0 版本。
什麼是響應式程式設計
響應式程式設計是一種面向資料流和變化傳播的程式設計正規化。這意味著可以在程式語言中很方便地表達靜態或動態的資料流,而相關的計算模型會自動將變化的值通過資料流進行傳播。
響應式程式設計基於reactor(Reactor 是一個執行在 Java8 之上的響應式框架)的思想,當你做一個帶有一定延遲的才能夠返回的io操作時,不會阻塞,而是立刻返回一個流,並且訂閱這個流,當這個流上產生了返回資料,可以立刻得到通知並呼叫回撥函式處理資料。
電子表格程式就是響應式程式設計的一個例子。單元格可以包含字面值或類似"=B1+C1"的公式,而包含公式的單元格的值會依據其他單元格的值的變化而變化。
響應式傳播核心特點之一:變化傳播:一個單元格變化之後,會像多米諾骨牌一樣,導致直接和間接引用它的其他單元格均發生相應變化。
基於Java8實現觀察者模式
Observable類:此類表示可觀察物件,或模型檢視範例中的“資料”。
它可以被子類實現以表示應用程式想要觀察的物件。
//想要觀察的物件 ObserverDemo public class ObserverDemo extends Observable { public static void main(String[] args) { ObserverDemo observerDemo = new ObserverDemo(); //新增觀察者 observerDemo.addObserver((o,arg)->{ System.out.println("資料發生變化A"); }); observerDemo.addObserver((o,arg)->{ System.out.println("資料發生變化B"); }); observerDemo.setChanged();//將此Observable物件標記為已更改 observerDemo.notifyObservers();//如果該物件發生了變化,則通知其所有觀察者 } }
啟動程式測試:
建立一個Observable
rxjava中,可以使用Observable.create() 該方法接收一個Obsubscribe物件
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
}
});
來個例子:
Observable<Integer> observable=Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for(int i=0;i<5;i++){
subscriber.onNext(i);
}
subscriber.onCompleted();
}
});
//Observable.subscribe(Observer),Observer訂閱了Observable
Subscription subscribe = observable.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
Log.e(TAG, "完成");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "異常");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "接收Obsverable中發射的值:" + integer);
}
});
輸出:
接收Obsverable中發射的值:0
接收Obsverable中發射的值:1
接收Obsverable中發射的值:2
接收Obsverable中發射的值:3
接收Obsverable中發射的值:4
從上面的例子可以看出,在Observer訂閱了Observable後,
Observer作為OnSubscribe中call方法的引數傳入,從而呼叫了Observer的相關方法
基於 Reactor 實現
Reactor 是一個執行在 Java8 之上滿足 Reactice 規範的響應式框架,它提供了一組響應式風格的 API。
Reactor 有兩個核心類: Flux<T>
和 Mono<T>
,這兩個類都實現 Publisher 介面。
- Flux 類似 RxJava 的 Observable,它可以觸發零到多個事件,並根據實際情況結束處理或觸發錯誤。
- Mono 最多隻觸發一個事件,所以可以把 Mono 用於在非同步任務完成時發出通知。
Flux 和 Mono 都是資料流的釋出者,使用 Flux 和 Mono 都可以發出三種資料訊號:元素值,錯誤訊號,完成訊號;錯誤訊號和完成訊號都代表終止訊號,終止訊號用於告訴訂閱者資料流結束了,錯誤訊號終止資料流同時把錯誤資訊傳遞給訂閱者。
三種訊號的特點:
- 錯誤訊號和完成訊號都是終止訊號,不能共存
- 如果沒有傳送任何元素值,而是直接傳送錯誤或者完成訊號,表示是空資料流
- 如果沒有錯誤訊號,也沒有完成訊號,表示是無限資料流
引入依賴
<dependency>
<groupId>org.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>1.1.6.RELEASE</version>
</dependency>
just 和 subscribe方法
just():建立Flux序列,並宣告指定資料流
subscribe():訂閱Flux序列,只有進行訂閱後才回觸發資料流,不訂閱就什麼都不會發生
public class TestReactor {
public static void main(String[] args) {
//just():建立Flux序列,並宣告資料流,
Flux<Integer> integerFlux = Flux.just(1, 2, 3, 4);//整形
//subscribe():訂閱Flux序列,只有進行訂閱後才回觸發資料流,不訂閱就什麼都不會發生
integerFlux.subscribe(System.out::println);
Flux<String> stringFlux = Flux.just("hello", "world");//字串
stringFlux.subscribe(System.out::println);
//fromArray(),fromIterable()和fromStream():可以從一個數組、Iterable 物件或Stream 物件中建立Flux序列
Integer[] array = {1,2,3,4};
Flux.fromArray(array).subscribe(System.out::println);
List<Integer> integers = Arrays.asList(array);
Flux.fromIterable(integers).subscribe(System.out::println);
Stream<Integer> stream = integers.stream();
Flux.fromStream(stream).subscribe(System.out::println);
}
}
啟動測試:
響應流的特點
要搞清楚這兩個概念,必須說一下響應流規範。它是響應式程式設計的基石。他具有以下特點:
-
響應流必須是無阻塞的。
-
響應流必須是一個數據流。
-
它必須可以非同步執行。
-
並且它也應該能夠處理背壓。
-
即時響應性: 只要有可能, 系統就會及時地做出響應。 即時響應是可用性和實用性的基石, 而更加重要的是,即時響應意味著可以快速地檢測到問題並且有效地對其進行處理。 即時響應的系統專注於提供快速而一致的響應時間, 確立可靠的反饋上限, 以提供一致的服務質量。 這種一致的行為轉而將簡化錯誤處理、 建立終端使用者的信任並促使使用者與系統作進一步的互動。
-
回彈性:系統在出現失敗時依然保持即時響應性。 這不僅適用於高可用的、 任務關鍵型系統——任何不具備回彈性的系統都將會在發生失敗之後丟失即時響應性。 回彈性是通過複製、 遏制、 隔離以及委託來實現的。 失敗的擴散被遏制在了每個元件內部, 與其他元件相互隔離, 從而確保系統某部分的失敗不會危及整個系統,並能獨立恢復。 每個元件的恢復都被委託給了另一個(外部的)元件, 此外,在必要時可以通過複製來保證高可用性。 (因此)元件的客戶端不再承擔元件失敗的處理。
-
彈性: 系統在不斷變化的工作負載之下依然保持即時響應性。 反應式系統可以對輸入(負載)的速率變化做出反應,比如通過增加或者減少被分配用於服務這些輸入(負載)的資源。 這意味著設計上並沒有爭用點和中央瓶頸, 得以進行元件的分片或者複製, 並在它們之間分佈輸入(負載)。 通過提供相關的實時效能指標, 反應式系統能支援預測式以及反應式的伸縮演算法。 這些系統可以在常規的硬體以及軟體平臺上實現成本高效的彈性。
-
訊息驅動:反應式系統依賴非同步的訊息傳遞,從而確保了鬆耦合、隔離、位置透明的元件之間有著明確邊界。 這一邊界還提供了將失敗作為訊息委託出去的手段。 使用顯式的訊息傳遞,可以通過在系統中塑造並監視訊息流佇列, 並在必要時應用回壓, 從而實現負載管理、 彈性以及流量控制。 使用位置透明的訊息傳遞作為通訊的手段, 使得跨叢集或者在單個主機中使用相同的結構成分和語義來管理失敗成為了可能。 非阻塞的通訊使得接收者可以只在活動時才消耗資源, 從而減少系統開銷。
Publisher/Flux和Mono
由於響應流的特點,我們不能再返回一個簡單的POJO物件來表示結果了。必須返回一個類似Java中的Future
的概念,在有結果可用時通知消費者進行消費響應。
Reactive Stream規範中這種被定義為Publisher
PublisherSubscriber<? super T>
的需求推送元素。
一個Publisher
下面這個Excel計算就能說明一些Publisher
A1-A9就可以看做Publisher
A10-A13分別是求和函式SUM(A1:A9)
、平均函式AVERAGE(A1:A9)
、最大值函式MAX(A1:A9)
、最小值函式MIN(A1:A9)
,
A10-A13可以看作訂閱者Subscriber
。
假如說我們沒有A10-A13,那麼A1-A9就沒有實際意義,它們並不產生計算。
這也是響應式的一個重要特點:當沒有訂閱時釋出者什麼也不做。而Flux和Mono都是Publisher
Publishersubscribe
方法,允許消費者在有結果可用時進行消費。
如果沒有消費者Publisher
Publisher
Flux
Flux 是一個發出(emit)0-N
個元素組成的非同步序列的PublisheronComplete
訊號或者onError
訊號所終止。
在響應流規範中存在三種給下游消費者呼叫的方法 onNext
, onComplete
, 和onError
。下面這張圖表示了Flux的抽象模型:
以上的的講解對於初次接觸反應式程式設計的依然是難以理解的,所以這裡有一個循序漸進的理解過程。
有些類比並不是很妥當,但是對於你循序漸進的理解這些新概念還是有幫助的。
傳統資料處理
我們在平常是這麼寫的:
public List<ClientUser> allUsers() {
return Arrays.asList(new ClientUser("felord.cn", "reactive"),
new ClientUser("Felordcn", "Reactor"));
}
我們通過迭代返回值List
來get
這些元素進行再處理(消費),不管有沒有消費者, 菜品都會生產出來。
流式資料處理
在Java 8中我們可以改寫為流的表示:
public Stream<ClientUser> allUsers() {
return Stream.of(new ClientUser("felord.cn", "reactive"),
new ClientUser("Felordcn", "Reactor"));
}
反應式資料處理
在Reactor中我們又可以改寫為Flux表示:
public Flux<ClientUser> allUsers(){
return Flux.just(new ClientUser("felord.cn", "reactive"),
new ClientUser("Felordcn", "Reactor"));
}
這時候食客來了,發生了訂閱,廚師才開始做。
Flux 的建立Demo
Flux ints = Flux.range(1, 4);
Flux seq1 = Flux.just("bole1", "bole2", "bole3");
List iterable = Arrays.asList("bole_01", "bole_02", "bole_03");
Flux seq2 = Flux.fromIterable(iterable);
seq2.subscribe(i -> System.out.println(i));
Mono
Mono 是一個發出(emit)0-1
個元素的PublisheronComplete
訊號或者onError
訊號所終止。
mono 整體和Flux差不多,只不過這裡只會發出0-1個元素。也就是說不是有就是沒有。
象Flux一樣,我們來看看Mono的演化過程以幫助理解。
傳統資料處理
public ClientUser currentUser () {
return isAuthenticated ? new ClientUser("felord.cn", "reactive") : null;
}
直接返回符合條件的物件或者null`。
Optional的處理方式
public Optional<ClientUser> currentUser () {
return isAuthenticated ? Optional.of(new ClientUser("felord.cn", "reactive"))
: Optional.empty();
}
這個Optional我覺得就有反應式的那種味兒了,當然它並不是反應式。當我們不從返回值Optional取其中具體的物件時,我們不清楚裡面到底有沒有,但是Optional是一定客觀存在的,不會出現NPE問題。
反應式資料處理
public Mono<ClientUser> currentUser () {
return isAuthenticated ? Mono.just(new ClientUser("felord.cn", "reactive"))
: Mono.empty();
}
和Optional有點類似的機制,當然Mono不是為了解決NPE問題的,它是為了處理響應流中單個值(也可能是Void
)而存在的。
Mono的建立Demo
Mono data = Mono.just("bole");
Mono noData = Mono.empty();
m.subscribe(i -> System.out.println(i));
Flux和Mono總結
Flux和Mono是Java反應式中的重要概念,但是很多同學包括我在開始都難以理解它們。這其實是規定了兩種流式正規化,這種正規化讓資料具有一些新的特性,比如基於釋出訂閱的事件驅動,非同步流、背壓等等。另外資料是推送(Push)給消費者的以區別於平時我們的拉(Pull)模式。同時我們可以像Stream Api一樣使用類似map
、flatmap
等操作符(operator)來操作它們。
函式程式設計
反應式程式設計,常常和函數語言程式設計結合,這就是讓大家困擾的地方
函式程式設計介面
介面函式名 | 說明 |
---|---|
BiConsumer | 表示接收兩個輸入引數和不返回結果的操作。 |
BiFunction | 表示接受兩個引數,併產生一個結果的函式。 |
BinaryOperator | 表示在相同型別的兩個運算元的操作,生產相同型別的運算元的結果。 |
BiPredicate | 代表兩個引數謂詞(布林值函式)。 |
BooleanSupplier | 代表布林值結果的提供者。 |
Consumer | 表示接受一個輸入引數和不返回結果的操作。 |
DoubleBinaryOperator | 代表在兩個double值運算元的運算,併產生一個double值結果。 |
DoubleConsumer | 表示接受一個double值引數,不返回結果的操作。 |
DoubleFunction | 表示接受double值引數,併產生一個結果的函式。 |
DoublePredicate | 代表一個double值引數謂詞(布林值函式)。 |
DoubleSupplier | 表示表示接受double值引數,併產生一個結果的函式。值結果的提供者。 |
DoubleToIntFunction | 表示接受一個double值引數,不返回結果的操作。 |
DoubleFunction | 表示接受double值引數,併產生一個結果的函式。 |
DoublePredicate | 代表一個double值引數謂詞(布林值函式)。 |
DoubleSupplier | DoubleToIntFunction |
DoubleToIntFunction | 表示接受double值引數,併產生一個int值結果的函式。 |
DoubleToLongFunction | 表示上產生一個double值結果的單個double值運算元的操作。 |
Function | 代表接受一個double值引數,併產生一個long值結果的函式。 |
DoubleUnaryOperator | 表示上產生一個double值結果的單個double值運算元的操作。 |
Function | 表示接受一個引數,併產生一個結果的函式。 |
IntConsumer | 表示接受單個int值的引數並沒有返回結果的操作。 |
IntFunction | 表示接受一個int值引數,併產生一個結果的函式。 |
IntPredicate | 表示一個整數值引數謂詞(布林值函式)。 |
IntSupplier | 代表整型值的結果的提供者。 |
IntToLongFunction | 表示接受一個int值引數,併產生一個long值結果的函式。 |
IntUnaryOperator | 表示產生一個int值結果的單個int值運算元的運算。 |
LongBinaryOperator | 表示在兩個long值運算元的操作,併產生一個ObjLongConsumer值結果。 |
LongFunction | 表示接受long值引數,併產生一個結果的函式。 |
LongPredicate | 代表一個long值引數謂詞(布林值函式)。 |
LongSupplier | 表示long值結果的提供者。 |
LongToDoubleFunction | 表示接受double引數,併產生一個double值結果的函式。 |
LongToIntFunction | 表示接受long值引數,併產生一個int值結果的函式。 |
LongUnaryOperator | 表示上產生一個long值結果單一的long值運算元的操作。 |
ObjDoubleConsumer | 表示接受物件值和double值引數,並且沒有返回結果的操作。 |
ObjIntConsumer | 表示接受物件值和整型值引數,並返回沒有結果的操作。 |
ObjLongConsumer | 表示接受物件值和整型值引數,並返回沒有結果的操作。 |
ObjLongConsumer | 表示接受物件值和double值引數,並且沒有返回結果的操作。 |
ObjIntConsumer | 表示接受物件值和整型值引數,並返回沒有結果的操作。 |
ObjLongConsumer | 表示接受物件的值和long值的說法,並沒有返回結果的操作。 |
Predicate | 代表一個引數謂詞(布林值函式)。 |
Supplier | 表示一個提供者的結果。 |
ToDoubleBiFunction | 表示接受兩個引數,併產生一個double值結果的功能。 |
ToDoubleFunction | 代表一個產生一個double值結果的功能。 |
ToIntBiFunction | 表示接受兩個引數,併產生一個int值結果的函式。 |
ToIntFunction | 代表產生一個int值結果的功能。 |
ToLongBiFunction | 表示接受兩個引數,併產生long值結果的功能。 |
ToLongFunction | 代表一個產生long值結果的功能。 |
UnaryOperator | 表示上產生相同型別的運算元的結果的單個運算元的操作。 |
常用函式程式設計示例
Consumer 一個輸入 無輸出
Product product=new Product();
//類名+靜態方法 一個輸入T 沒有輸出
Consumer consumer1 = Product->Product.nameOf(product);//lambda
consumer1.accept(product);
Consumer consumer = Product::nameOf;//方法引用
consumer.accept(product);
Funtion<T,R> 一個輸入 一個輸出
//物件+方法 一個輸入T 一個輸出R
Function<Integer, Integer> function = product::reduceStock;
System.out.println("剩餘庫存:" + function.apply(10));
//帶引數的建構函式
Function<Integer,Product> function1=Product::new;
System.out.println("新物件:" +function1.apply(200));
Predicate 一個輸入T, 一個輸出 Boolean
//Predicate 一個輸入T 一個輸出Boolean
Predicate predicate= i -> product.isEnough(i);//lambda
System.out.println("庫存是否足夠:"+predicate.test(100));
Predicate predicate1= product::isEnough;//方法引用
System.out.println("庫存是否足夠:"+predicate1.test(100));
UnaryOperator 一元操作符 輸入輸出都是T
//一元操作符 輸入和輸出T
UnaryOperator integerUnaryOperator =product::reduceStock;
System.out.println("剩餘庫存:" + integerUnaryOperator.apply(20));
IntUnaryOperator intUnaryOperator = product::reduceStock;
System.out.println("剩餘庫存:" + intUnaryOperator.applyAsInt(30));
Supplier 沒有輸入 只有輸出
//無引數建構函式
Supplier supplier = Product::new;
System.out.println("建立新物件:" + supplier.get());
Supplier supplier1=()->product.getStock();
System.out.println("剩餘庫存:" + supplier1.get());
BiFunction 二元操作符 兩個輸入<T,U> 一個輸出
//類名+方法
BiFunction<Product, Integer, Integer> binaryOperator = Product::reduceStock;
System.out.println(" 剩餘庫存(BiFunction):" + binaryOperator.apply(product, 10));
BinaryOperator 二元操作符 ,二個輸入 一個輸出
//BinaryOperator binaryOperator1=(x,y)->product.reduceStock(x,y);
BinaryOperator binaryOperator1=product::reduceStock;
System.out.println(" 剩餘庫存(BinaryOperator):" +binaryOperator1.apply(product.getStock(),10));
Flux類中的靜態方法:
簡單的建立方法
just():
可以指定序列中包含的全部元素。創建出來的Flux序列在釋出這些元素之後會自動結束
fromArray(),fromIterable(),fromStream():
可以從一個數組,Iterable物件或Stream物件中穿件Flux物件
empty():
建立一個不包含任何元素,只發布結束訊息的序列
error(Throwable error):
建立一個只包含錯誤訊息的序列
never():
傳建一個不包含任務訊息通知的序列
range(int start, int count):
建立包含從start起始的count個數量的Integer物件的序列
interval(Duration period)和interval(Duration delay, Duration period):
建立一個包含了從0開始遞增的Long物件的序列。其中包含的元素按照指定的間隔來發布。除了間隔時間之外,還可以指定起始元素髮布之前的延遲時間
intervalMillis(long period)和intervalMillis(long delay, long period):
與interval()方法相同,但該方法通過毫秒數來指定時間間隔和延遲時間
例子
Flux.just("Hello", "World").subscribe(System.out::println);
Flux.fromArray(new Integer[]{1, 2, 3}).subscribe(System.out::println);
Flux.empty().subscribe(System.out::println);
Flux.range(1, 10).subscribe(System.out::println);
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
Flux.intervalMillis(1000).subscirbe(System.out::println);
複雜的序列建立 generate()
當序列的生成需要複雜的邏輯時,則應該使用generate()或create()方法。
generate()方法通過同步和逐一的方式來產生Flux序列。
序列的產生是通過呼叫所提供的的SynchronousSink物件的next(),complete()和error(Throwable)方法來完成的。
逐一生成的含義是在具體的生成邏輯中,next()方法只能最多被呼叫一次。
在某些情況下,序列的生成可能是有狀態的,需要用到某些狀態物件,此時可以使用
generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator),
其中stateSupplier用來提供初始的狀態物件。
在進行序列生成時,狀態物件會作為generator使用的第一個引數傳入,可以在對應的邏輯中對改狀態物件進行修改以供下一次生成時使用。
Flux.generate(sink -> {
sink.next("Hello");
sink.complete();
}).subscribe(System.out::println);
final Random random = new Random();
Flux.generate(ArrayList::new, (list, sink) -> {
int value = random.nextInt(100);
list.add(value);
sink.next(value);
if( list.size() ==10 )
sink.complete();
return list;
}).subscribe(System.out::println);
複雜的序列建立 create()
create()方法與generate()方法的不同之處在於所使用的是FluxSink物件。
FluxSink支援同步和非同步的訊息產生,並且可以在一次呼叫中產生多個元素。
Flux.create(sink -> {
for(int i = 0; i < 10; i ++)
sink.next(i);
sink.complete();
}).subscribe(System.out::println);
Mono靜態方法
Mono類包含了與Flux類中相同的靜態方法:just(),empty()和never()等。
除此之外,Mono還有一些獨有的靜態方法:
fromCallable(),fromCompletionStage(),fromFuture(),fromRunnable()和fromSupplier():分別從Callable,CompletionStage,CompletableFuture,Runnable和Supplier中建立Mono
delay(Duration duration)和delayMillis(long duration):建立一個Mono序列,在指定的延遲時間之後,產生數字0作為唯一值
ignoreElements(Publisher
justOrEmpty(Optional<? extends T> data)和justOrEmpty(T data):從一個Optional物件或可能為null的物件中建立Mono。只有Optional物件中包含之或物件不為null時,Mono序列才產生對應的元素
Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);
Mono.create(sink -> sink.success("Hello")).subscribte(System.out::println);
操作符
操作符buffer和bufferTimeout
這兩個操作符的作用是把當前流中的元素收集到集合中,並把集合物件作為流中的新元素。
在進行收集時可以指定不同的條件:所包含的元素的最大數量或收集的時間間隔。方法buffer()僅使用一個條件,而bufferTimeout()可以同時指定兩個條件。
指定時間間隔時可以使用Duration物件或毫秒數,即使用bufferMillis()或bufferTimeoutMillis()兩個方法。
除了元素數量和時間間隔外,還可以通過bufferUntil和bufferWhile操作符來進行收集。這兩個操作符的引數時表示每個集合中的元素索要滿足的條件的Predicate物件。
bufferUntil會一直收集直到Predicate返回true。
使得Predicate返回true的那個元素可以選擇新增到當前集合或下一個集合中;bufferWhile則只有當Predicate返回true時才會收集。一旦為false,會立即開始下一次收集。
Flux.range(1, 100).buffer(20).subscribe(System.out::println);
Flux.intervalMillis(100).bufferMillis(1001).take(2).toStream().forEach(System.out::println);
Flux.range(1, 10).bufferUntil(i -> i%2 == 0).subscribe(System.out::println);
Flux.range(1, 10).bufferWhile(i -> i%2 == 0).subscribe(System.out::println);
操作符Filter
對流中包含的元素進行過濾,只留下滿足Predicate指定條件的元素。
Flux.range(1, 10).filter(i -> i%2 == 0).subscribe(System.out::println);
操作符zipWith
zipWith操作符把當前流中的元素與另一個流中的元素按照一對一的方式進行合併。在合併時可以不做任何處理,由此得到的是一個元素型別為Tuple2的流;也可以通過一個BiFunction函式對合並的元素進行處理,所得到的流的元素型別為該函式的返回值。
Flux.just("a", "b").zipWith(Flux.just("c", "d")).subscribe(System.out::println);
Flux.just("a", "b").zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2)).subscribe(System.out::println);
操作符take
take系列操作符用來從當前流中提取元素。提取方式如下:
take(long n),take(Duration timespan)和takeMillis(long timespan):按照指定的數量或時間間隔來提取
takeLast(long n):提取流中的最後N個元素
takeUntil(Predicate<? super T> predicate) :提取元素直到Predicate返回true
takeWhile(Predicate<? super T> continuePredicate):當Predicate返回true時才進行提取
takeUntilOther(Publisher<?> other):提取元素知道另外一個流開始產生元素
Flux.range(1, 1000).take(10).subscribe(System.out::println);
Flux.range(1, 1000).takeLast(10).subscribe(System.out::println);
Flux.range(1, 1000).takeWhile(i -> i < 10).subscribe(System.out::println);
Flux.range(1, 1000).takeUntil(i -> i == 10).subscribe(System.out::println);
操作符reduce和reduceWith
reduce和reduceWith操作符對流中包含的所有元素進行累計操作,得到一個包含計算結果的Mono序列。累計操作是通過一個BiFunction來表示的。在操作時可以指定一個初始值。若沒有初始值,則序列的第一個元素作為初始值。
Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println);
Flux.range(1, 100).reduceWith(() -> 100, (x + y) -> x + y).subscribe(System.out::println);
操作符merge和mergeSequential
merge和mergeSequential操作符用來把多個流合併成一個Flux序列。merge按照所有流中元素的實際產生序列來合併,而mergeSequential按照所有流被訂閱的順序,以流為單位進行合併。
Flux.merge(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5)).toStream().forEach(System.out::println);
Flux.mergeSequential(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5)).toStream().forEach(System.out::println);
操作符flatMap和flatMapSequential
flatMap和flatMapSequential操作符把流中的每個元素轉換成一個流,再把所有流中的元素進行合併。flatMapSequential和flatMap之間的區別與mergeSequential和merge是一樣的。
Flux.just(5, 10).flatMap(x -> Flux.intervalMillis(x * 10, 100).take(x)).toStream().forEach(System.out::println);
操作符concatMap
concatMap操作符的作用也是把流中的每個元素轉換成一個流,再把所有流進行合併。concatMap會根據原始流中的元素順序依次把轉換之後的流進行合併,並且concatMap堆轉換之後的流的訂閱是動態進行的,而flatMapSequential在合併之前就已經訂閱了所有的流。
Flux.just(5, 10).concatMap(x -> Flux.intervalMillis(x * 10, 100).take(x)).toStream().forEach(System.out::println);
操作符combineLatest
combineLatest操作符把所有流中的最新產生的元素合併成一個新的元素,作為返回結果流中的元素。只要其中任何一個流中產生了新的元素,合併操作就會被執行一次,結果流中就會產生新的元素。
Flux.combineLatest(Arrays::toString, Flux.intervalMillis(100).take(5), Flux.intervalMillis(50, 100).take(5)).toStream().forEach(System.out::println);
訊息處理
當需要處理Flux或Mono中的訊息時,可以通過subscribe方法來新增相應的訂閱邏輯。
在呼叫subscribe方法時可以指定需要處理的訊息型別。
Flux.just(1, 2).concatWith(Mono.error(new IllegalStateException())).subscribe(System.out::println, System.err::println);
Flux.just(1, 2).concatWith(Mono.error(new IllegalStateException())).onErrorReturn(0).subscribe(System.out::println);
第2種可以通過switchOnError()方法來使用另外的流來產生元素。
Flux.just(1, 2).concatWith(Mono.error(new IllegalStateException())).switchOnError(Mono.just(0)).subscribe(System.out::println);
第三種是通過onErrorResumeWith()方法來根據不同的異常型別來選擇要使用的產生元素的流。
Flux.just(1, 2).concatWith(Mono.error(new IllegalArgumentException())).onErrorResumeWith(e -> {
if(e instanceof IllegalStateException)
return Mono.just(0);
else if(e instanceof IllegalArgumentException)
return Mono.just(-1);
return Mono.epmty();
}).subscribe(System,.out::println);
當出現錯誤時還可以使用retry操作符來進行重試。重試的動作是通過重新訂閱序列來實現的。在使用retry操作時還可以指定重試的次數。
Flux.just(1, 2).concatWith(Mono.error(new IllegalStateException())).retry(1).subscrible(System.out::println);
排程器Scheduler
通過排程器可以指定操作執行的方式和所在的執行緒。有以下幾種不同的排程器實現
當前執行緒,通過Schedulers.immediate()方法來建立
單一的可複用的執行緒,通過Schedulers.single()方法來建立
使用彈性的執行緒池,通過Schedulers.elastic()方法來建立。執行緒池中的執行緒是可以複用的。當所需要時,新的執行緒會被建立。若一個執行緒閒置時間太長,則會被銷燬。該排程器適用於I/O操作相關的流的處理
使用對並行操作優化的執行緒池,通過Schedulers.parallel()方法來建立。其中的執行緒數量取決於CPU的核的數量。該排程器適用於計算密集型的流的處理
使用支援任務排程的排程器,通過Schedulers.timer()方法來建立
從已有的ExecutorService物件中建立排程器,通過Schedulers.fromExecutorService()方法來建立
通過publishOn()和subscribeOn()方法可以切換執行操作排程器。publishOn()方法切換的是操作符的執行方式,而subscribeOn()方法切換的是產生流中元素時的執行方式
Flux.create(sink -> {
sink.next(Thread.currentThread().getName());
sink.complete();
}).publishOn(Schedulers.single())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.publishOn(Schedulers.elastic())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.subscribeOn(Schedulers.parallel())
.toStream()
.forEach(System.out::println);
測試
StepVerifier的作用是可以對序列中包含的元素進行逐一驗證。通過StepVerifier.create()方法對一個流進行包裝之後再進行驗證。expectNext()方法用來宣告測試時所期待的流中的下一個元素的值,而verifyComplete()方法則驗證流是否正常結束。verifyError()來驗證流由於錯誤而終止。
StepVerifier.create(Flux.just(a, b)).expectNext("a").expectNext("b").verifyComplete();
使用StepVerifier.withVirtualTime()方法可以創建出使用虛擬時鐘的SteoVerifier。通過thenAwait(Duration)方法可以讓虛擬時鐘前進。
StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofHours(4), Duration.ofDays(1)).take(2))
.expectSubscription()
.expectNoEvent(Duration.ofHours(4))
.expectNext(0L)
.thenAwait(Duration.ofDays(1))
.expectNext(1L)
.verifyComplete();
TestPublisher的作用在於可以控制流中元素的產生,甚至是違反反應流規範的情況。通過create()方法建立一個新的TestPublisher物件,然後使用next()方法來產生元素,使用complete()方法來結束流。
final TestPublisher<String> testPublisher = TestPublisher.creater();
testPublisher.next("a");
testPublisher.next("b");
testPublisher.complete();
StepVerifier.create(testPublisher)
.expectNext("a")
.expectNext("b")
.expectComplete();
除錯
在除錯模式啟用之後,所有的操作符在執行時都會儲存額外的與執行鏈相關的資訊。當出現錯誤時,這些資訊會被作為異常堆疊資訊的一部分輸出。
Hooks.onOperator(providedHook -> providedHook.operatorStacktrace());
也可以通過checkpoint操作符來對特定的流處理鏈來啟用除錯模式。
Flux.just(1, 0).map(x -> 1/x).checkpoint("test").subscribe(System.out::println);
日誌記錄
可以通過新增log操作把流相關的事件記錄在日誌中,
Flux.range(1, 2).log("Range").subscribe(System.out::println);
冷熱序列
冷序列的含義是不論訂閱者在何時訂閱該序列,總是能收到序列中產生的全部訊息。熱序列是在持續不斷的產生訊息,訂閱者只能獲取到在其訂閱之後產生的訊息。
final Flux<Long> source = Flux.intervalMillis(1000).take(10).publish.autoConnect();
source.subscribe();
Thread.sleep(5000);
source.toStream().forEach(System.out::println);
參考文獻
https://blog.csdn.net/wpc2018/article/details/122634049
https://www.jianshu.com/p/7d80b94068b3
https://blog.csdn.net/yhj_911/article/details/119540000
http://bjqianye.cn/detail/6845.html