1. 程式人生 > >Rxjava2原始碼分析(一):Flowable的建立和基本使用過程分析

Rxjava2原始碼分析(一):Flowable的建立和基本使用過程分析

本文用於記錄一下自己學習Rxjava2原始碼的過程。
首先是最簡單的一個使用方法(未做執行緒切換),用來學習Flowable的建立和使用。
Flowable
        .create(new FlowableOnSubscribe<Object>() {
            @Override
public void subscribe(@NonNull FlowableEmitter<Object> e) throws Exception {

            }
        }, BackpressureStrategy.ERROR)
        .subscribe(new 
FlowableSubscriber<Object>() { @Override public void onSubscribe(@NonNull Subscription s) { } @Override public void onNext(Object o) { } @Override public void onError(Throwable t) { } @Override public void
onComplete() { } });
一、先看下create這個方法
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
    ObjectHelper.requireNonNull(source, "source is null");
ObjectHelper.requireNonNull(mode, "mode is null");
    return RxJavaPlugins.onAssembly
(new FlowableCreate<T>(source, mode)); }
1、首先看其引數,其引數為FlowableOnSubscribe和BackpressureStrategy
FlowableOnSubscribe為一個介面,裡面只有一個subscribe方法,該方法引數為FlowableEmitter
public interface FlowableOnSubscribe<T> {
void subscribe(@NonNull FlowableEmitter<T> e) throws Exception;
}
public interface FlowableEmitter<T> extends Emitter<T> {
void setDisposable(@Nullable Disposable s);
void setCancellable(@Nullable Cancellable c);
long requested();
boolean isCancelled();
FlowableEmitter<T> serialize();
}
FlowableEmitter是一個介面,繼承自Emitter介面
public interface Emitter<T> {
void onNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();
}
Emitter接口裡面的方法是不是很熟悉,這3個方法就是對資料的回撥處理了。
因此create方法的第一個引數大概作用就可以知道了,建立一個匿名物件,這個物件處理資料供回撥操作。
BackpressureStrategy這是一個列舉作用後面再說。
2、再來看create裡面的程式碼,前兩句用來判斷引數是否為空,主要看最後一句
return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
先看onAssembly這個方法的引數,new了一個FlowableCreate物件,傳入了create的引數,看一下FlowableCreate這個類,繼承自Flwable,其構造方法儲存了FlowableOnSubscribe、BackpressureStrategy這兩個物件
public final class FlowableCreate<T> extends Flowable<T> {
    final FlowableOnSubscribe<T> source;    final BackpressureStrategy backpressure;    public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
        this.source = source;
        this.backpressure = backpressure;
}
}
接著進入onAssembly方法
public static <T> Flowable<T> onAssembly(@NonNull Flowable<T> source) {
    Function<? super Flowable, ? extends Flowable> f = onFlowableAssembly;
    if (f != null) {
        return apply(f, source);
}
    return source;
}
這裡先去獲取了Function物件f,但是這時候f為null,所以直接反回了source,也就是剛剛new的FlowableCreate物件,儲存了FlowableOnSubscribe、BackpressureStrategy這兩個物件,到此為止create結束。
二、接著看subscribe方法,先看引數,這裡new了個FlowableSubscriber物件
public interface FlowableSubscriber<T> extends Subscriber<T>
FlowableSubscriber為一個介面,繼承自Subscriber,Subscriber也是一個介面,裡面的方法是不是和Emitter裡的方法很像,不過現在他們還沒有關係,接著往下看
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
裡面的方法是不是和Emitter裡的方法很像,不過現在他們還沒有關係,接著往下看,進入subscribe方法裡面
public final void subscribe(FlowableSubscriber<? super T> s) {
    ObjectHelper.requireNonNull(s, "s is null");
    try {
        Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);
ObjectHelper.requireNonNull(z, "Plugin returned null Subscriber");
subscribeActual(z);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
        Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
        throw npe;
}
}
主要看try裡的程式碼
Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);
ObjectHelper.requireNonNull(z, "Plugin returned null Subscriber"); subscribeActual(z);
1、第一句呼叫了onSubscribe方法,進入方法裡面
public static <T> Subscriber<? super T> onSubscribe(@NonNull Flowable<T> source, @NonNull Subscriber<? super T> subscriber) {
    BiFunction<? super Flowable, ? super Subscriber, ? extends Subscriber> f = onFlowableSubscribe;
    if (f != null) {
        return apply(f, source, subscriber);
}
    return subscribe:;
}
該方法傳入兩個引數Flowable、Subscriber,呼叫該方法時傳入傳入的是this(即create創建出來的FlowableCreate物件),s(即subscribe方法new的FlowableSubscriber物件
該方法內部BiFunction物件f為null所以直接反回了FlowableSubscriber物件。
所以第一句程式碼主要作用就是把FlowableSubscriber物件賦值給z
2、第二句檢查z是否為null
3、這句程式碼呼叫了Flowable中的
protected abstract void subscribeActual(Subscriber<? super T> s);
這個方法,這是個抽象方法。但是由於create方法,此時的Flowable物件是他的子類FlowableCreate
進入FlowableCreate類裡面的subscribeActual方法
public void subscribeActual(Subscriber<? super T> t) {
    BaseEmitter<T> emitter;
    switch (backpressure) {
    case MISSING: {
        emitter = new MissingEmitter<T>(t);
        break;
}
    case ERROR: {
        emitter = new ErrorAsyncEmitter<T>(t);
        break;
}
    case DROP: {
        emitter = new DropAsyncEmitter<T>(t);
        break;
}
    case LATEST: {
        emitter = new LatestAsyncEmitter<T>(t);
        break;
}
    default: {
        emitter = new BufferAsyncEmitter<T>(t, bufferSize());
        break;
}
    }

    t.onSubscribe(emitter);
    try {
        source.subscribe(emitter);
} catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}
}
此時列舉物件BackpressureStrategy派上用場了,根據列舉型別建立不同子類的BaseEmitter物件,這裡以ErrorAsyncEmitter為例。
ErrorAsyncEmitter(Subscriber<? super T> actual) {
    super(actual);
}
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {

    private static final long serialVersionUID = 4127754106204442833L;
NoOverflowBaseAsyncEmitter(Subscriber<? super T> actual) {
        super(actual);
}

    @Override
public final void onNext(T t) {
        if (isCancelled()) {
            return;
}

        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
}

        if (get() != 0) {
            actual.onNext(t);
BackpressureHelper.produced(this, 1);
} else {
            onOverflow();
}
    }

    abstract void onOverflow();
}
這個類裡面有個onNext方法,把引數賦給了Subscriber的onNext方法,這裡就關聯上了onNext方法。
implements FlowableEmitter<T>, Subscription {
    private static final long serialVersionUID = 7326289992464377023L;
    final Subscriber<? super T> actual;
    final SequentialDisposable serial;
BaseEmitter(Subscriber<? super T> actual) {
        this.actual = actual;
        this.serial = new SequentialDisposable();
}

    @Override
public void onComplete() {
        if (isCancelled()) {
            return;
}
        try {
            actual.onComplete();
} finally {
            serial.dispose();
}
    }

    @Override
public void onError(Throwable e) {
        if (e == null) {
            e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
        if (isCancelled()) {
            RxJavaPlugins.onError(e);
            return;
}
        try {
            actual.onError(e);
} finally {
            serial.dispose();
}
    }

    @Override
public final void cancel() {
        serial.dispose();
onUnsubscribed();
}

    void onUnsubscribed() {
        // default is no-op
}

    @Override
public final boolean isCancelled() {
        return serial.isDisposed();
}

    @Override
public final void request(long n) {
        if (SubscriptionHelper.validate(n)) {
            BackpressureHelper.add(this, n);
onRequested();
}
    }

    void onRequested() {
        // default is no-op
}

    @Override
public final void setDisposable(Disposable s) {
        serial.update(s);
}

    @Override
public final void setCancellable(Cancellable c) {
        setDisposable(new CancellableDisposable(c));
}

    @Override
public final long requested() {
        return get();
}

    @Override
public final FlowableEmitter<T> serialize() {
        return new SerializedEmitter<T>(this);
}
}
最終到了BaseEmitter這個了,該類實現了FlowableEmitter這個介面
這個類儲存了FlowableSubscriber物件,同時可以看到onErroronComplete這兩個方法,此時EmitterSubscriber的方法關聯完畢。onErroronComplete這兩個方法是互斥的,所以呼叫一個後另一個就不執行,具體的過程這裡不分析。
繼續看這句程式碼onSubscribet.onSubscribe(emitter);emitter作為引數傳給了onSubscribe用於回撥
最後這句程式碼:
source.subscribe(emitter);
source為最開始create的引數,即我們new的FlowableOnSubscribe物件。這句程式碼把emitter作為引數傳給了FlowableOnSubscribe物件
整個過程完成了
總結起來步驟為:
1、create方法建立了FlowableCreate物件,這個物件儲存了一個FlowableOnSubscribe物件,FlowableOnSubscribe包含了一個subscribe(@NonNull FlowableEmitter<T> e)方法,該方法呼叫Emitter裡的onNext、onError、onComplete方法。
2、subscribe方法建立了一個Emitter物件,通過這個物件關聯了FlowableOnSubscribe物件和FlowableSubscriber物件,在FlowableOnSubscribe的subscribe方法中獲取資料後呼叫emitter的方法把資料傳遞給FlowableSubscriber,用於處理結果。