1. 程式人生 > 其它 >RxJava實現原理分析part2-資料處理

RxJava實現原理分析part2-資料處理

技術標籤:開源框架分析rxjava3backpressure

先來一個Jack大神的演講視訊
Managing the Reactive World with RxJava

Backpressue

Backpressue是指一個Flowable管道處理中,有些非同步狀態無法及時處理產出的值,需要一種方式來告訴upstream降低生產資料的節奏,本質上是一種flow control策略

如下例子,source產出了過多的資料,會導致downstream處理不及時而導致問題

    PublishProcessor<Integer> source = PublishProcessor.create();

    source
    .observeOn(Schedulers.computation())
    .subscribe(v -> compute(v), Throwable::printStackTrace);

    for (int i = 0; i < 1_000_000; i++) {
        source.onNext(i);
    }

    Thread.sleep(10_000); 

使用如下寫法,則不會有任何問題

Flowable.range(1, 1_000_000)
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);

Thread.sleep(10_000); 

在Rxjava3裡面,提供了5種處理策略,定義在BackpressureStrategy裡面

  • MISSING onNext事件沒有任何buffer快取,或丟棄,downstream必須處理這些overflow.適用場景:
  • ERROR 傳送一個MissingBackpressureException以防止downstream無法繼續處理
  • BUFFER 快取所有onNext值到downstream消費,預設Buffer大小是128,可以修改
  • DROP 如果downstream無法處理,丟棄最近的onNext事件
  • LATEST 僅儲存最新的onNext值,如果downstream無法處理,則覆寫先前的值

具體實現的類圖結構如下:

onBackpressureXXX操作

大部分開發者遇到backpressure是當應用出問題丟擲MissingBackpressureException異常,而且異常通常指向於observeOn操作上。

實際的原因是使用了不支援backpressure的PublishProcessor、timer()、interval、或者通過create自定義的操作

有幾種方式可以處理此場景:

  • 增加buffer的大小

有時overflow是因為快速的事件產生。使用者突然快速點選螢幕,observeOn在android預設大小是16

在最新的Rxjava裡面,開發者可以顯式的指定快取大小,相關的引數

    Flowable.range(1, 1_000_000)
              .onBackpressureBuffer(1024)

然而在通常場景下,只設置一個臨時的大小也會出現overflow的情況,當源資料生產了超出預測快取大小時,在這種場景下,可以使用如下操作

  • onBackPressureBuffer系列方法

傳入的引數都不同,最終都是建立FlowableOnBackpressureBuffer

  • final int bufferSize 快取大小
  • final boolean unbounded 如果為true,表示快取大小被解釋為內部無界快取的island值
  • final boolean delayError 如果為true,表示一個Flowable裡面的exception會延遲傳送,直到快取裡面的所有元素都被downstream消費掉;為false時,會直接將Excetion通知到downstream,忽略快取的資料
  • final Action onOverflow 當一個事件需要被快取,但是沒有空間的時候,會執行此aciton

其中unbounded生效的場景為

if (unbounded) {
    q = new SpscLinkedArrayQueue<>(bufferSize);  // 可以調整大小
} else {
    q = new SpscArrayQueue<>(bufferSize);  // 大小固定
}
  • onBackPressureDrop

如果downstream還沒有準備好接收新的事件(通過Subscription.request呼叫的缺失來表示);從當前Flowable裡面丟棄.

如果downstream request為0,則由此產生的Flowable將不呼叫onNext,直到Subscriber再次呼叫request(n)來增加請求計數。

引數Consumer<? super T> onDrop 表示針對每個drop的事件,都會執行此action,應該快速且非阻塞

對應的實現策略在FlowableOnBackpressureDrop檔案裡面

  • onBackpressureLatest

如果downstream還沒有準備好接收新的事件(通過Subscription.request呼叫的缺失來表示);從當前Flowable裡面丟棄除最新之外的事件,當downstream準備好的時候,把最新的事件發給downstream

具體的實現是在FlowableOnBackpressureLatest類裡面

backpressured 資料來源的建立方式
  • **Flowable.just **

返回一個Flowable,然後把給定的常量引用事件傳送出去,然後結束。注意這個事件被獲取,然後重新發送,just不會改這個事件。使用fromCallable近需來生成單個事件

