Rxjava2原始碼分析(一):Flowable的建立和基本使用過程分析
阿新 • • 發佈:2018-12-29
本文用於記錄一下自己學習Rxjava2原始碼的過程。
首先是最簡單的一個使用方法(未做執行緒切換),用來學習Flowable的建立和使用。
Flowable .create(new FlowableOnSubscribe<Object>() { @Override public void subscribe(@NonNull FlowableEmitter<Object> e) throws Exception { } }, BackpressureStrategy.ERROR) .subscribe(newFlowableSubscriber<Object>() { @Override public void onSubscribe(@NonNull Subscription s) { } @Override public void onNext(Object o) { } @Override public void onError(Throwable t) { } @Override public voidonComplete() { } });
一、先看下create這個方法
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) { ObjectHelper.requireNonNull(source, "source is null"); ObjectHelper.requireNonNull(mode, "mode is null"); return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode)); }
1、首先看其引數,其引數為FlowableOnSubscribe和BackpressureStrategy
FlowableOnSubscribe為一個介面,裡面只有一個subscribe方法,該方法引數為FlowableEmitter
public interface FlowableOnSubscribe<T> { void subscribe(@NonNull FlowableEmitter<T> e) throws Exception; }
public interface FlowableEmitter<T> extends Emitter<T> { void setDisposable(@Nullable Disposable s); void setCancellable(@Nullable Cancellable c); long requested(); boolean isCancelled();
FlowableEmitter<T> serialize(); }
FlowableEmitter是一個介面,繼承自Emitter介面
public interface Emitter<T> {
void onNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete(); }
Emitter接口裡面的方法是不是很熟悉,這3個方法就是對資料的回撥處理了。
因此create方法的第一個引數大概作用就可以知道了,建立一個匿名物件,這個物件處理資料供回撥操作。
BackpressureStrategy這是一個列舉作用後面再說。
2、再來看create裡面的程式碼,前兩句用來判斷引數是否為空,主要看最後一句
return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
先看onAssembly這個方法的引數,new了一個FlowableCreate物件,傳入了create的引數,看一下FlowableCreate這個類,繼承自Flwable,其構造方法儲存了FlowableOnSubscribe、BackpressureStrategy這兩個物件
public final class FlowableCreate<T> extends Flowable<T> { final FlowableOnSubscribe<T> source; final BackpressureStrategy backpressure; public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) { this.source = source; this.backpressure = backpressure; }
}
接著進入onAssembly方法
public static <T> Flowable<T> onAssembly(@NonNull Flowable<T> source) { Function<? super Flowable, ? extends Flowable> f = onFlowableAssembly; if (f != null) { return apply(f, source); } return source; }
這裡先去獲取了Function物件f,但是這時候f為null,所以直接反回了source,也就是剛剛new的FlowableCreate物件,儲存了FlowableOnSubscribe、BackpressureStrategy這兩個物件,到此為止create結束。
二、接著看subscribe方法,先看引數,這裡new了個FlowableSubscriber物件
public interface FlowableSubscriber<T> extends Subscriber<T>
FlowableSubscriber為一個介面,繼承自Subscriber,Subscriber也是一個介面,裡面的方法是不是和Emitter裡的方法很像,不過現在他們還沒有關係,接著往下看
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete(); }
裡面的方法是不是和Emitter裡的方法很像,不過現在他們還沒有關係,接著往下看,進入subscribe方法裡面
public final void subscribe(FlowableSubscriber<? super T> s) { ObjectHelper.requireNonNull(s, "s is null"); try { Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s); ObjectHelper.requireNonNull(z, "Plugin returned null Subscriber"); subscribeActual(z); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
主要看try裡的程式碼
Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);
ObjectHelper.requireNonNull(z, "Plugin returned null Subscriber"); subscribeActual(z);
1、第一句呼叫了onSubscribe方法,進入方法裡面
public static <T> Subscriber<? super T> onSubscribe(@NonNull Flowable<T> source, @NonNull Subscriber<? super T> subscriber) { BiFunction<? super Flowable, ? super Subscriber, ? extends Subscriber> f = onFlowableSubscribe; if (f != null) { return apply(f, source, subscriber); } return subscribe:; }
該方法傳入兩個引數Flowable、Subscriber,呼叫該方法時傳入傳入的是this(即create創建出來的FlowableCreate物件),s(即subscribe方法new的FlowableSubscriber物件)
該方法內部BiFunction物件f為null所以直接反回了FlowableSubscriber物件。
所以第一句程式碼主要作用就是把FlowableSubscriber物件賦值給z
2、第二句檢查z是否為null
3、這句程式碼呼叫了Flowable中的
protected abstract void subscribeActual(Subscriber<? super T> s);
這個方法,這是個抽象方法。但是由於create方法,此時的Flowable物件是他的子類FlowableCreate
進入FlowableCreate類裡面的subscribeActual方法
public void subscribeActual(Subscriber<? super T> t) { BaseEmitter<T> emitter; switch (backpressure) { case MISSING: { emitter = new MissingEmitter<T>(t); break; } case ERROR: { emitter = new ErrorAsyncEmitter<T>(t); break; } case DROP: { emitter = new DropAsyncEmitter<T>(t); break; } case LATEST: { emitter = new LatestAsyncEmitter<T>(t); break; } default: { emitter = new BufferAsyncEmitter<T>(t, bufferSize()); break; } } t.onSubscribe(emitter); try { source.subscribe(emitter); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); emitter.onError(ex); } }
此時列舉物件BackpressureStrategy派上用場了,根據列舉型別建立不同子類的BaseEmitter物件,這裡以ErrorAsyncEmitter為例。
ErrorAsyncEmitter(Subscriber<? super T> actual) { super(actual); }
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
private static final long serialVersionUID = 4127754106204442833L;
NoOverflowBaseAsyncEmitter(Subscriber<? super T> actual) {
super(actual);
}
@Override
public final void onNext(T t) {
if (isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (get() != 0) {
actual.onNext(t);
BackpressureHelper.produced(this, 1);
} else {
onOverflow();
}
}
abstract void onOverflow();
}
這個類裡面有個onNext方法,把引數賦給了Subscriber的onNext方法,這裡就關聯上了onNext方法。
implements FlowableEmitter<T>, Subscription {
private static final long serialVersionUID = 7326289992464377023L;
final Subscriber<? super T> actual;
final SequentialDisposable serial;
BaseEmitter(Subscriber<? super T> actual) {
this.actual = actual;
this.serial = new SequentialDisposable();
}
@Override
public void onComplete() {
if (isCancelled()) {
return;
}
try {
actual.onComplete();
} finally {
serial.dispose();
}
}
@Override
public void onError(Throwable e) {
if (e == null) {
e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (isCancelled()) {
RxJavaPlugins.onError(e);
return;
}
try {
actual.onError(e);
} finally {
serial.dispose();
}
}
@Override
public final void cancel() {
serial.dispose();
onUnsubscribed();
}
void onUnsubscribed() {
// default is no-op
}
@Override
public final boolean isCancelled() {
return serial.isDisposed();
}
@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(this, n);
onRequested();
}
}
void onRequested() {
// default is no-op
}
@Override
public final void setDisposable(Disposable s) {
serial.update(s);
}
@Override
public final void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public final long requested() {
return get();
}
@Override
public final FlowableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
}
最終到了BaseEmitter這個了,該類實現了FlowableEmitter這個介面
這個類儲存了FlowableSubscriber物件,同時可以看到onError、onComplete這兩個方法,此時Emitter和Subscriber的方法關聯完畢。onError、onComplete這兩個方法是互斥的,所以呼叫一個後另一個就不執行,具體的過程這裡不分析。
繼續看這句程式碼onSubscribet.onSubscribe(emitter);把emitter作為引數傳給了onSubscribe用於回撥
最後這句程式碼:
source.subscribe(emitter);
source為最開始create的引數,即我們new的FlowableOnSubscribe物件。這句程式碼把emitter作為引數傳給了FlowableOnSubscribe物件。
整個過程完成了
總結起來步驟為:
1、create方法建立了FlowableCreate物件,這個物件儲存了一個FlowableOnSubscribe物件,FlowableOnSubscribe包含了一個subscribe(@NonNull FlowableEmitter<T> e)方法,該方法呼叫Emitter裡的onNext、onError、onComplete方法。
2、subscribe方法建立了一個Emitter物件,通過這個物件關聯了FlowableOnSubscribe物件和FlowableSubscriber物件,在FlowableOnSubscribe的subscribe方法中獲取資料後呼叫emitter的方法把資料傳遞給FlowableSubscriber,用於處理結果。