1. 程式人生 > >給初學者的RxJava2.0教程(五)(轉)

給初學者的RxJava2.0教程(五)(轉)

roi 直接 ror 兩個 defined create 了解 作者 一點

前言

大家喜聞樂見的Backpressure來啦.

這一節中我們將來學習Backpressure. 我看好多吃瓜群眾早已坐不住了, 別急, 我們先來回顧一下上一節講的Zip.

正題

上一節中我們說到Zip可以將多個上遊發送的事件組合起來發送給下遊, 那大家有沒有想過一個問題, 如果其中一個水管A發送事件特別快, 而另一個水管B 發送事件特別慢, 那就可能出現這種情況, 發得快的水管A 已經發送了1000個事件了, 而發的慢的水管B 才發一個出來, 組合了一個之後水管A 還剩999個事件, 這些事件需要繼續等待水管B 發送事件出來組合, 那麽這麽多的事件是放在哪裏的呢? 總有一個地方保存吧? 沒錯, Zip給我們的每一根水管都弄了一個水缸

, 用來保存這些事件, 用通俗易懂的圖片來表示就是:

技術分享
zip2.png

如圖中所示, 其中藍色的框框就是zip給我們的水缸! 它將每根水管發出的事件保存起來, 等兩個水缸都有事件了之後就分別從水缸中取出一個事件來組合, 當其中一個水缸是空的時候就處於等待的狀態.

題外話: 大家來分析一下這個水缸有什麽特點呢? 它是按順序保存的, 先進來的事件先取出來, 這個特點是不是很熟悉呀? 沒錯, 這就是我們熟知的隊列, 這個水缸在Zip內部的實現就是用的隊列, 感興趣的可以翻看源碼查看.

好了回到正題上來, 這個水缸有大小限制嗎? 要是一直往裏存會怎樣? 我們來看個例子:

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {    
    @Override                                                                          
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {       
        for (int i = 0; ; i++) {   //無限循環發事件                                                    
            emitter.onNext(i);                                                         
        }                                                                              
    }                                                                                  
}).subscribeOn(Schedulers.io());    

Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {      
    @Override                                                                          
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {        
        emitter.onNext("A");                                                           
    }                                                                                  
}).subscribeOn(Schedulers.io());    

Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {                 
    @Override                                                                          
    public String apply(Integer integer, String s) throws Exception {                  
        return integer + s;                                                            
    }                                                                                  
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {                               
    @Override                                                                          
    public void accept(String s) throws Exception {                                    
        Log.d(TAG, s);                                                                 
    }                                                                                  
}, new Consumer<Throwable>() {                                                         
    @Override                                                                          
    public void accept(Throwable throwable) throws Exception {                         
        Log.w(TAG, throwable);                                                         
    }                                                                                  
});

在這個例子中, 我們分別創建了兩根水管, 第一根水管用機器指令的執行速度來無限循環發送事件, 第二根水管隨便發送點什麽, 由於我們沒有發送Complete事件, 因此第一根水管會一直發事件到它對應的水缸裏去, 我們來看看運行結果是什麽樣.

運行結果GIF圖:

技術分享
zip2.gif

我勒個草, 內存占用以斜率為1的直線迅速上漲, 幾秒鐘就300多M , 最終報出了OOM:

zlc.season.rxjava2demo W/art: Throwing OutOfMemoryError "Failed to allocate a 28 byte allocation with
4194304 free bytes and 8MB until OOM; 
zlc.season.rxjava2demo W/art: "main" prio=5 tid=1 Runnable      
zlc.season.rxjava2demo W/art:   | group="main" sCount=0 dsCount=0 obj=0x75188710 self=0x7fc0efe7ba00   
zlc.season.rxjava2demo W/art:   | sysTid=32686 nice=0 cgrp=default sched=0/0 handle=0x7fc0f37dc200    
zlc.season.rxjava2demo W/art:   | state=R schedstat=( 0 0 0 ) utm=948 stm=120 core=1 HZ=100         
zlc.season.rxjava2demo W/art:   | stack=0x7fff971e8000-0x7fff971ea000 stackSize=8MB         
zlc.season.rxjava2demo W/art:   | held mutexes= "mutator lock"(shared held)    
zlc.season.rxjava2demo W/art:     at java.lang.Integer.valueOf(Integer.java:742)