一個簡單的例子:

    Flowable.just(5).subscribe(new DisposableSubscriber<Integer>() {
        @Override
        public void onStart() {
            request(2);
        }

        @Override
        public void onNext(Integer v) {
            System.out.println(v);
        }
       
        @Override
        public void onError(Throwable t) {

        }
        
        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    }

輸出為如下,request裡面的數字沒有影響到onComplete的呼叫:

5
onComplete

具體實現類為FlowableJust

@Override
public void request(long n) {
    // n 要求大於等於0
    if (!SubscriptionHelper.validate(n)) {
        return;
    }
    if (compareAndSet(NO_REQUEST, REQUESTED)) {
        Subscriber<? super T> s = subscriber;
        // 把事件傳送給訂閱者
        s.onNext(value);
        if (get() != CANCELLED) {
            // 傳送完成事件
            s.onComplete();
        }
    }
}

當just方法裡面有多個引數的時候,內部實現是通過formarray

    public static <@NonNull T> Flowable<T> just(T item1, T item2) {
        Objects.requireNonNull(item1, "item1 is null");
        Objects.requireNonNull(item2, "item2 is null");

        return fromArray(item1, item2);
    }
  • Flowable.fromCallable

返回一個Flowable,當一個Subscriber訂閱的時候,會呼叫指定的Callable,然後返回Callable的執行結果並作為事件傳送出去

此方法可以指定直到一個Subscriber訂閱Publisher時,才執行特定的方法,換句話說,可以延遲載入

Flowable<Integer> o = Flowable.fromCallable(() -> computeValue());

具體實現類為FlowableFromCallable

public void subscribeActual(Subscriber<? super T> s) {
    DeferredScalarSubscription<T> deferred = new DeferredScalarSubscription<>(s);
    s.onSubscribe(deferred);

    T t;
    try {
        // 執行callable.call方法
        t = Objects.requireNonNull(callable.call(), "The callable returned a null value");
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        if (deferred.isCancelled()) {
            RxJavaPlugins.onError(ex);
        } else {
            s.onError(ex);
        }
        return;
    }
    // 將執行結果傳送出去
    deferred.complete(t);
}

public final void complete(T v) {
    int state = get();
    for (;;) {
        if (state == FUSED_EMPTY) {
            value = v;
            lazySet(FUSED_READY);

            Subscriber<? super T> a = downstream;
            // 把事件傳送給訂閱者
            a.onNext(v);
            if (get() != CANCELLED) {
                // 傳送完成事件
                a.onComplete();
            }
            return;
        }

        // if state is >= CANCELLED or bit zero is set (*_HAS_VALUE) case, return
        if ((state & ~HAS_REQUEST_NO_VALUE) != 0) {
            return;
        }

        if (state == HAS_REQUEST_NO_VALUE) {
            lazySet(HAS_REQUEST_HAS_VALUE);
            Subscriber<? super T> a = downstream;
            a.onNext(v);
            if (get() != CANCELLED) {
                a.onComplete();
            }
            return;
        }
        value = v;
        if (compareAndSet(NO_REQUEST_NO_VALUE, NO_REQUEST_HAS_VALUE)) {
            return;
        }
        state = get();
        if (state == CANCELLED) {
            value = null;
            return;
        }
    }
}
  • Flowable.fromArray

把一個數組轉換成一個Publisher,然後傳送數組裡面的每個元素,當陣列長度為1時,等效於just方法

    public static <@NonNull T> Flowable<T> fromArray(@NonNull T... items) {
        Objects.requireNonNull(items, "items is null");
        if (items.length == 0) {
            return empty();
        }
        if (items.length == 1) {
            return just(items[0]);
        }
        return RxJavaPlugins.onAssembly(new FlowableFromArray<>(items));
    }

一個簡單的例子

Flowable.fromArray(1, 2, 3, 4, 5).subscribe(System.out::println);

具體實現類為FlowableFromArray

public final void request(long n) {
    if (SubscriptionHelper.validate(n)) {
        if (BackpressureHelper.add(this, n) == 0L) {
            if (n == Long.MAX_VALUE) {
                fastPath();
            } else {
                slowPath(n);
            }
        }
    }
}

void slowPath(long r) {
    long e = 0;
    T[] arr = array;
    int f = arr.length;
    int i = index;
    Subscriber<? super T> a = downstream;

    for (;;) {

        while (e != r && i != f) {
            if (cancelled) {
                return;
            }

            T t = arr[i];

            if (t == null) {
                a.onError(new NullPointerException("The element at index " + i + " is null"));
                return;
            } else {
                // 迴圈傳送事件給downstream
                a.onNext(t);
            }

            e++;
            i++;
        }

        if (i == f) {
            if (!cancelled) {
                // 陣列事件都發送完後,
                a.onComplete();
            }
            return;
        }

        r = get();
        if (e == r) {
            index = i;
            r = addAndGet(-e);
            if (r == 0L) {
                return;
            }
            e = 0L;
        }
    }
}

fromIterable 實現fromarrsy類似,FlowableFromIterable可以用來實現狀態機

    Iterable<Integer> iterable = () -> new Iterator<Integer>() {
        @Override
        public boolean hasNext() {
            return true;
        }

        @Override
        public Integer next() {
            return 1;
        }
    };

    Flowable.fromIterable(iterable).take(5).subscribe(System.out::println);
  • Flowable.generate

有時,資料來源是同步的(blocking)和push的方式,被轉換成響應式時,我們呼叫get或read方法來獲取下資料來源中的下一塊。當然,可以使用Iterable,但是當源和某些資源關聯時,當downstream在資源關閉前停止訂閱的時, 可能會導致資源洩露

在RxJava裡面,處理這些場景,提供了generate工廠方法族

通常,generate使用3個callback

  • 第一個callback 使用Supplier為每個Subscriber建立一個初始化狀態
  • 第二個callback 獲取state物件,然後提供一個輸出Observer,onXXX方法可以被呼叫來發送事件。這個回撥會執行downstream的request,最多隻能呼叫onNext一次,可選後跟onError或onComplete
  • 第三個callable,處理downstream,即當前generator結束序列或被取消時,Consumer來處理後續的

方法的定義如下:

     * @param initialState the {@link Supplier} to generate the initial state for each {@code Subscriber}
     * @param generator the {@link Consumer} called with the current state whenever a particular downstream {@code Subscriber} has
     * requested a value. The callback then should call {@code onNext}, {@code onError} or
     * {@code onComplete} to signal a value or a terminal event. Signaling multiple {@code onNext}
     * in a call will make the operator signal {@link IllegalStateException}.
     * @param disposeState the {@code Consumer} that is called with the current state when the generator
     * terminates the sequence or it gets canceled

    public static <T, S> Flowable<T> generate(@NonNull Supplier<S> initialState, @NonNull BiConsumer<S, Emitter<T>> generator,
            @NonNull Consumer<? super S> disposeState) {
        Objects.requireNonNull(generator, "generator is null");
        return generate(initialState, FlowableInternalHelper.simpleBiGenerator(generator), disposeState);
    }

unsubscribe事件或前一個callback的結束事件,可以在這個裡面釋放資源

一個簡單的例子

     Flowable<Integer> o = Flowable.generate(
         () -> new FileInputStream("data.bin"),
         (inputstream, output) -> {
             try {
                 int abyte = inputstream.read();
                 if (abyte < 0) {
                     output.onComplete();
                 } else {
                     output.onNext(abyte);
                 }
             } catch (IOException ex) {
                 output.onError(ex);
             }
             return inputstream;
         },
         inputstream -> {
             try {
                 inputstream.close();
             } catch (IOException ex) {
                 RxJavaPlugins.onError(ex);
             }
         } 
    );

對應的實現類是FlowableGenerate

從JVM和lib庫裡面很多方法呼叫,會跑出checked exception,需要用try/catch包起來,此類中的函式介面不支援丟擲checked exception

  • Flowable.create(emitter)
    有時,資料來源被封裝成Flowable的時候,裡面的API不支援backpressure(如非同步網路請求)。Rxjava裡面提供了create(emitter)工廠方法,包含2個引數
  • 一個callback,該回調將針對每個傳入的subscriber呼叫Emitter介面
  • 一個BackpressureStrategy策略,同onBackpressureXXX類似

當前不支援非BackpressureStrategy之外的策略

Emitter使用起來很簡單,直接呼叫其onNext/onError/onComplete

具體的實現類是FlowableCreate ,看一個簡單的例子

        Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull FlowableEmitter<String> emitter) throws Throwable {
                System.out.println("Publisher subscribe = " + Thread.currentThread().getName());
                emitter.onNext("Hello");
                emitter.onNext("World");
                emitter.onComplete();
            }
        }, BackpressureStrategy.BUFFER);

