RxJava實現原理分析part2-資料處理
技術標籤:開源框架分析rxjava3backpressure
先來一個Jack大神的演講視訊
Managing the Reactive World with RxJava
Backpressue
Backpressue是指一個Flowable管道處理中,有些非同步狀態無法及時處理產出的值,需要一種方式來告訴upstream降低生產資料的節奏,本質上是一種flow control策略。
如下例子,source產出了過多的資料,會導致downstream處理不及時而導致問題
PublishProcessor<Integer> source = PublishProcessor.create(); source .observeOn(Schedulers.computation()) .subscribe(v -> compute(v), Throwable::printStackTrace); for (int i = 0; i < 1_000_000; i++) { source.onNext(i); } Thread.sleep(10_000);
使用如下寫法,則不會有任何問題
Flowable.range(1, 1_000_000)
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);
Thread.sleep(10_000);
在Rxjava3裡面,提供了5種處理策略,定義在BackpressureStrategy裡面
- MISSING onNext事件沒有任何buffer快取,或丟棄,downstream必須處理這些overflow.適用場景:
- ERROR 傳送一個MissingBackpressureException以防止downstream無法繼續處理
- BUFFER 快取所有onNext值到downstream消費,預設Buffer大小是128,可以修改
- DROP 如果downstream無法處理,丟棄最近的onNext事件
- LATEST 僅儲存最新的onNext值,如果downstream無法處理,則覆寫先前的值
具體實現的類圖結構如下:
onBackpressureXXX操作
大部分開發者遇到backpressure是當應用出問題丟擲MissingBackpressureException異常,而且異常通常指向於observeOn操作上。
實際的原因是使用了不支援backpressure的PublishProcessor、timer()、interval、或者通過create自定義的操作
有幾種方式可以處理此場景:
- 增加buffer的大小
有時overflow是因為快速的事件產生。使用者突然快速點選螢幕,observeOn在android預設大小是16
在最新的Rxjava裡面,開發者可以顯式的指定快取大小,相關的引數
Flowable.range(1, 1_000_000)
.onBackpressureBuffer(1024)
然而在通常場景下,只設置一個臨時的大小也會出現overflow的情況,當源資料生產了超出預測快取大小時,在這種場景下,可以使用如下操作
- onBackPressureBuffer系列方法
傳入的引數都不同,最終都是建立FlowableOnBackpressureBuffer
- final int bufferSize 快取大小
- final boolean unbounded 如果為true,表示快取大小被解釋為內部無界快取的island值
- final boolean delayError 如果為true,表示一個Flowable裡面的exception會延遲傳送,直到快取裡面的所有元素都被downstream消費掉;為false時,會直接將Excetion通知到downstream,忽略快取的資料
- final Action onOverflow 當一個事件需要被快取,但是沒有空間的時候,會執行此aciton
其中unbounded生效的場景為
if (unbounded) {
q = new SpscLinkedArrayQueue<>(bufferSize); // 可以調整大小
} else {
q = new SpscArrayQueue<>(bufferSize); // 大小固定
}
- onBackPressureDrop
如果downstream還沒有準備好接收新的事件(通過Subscription.request呼叫的缺失來表示);從當前Flowable裡面丟棄.
如果downstream request為0,則由此產生的Flowable將不呼叫onNext,直到Subscriber再次呼叫request(n)來增加請求計數。
引數Consumer<? super T> onDrop 表示針對每個drop的事件,都會執行此action,應該快速且非阻塞
對應的實現策略在FlowableOnBackpressureDrop檔案裡面
- onBackpressureLatest
如果downstream還沒有準備好接收新的事件(通過Subscription.request呼叫的缺失來表示);從當前Flowable裡面丟棄除最新之外的事件,當downstream準備好的時候,把最新的事件發給downstream
具體的實現是在FlowableOnBackpressureLatest類裡面
backpressured 資料來源的建立方式
- **Flowable.just **
返回一個Flowable,然後把給定的常量引用事件傳送出去,然後結束。注意這個事件被獲取,然後重新發送,just不會改這個事件。使用fromCallable近需來生成單個事件
一個簡單的例子:
Flowable.just(5).subscribe(new DisposableSubscriber<Integer>() {
@Override
public void onStart() {
request(2);
}
@Override
public void onNext(Integer v) {
System.out.println(v);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
}
輸出為如下,request裡面的數字沒有影響到onComplete的呼叫:
5
onComplete
具體實現類為FlowableJust
@Override
public void request(long n) {
// n 要求大於等於0
if (!SubscriptionHelper.validate(n)) {
return;
}
if (compareAndSet(NO_REQUEST, REQUESTED)) {
Subscriber<? super T> s = subscriber;
// 把事件傳送給訂閱者
s.onNext(value);
if (get() != CANCELLED) {
// 傳送完成事件
s.onComplete();
}
}
}
當just方法裡面有多個引數的時候,內部實現是通過formarray
public static <@NonNull T> Flowable<T> just(T item1, T item2) {
Objects.requireNonNull(item1, "item1 is null");
Objects.requireNonNull(item2, "item2 is null");
return fromArray(item1, item2);
}
- Flowable.fromCallable
返回一個Flowable,當一個Subscriber訂閱的時候,會呼叫指定的Callable,然後返回Callable的執行結果並作為事件傳送出去
此方法可以指定直到一個Subscriber訂閱Publisher時,才執行特定的方法,換句話說,可以延遲載入
Flowable<Integer> o = Flowable.fromCallable(() -> computeValue());
具體實現類為FlowableFromCallable
public void subscribeActual(Subscriber<? super T> s) {
DeferredScalarSubscription<T> deferred = new DeferredScalarSubscription<>(s);
s.onSubscribe(deferred);
T t;
try {
// 執行callable.call方法
t = Objects.requireNonNull(callable.call(), "The callable returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
if (deferred.isCancelled()) {
RxJavaPlugins.onError(ex);
} else {
s.onError(ex);
}
return;
}
// 將執行結果傳送出去
deferred.complete(t);
}
public final void complete(T v) {
int state = get();
for (;;) {
if (state == FUSED_EMPTY) {
value = v;
lazySet(FUSED_READY);
Subscriber<? super T> a = downstream;
// 把事件傳送給訂閱者
a.onNext(v);
if (get() != CANCELLED) {
// 傳送完成事件
a.onComplete();
}
return;
}
// if state is >= CANCELLED or bit zero is set (*_HAS_VALUE) case, return
if ((state & ~HAS_REQUEST_NO_VALUE) != 0) {
return;
}
if (state == HAS_REQUEST_NO_VALUE) {
lazySet(HAS_REQUEST_HAS_VALUE);
Subscriber<? super T> a = downstream;
a.onNext(v);
if (get() != CANCELLED) {
a.onComplete();
}
return;
}
value = v;
if (compareAndSet(NO_REQUEST_NO_VALUE, NO_REQUEST_HAS_VALUE)) {
return;
}
state = get();
if (state == CANCELLED) {
value = null;
return;
}
}
}
- Flowable.fromArray
把一個數組轉換成一個Publisher,然後傳送數組裡面的每個元素,當陣列長度為1時,等效於just方法
public static <@NonNull T> Flowable<T> fromArray(@NonNull T... items) {
Objects.requireNonNull(items, "items is null");
if (items.length == 0) {
return empty();
}
if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new FlowableFromArray<>(items));
}
一個簡單的例子
Flowable.fromArray(1, 2, 3, 4, 5).subscribe(System.out::println);
具體實現類為FlowableFromArray
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
if (BackpressureHelper.add(this, n) == 0L) {
if (n == Long.MAX_VALUE) {
fastPath();
} else {
slowPath(n);
}
}
}
}
void slowPath(long r) {
long e = 0;
T[] arr = array;
int f = arr.length;
int i = index;
Subscriber<? super T> a = downstream;
for (;;) {
while (e != r && i != f) {
if (cancelled) {
return;
}
T t = arr[i];
if (t == null) {
a.onError(new NullPointerException("The element at index " + i + " is null"));
return;
} else {
// 迴圈傳送事件給downstream
a.onNext(t);
}
e++;
i++;
}
if (i == f) {
if (!cancelled) {
// 陣列事件都發送完後,
a.onComplete();
}
return;
}
r = get();
if (e == r) {
index = i;
r = addAndGet(-e);
if (r == 0L) {
return;
}
e = 0L;
}
}
}
fromIterable 實現fromarrsy類似,FlowableFromIterable可以用來實現狀態機
Iterable<Integer> iterable = () -> new Iterator<Integer>() {
@Override
public boolean hasNext() {
return true;
}
@Override
public Integer next() {
return 1;
}
};
Flowable.fromIterable(iterable).take(5).subscribe(System.out::println);
- Flowable.generate
有時,資料來源是同步的(blocking)和push的方式,被轉換成響應式時,我們呼叫get或read方法來獲取下資料來源中的下一塊。當然,可以使用Iterable,但是當源和某些資源關聯時,當downstream在資源關閉前停止訂閱的時, 可能會導致資源洩露
在RxJava裡面,處理這些場景,提供了generate工廠方法族
通常,generate使用3個callback
- 第一個callback 使用Supplier為每個Subscriber建立一個初始化狀態
- 第二個callback 獲取state物件,然後提供一個輸出Observer,onXXX方法可以被呼叫來發送事件。這個回撥會執行downstream的request,最多隻能呼叫onNext一次,可選後跟onError或onComplete
- 第三個callable,處理downstream,即當前generator結束序列或被取消時,Consumer來處理後續的
方法的定義如下:
* @param initialState the {@link Supplier} to generate the initial state for each {@code Subscriber}
* @param generator the {@link Consumer} called with the current state whenever a particular downstream {@code Subscriber} has
* requested a value. The callback then should call {@code onNext}, {@code onError} or
* {@code onComplete} to signal a value or a terminal event. Signaling multiple {@code onNext}
* in a call will make the operator signal {@link IllegalStateException}.
* @param disposeState the {@code Consumer} that is called with the current state when the generator
* terminates the sequence or it gets canceled
public static <T, S> Flowable<T> generate(@NonNull Supplier<S> initialState, @NonNull BiConsumer<S, Emitter<T>> generator,
@NonNull Consumer<? super S> disposeState) {
Objects.requireNonNull(generator, "generator is null");
return generate(initialState, FlowableInternalHelper.simpleBiGenerator(generator), disposeState);
}
unsubscribe事件或前一個callback的結束事件,可以在這個裡面釋放資源
一個簡單的例子
Flowable<Integer> o = Flowable.generate(
() -> new FileInputStream("data.bin"),
(inputstream, output) -> {
try {
int abyte = inputstream.read();
if (abyte < 0) {
output.onComplete();
} else {
output.onNext(abyte);
}
} catch (IOException ex) {
output.onError(ex);
}
return inputstream;
},
inputstream -> {
try {
inputstream.close();
} catch (IOException ex) {
RxJavaPlugins.onError(ex);
}
}
);
對應的實現類是FlowableGenerate
從JVM和lib庫裡面很多方法呼叫,會跑出checked exception,需要用try/catch包起來,此類中的函式介面不支援丟擲checked exception
- Flowable.create(emitter)
有時,資料來源被封裝成Flowable的時候,裡面的API不支援backpressure(如非同步網路請求)。Rxjava裡面提供了create(emitter)工廠方法,包含2個引數 - 一個callback,該回調將針對每個傳入的subscriber呼叫Emitter介面
- 一個BackpressureStrategy策略,同onBackpressureXXX類似
當前不支援非BackpressureStrategy之外的策略
Emitter使用起來很簡單,直接呼叫其onNext/onError/onComplete
具體的實現類是FlowableCreate ,看一個簡單的例子
Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull FlowableEmitter<String> emitter) throws Throwable {
System.out.println("Publisher subscribe = " + Thread.currentThread().getName());
emitter.onNext("Hello");
emitter.onNext("World");
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER);
具體的呼叫過程,在RxJava分析系列之RxJava實現原理分析part1 裡面的
2.觀察者模式的實現
資料支援的操作
建立方式
- https://github.com/ReactiveX/RxJava/blob/3.x/docs/Creating-Observables.md
- https://github.com/ReactiveX/RxJava/blob/3.x/docs/Phantom-Operators.md
- https://github.com/ReactiveX/RxJava/blob/3.x/docs/Subject.md
- https://github.com/ReactiveX/RxJava/blob/3.x/docs/String-Observables.md
支援的操作
- https://github.com/ReactiveX/RxJava/blob/3.x/docs/Conditional-and-Boolean-Operators.md
- https://github.com/ReactiveX/RxJava/blob/3.x/docs/Filtering-Observables.md
- https://github.com/ReactiveX/RxJava/blob/3.x/docs/Connectable-Observable-Operators.md
- https://github.com/ReactiveX/RxJava/blob/3.x/docs/Combining-Observables.md
- https://github.com/ReactiveX/RxJava/blob/3.x/docs/Mathematical-and-Aggregate-Operators.md
- https://github.com/ReactiveX/RxJava/blob/3.x/docs/Blocking-Observable-Operators.md
- https://github.com/ReactiveX/RxJava/blob/3.x/docs/Operator-Matrix.md
- https://github.com/ReactiveX/RxJava/blob/3.x/docs/Parallel-flows.md
- https://github.com/ReactiveX/RxJava/blob/3.x/docs/Async-Operators.md
- https://github.com/ReactiveX/RxJava/blob/3.x/docs/Observable-Utility-Operators.md
錯誤處理
- https://github.com/ReactiveX/RxJava/blob/3.x/docs/Error-Handling-Operators.md
- https://github.com/ReactiveX/RxJava/blob/3.x/docs/Error-Handling.md
自定義操作
- https://github.com/ReactiveX/RxJava/blob/3.x/docs/Implementing-Your-Own-Operators.md
- https://github.com/ReactiveX/RxJava/blob/3.x/docs/Plugins.md