出現這種情況肯定是我們不想看見的, 這裏就可以引出我們的Backpressure了, 所謂的Backpressure其實就是為了控制流量, 水缸存儲的能力畢竟有限, 因此我們還得從源頭去解決問題, 既然你發那麽快, 數據量那麽大, 那我就想辦法不讓你發那麽快唄.

那麽這個源頭到底在哪裏, 究竟什麽時候會出現這種情況, 這裏只是說的Zip這一個例子, 其他的地方會出現嗎? 帶著這個問題我們來探究一下.

我們讓事情變得簡單一點, 從一個單一的Observable說起.

來看段代碼:

Observable.create(new ObservableOnSubscribe<Integer>() {                         
    @Override                                                                    
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { 
        for (int i = 0; ; i++) {   //無限循環發事件                                              
            emitter.onNext(i);                                                   
        }                                                                        
    }                                                                            
}).subscribe(new Consumer<Integer>() {                                           
    @Override                                                                    
    public void accept(Integer integer) throws Exception {                       
        Thread.sleep(2000);                                                      
        Log.d(TAG, "" + integer);                                                
    }                                                                            
});

這段代碼很簡單, 上遊同樣無限循環的發送事件, 在下遊每次接收事件前延時2秒. 上下遊工作在同一個線程裏, 來看下運行結果:

技術分享
peace.gif

哎臥槽, 怎麽如此平靜, 感覺像是走錯了片場.

為什麽呢, 因為上下遊工作在同一個線程呀騷年們! 這個時候上遊每次調用emitter.onNext(i)其實就相當於直接調用了Consumer中的:

   public void accept(Integer integer) throws Exception {                       
        Thread.sleep(2000);                                                      
        Log.d(TAG, "" + integer);                                                
   }

所以這個時候其實就是上遊每延時2秒發送一次. 最終的結果也說明了這一切.

那我們加個線程呢, 改成這樣:

Observable.create(new ObservableOnSubscribe<Integer>() {                            
    @Override                                                                       
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {    
        for (int i = 0; ; i++) {    //無限循環發事件                                                     
            emitter.onNext(i);                                                      
        }                                                                           
    }                                                                               
}).subscribeOn(Schedulers.io())                                                    
        .observeOn(AndroidSchedulers.mainThread())                                  
        .subscribe(new Consumer<Integer>() {                                        
            @Override                                                               
            public void accept(Integer integer) throws Exception {                  
                Thread.sleep(2000);                                                 
                Log.d(TAG, "" + integer);                                           
            }                                                                       
        });

這個時候把上遊切換到了IO線程中去, 下遊到主線程去接收, 來看看運行結果呢:

技術分享
violence.gif

可以看到, 給上遊加了個線程之後, 它就像脫韁的野馬一樣, 內存又爆掉了.

為什麽不加線程和加上線程區別這麽大呢, 這就涉及了同步異步的知識了.

當上下遊工作在同一個線程中時, 這時候是一個同步的訂閱關系, 也就是說上遊每發送一個事件必須等到下遊接收處理完了以後才能接著發送下一個事件.

當上下遊工作在不同的線程中時, 這時候是一個異步的訂閱關系, 這個時候上遊發送數據不需要等待下遊接收, 為什麽呢, 因為兩個線程並不能直接進行通信, 因此上遊發送的事件並不能直接到下遊裏去, 這個時候就需要一個田螺姑娘來幫助它們倆, 這個田螺姑娘就是我們剛才說的水缸 ! 上遊把事件發送到水缸裏去, 下遊從水缸裏取出事件來處理, 因此, 當上遊發事件的速度太快, 下遊取事件的速度太慢, 水缸就會迅速裝滿, 然後溢出來, 最後就OOM了.

這兩種情況用圖片來表示如下:

同步:

技術分享
同步.png

異步:

技術分享
異步.png

從圖中我們可以看出, 同步和異步的區別僅僅在於是否有水缸.

相信通過這個例子大家對線程之間的通信也有了比較清楚的認知和理解.

源頭找到了, 只要有水缸, 就會出現上下遊發送事件速度不平衡的情況, 因此當我們以後遇到BackPressure時, 仔細思考一下水缸在哪裏, 找到水缸, 你就找到了解決問題的辦法.

既然源頭找到了, 那麽下一節我們就要來學習如何去解決了. 下節見.



作者:Season_zlc
鏈接:http://www.jianshu.com/p/0f2d6c2387c9
來源:簡書
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請註明出處。

給初學者的RxJava2.0教程(五)(轉)