具體的呼叫過程,在RxJava分析系列之RxJava實現原理分析part1 裡面的

2.觀察者模式的實現

資料支援的操作

建立方式

  • https://github.com/ReactiveX/RxJava/blob/3.x/docs/Creating-Observables.md
  • https://github.com/ReactiveX/RxJava/blob/3.x/docs/Phantom-Operators.md
  • https://github.com/ReactiveX/RxJava/blob/3.x/docs/Subject.md
  • https://github.com/ReactiveX/RxJava/blob/3.x/docs/String-Observables.md

支援的操作

  • https://github.com/ReactiveX/RxJava/blob/3.x/docs/Conditional-and-Boolean-Operators.md
  • https://github.com/ReactiveX/RxJava/blob/3.x/docs/Filtering-Observables.md
  • https://github.com/ReactiveX/RxJava/blob/3.x/docs/Connectable-Observable-Operators.md
  • https://github.com/ReactiveX/RxJava/blob/3.x/docs/Combining-Observables.md
  • https://github.com/ReactiveX/RxJava/blob/3.x/docs/Mathematical-and-Aggregate-Operators.md
  • https://github.com/ReactiveX/RxJava/blob/3.x/docs/Blocking-Observable-Operators.md
  • https://github.com/ReactiveX/RxJava/blob/3.x/docs/Operator-Matrix.md
  • https://github.com/ReactiveX/RxJava/blob/3.x/docs/Parallel-flows.md
  • https://github.com/ReactiveX/RxJava/blob/3.x/docs/Async-Operators.md
  • https://github.com/ReactiveX/RxJava/blob/3.x/docs/Observable-Utility-Operators.md

錯誤處理

  • https://github.com/ReactiveX/RxJava/blob/3.x/docs/Error-Handling-Operators.md
  • https://github.com/ReactiveX/RxJava/blob/3.x/docs/Error-Handling.md

自定義操作

  • https://github.com/ReactiveX/RxJava/blob/3.x/docs/Implementing-Your-Own-Operators.md
  • https://github.com/ReactiveX/RxJava/blob/3.x/docs/Plugins.md