1. 程式人生 > >RxJava2知識回顧之RxJava基本操作符

RxJava2知識回顧之RxJava基本操作符

Map

操作符的作用是對上游傳送的每一個事件應用一個函式,使得每個事件按照函式的邏輯進行變換,通過Map就可以把上游傳送的每一個事件,轉換成Object或者集合.

 /**
     * map關鍵詞主要是將傳送事件通過Map轉換成另一種下游所需要的目標型別
     */
    public static void MapTest(){
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.e(TAG_OBSERVABLE,"被處理之前的值:1");
                emitter.onNext(1);
                Log.e(TAG_OBSERVABLE,"被處理之前的值:2");
                emitter.onNext(2);
                Log.e(TAG_OBSERVABLE,"被處理之前的值:3");
                emitter.onNext(3);
            }
        }).map(new Function<Integer, String>() {//在下游接收到事件之前處理,返回一個新的Observable
            @Override
            public String apply(Integer integer) throws Exception {
                return "我被轉換成:"+integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e(TAG_OBSERVAER,"接收的內容:"+s);
            }
        });
    }

結果:

map 基本作用就是將一個 Observable 通過某種函式關係,轉換為另一種 Observable,上面例子中就是把我們的 Integer 資料變成了 String 型別。從Log日誌顯而易見。

map函式呼叫完畢之後,將返回一個新的Observable,它的型別為ObservableMap.

使用場景:

網路請求返回結果,可以在map中進行資料解析,轉換成需要的資料格式傳送給下游,進行UI更新。

FlatMap

上游每傳送一個事件, flatMap都將建立一個新的水管, 然後傳送轉換之後的新的事件, 下游接收到的就是這些新的水管傳送的資料. 這裡需要注意的是, flatMap並不保證事件的順序。

concatMap與flatMap的作用幾乎一樣,只是concatMap結果嚴格按照上游傳送的順序。

Observable<Integer> integerObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.e(TAG_OBSERVABLE, "ObservableEmitter:" + Thread.currentThread().getName());
                Log.e(TAG_OBSERVABLE, "發射器1被處理之前的值:1");
                emitter.onNext(1);
                Log.e(TAG_OBSERVABLE, "發射器1被處理之前的值:2");
                emitter.onNext(2);
                Log.e(TAG_OBSERVABLE, "發射器1被處理之前的值:3");
                emitter.onNext(3);
                emitter.onComplete();
            }
        });

        integerObservable
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        Log.e(TAG_OBSERVABLE, "apply:" + Thread.currentThread().getName());
                        List<String> list = new ArrayList<>();
                        for (int i = 0; i < 2; i++) {
                            list.add("我的值:" + integer);
                        }
                        int delayTime = (int) (1 + Math.random() * 10);
                        return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
                    }
                }).subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG_OBSERVABLE, "accept:" + Thread.currentThread().getName());
                        Log.e(TAG_OBSERVAER, "接收到的:" + s);
                    }
                });

 結果:

使用場景:

可以用於類似註冊成功後返回的資料再呼叫登入介面獲取使用者資訊,多個介面連線轉換。

Zip

zip 專用於合併事件,該合併不是連線(連線操作符後面會說),而是兩兩配對,也就意味著,最終配對出的 Observable 發射事件數目只和少的那個相同。

/**
     * zip關鍵詞主要是將兩個事件合併成一個事件提供給下游處理
     */
    public static void zipTest(){
        Observable<Integer> integerObservable=Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.e(TAG_OBSERVABLE,"被處理之前的值:1");
                emitter.onNext(1);
                Log.e(TAG_OBSERVABLE,"被處理之前的值:2");
                emitter.onNext(2);
                Log.e(TAG_OBSERVABLE,"被處理之前的值:3");
                emitter.onNext(3);
            }
        });
        Observable<String> stringObservable=Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("我叫:");
                emitter.onNext("我女朋友叫:");
            }
        });

        Observable.zip(integerObservable, stringObservable, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                return s+integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e(TAG_OBSERVAER,"接收的內容:"+s);
            }
        });
    }

結果:

這種可以用於多個請求合併處理對應的結果.不管事件是怎樣的,A1總是跟B1結合,兩個事件中的順序都是按照對應的位置進行的。下游最終接收到的事件個數都是上游中事件數最少的那個被觀察者為準。

單一的把多個發射器連線成一個發射器,由於中間沒有其他轉換要求每個observable物件的傳送事件型別一致,接收到的結果按照註冊的優先順序傳送,不管是否在同一執行緒。

疑問:concat怎麼判斷該被觀察者已經發送完成了,測試發現不是以onComplete()結束為標識?

解答:其實在concat中會儲存每個observable物件需要傳送的事件個數,從而得知該被觀察者是否已經全部發送完畢。