1. 程式人生 > >Android響應式程式設計(一)RxJava前篇[入門基礎]

Android響應式程式設計(一)RxJava前篇[入門基礎]

1.RxJava概述

ReactiveX與RxJava

在講到RxJava之前我們首先要了解什麼是ReactiveX,因為RxJava是ReactiveX的一種java實現。
ReactiveX是Reactive Extensions的縮寫,一般簡寫為Rx,微軟給的定義是,Rx是一個函式庫,讓開發者可以利用可觀察序列和LINQ風格查詢操作符來編寫非同步和基於事件的程式,開發者可以用Observables表示非同步資料流,用LINQ操作符查詢非同步資料流, 用Schedulers引數化非同步資料流的併發處理,Rx可以這樣定義:Rx = Observables + LINQ + Schedulers。

為何要用RxJava

想到非同步的操作我們會想到android的AsyncTask 和Handler,但是隨著請求的數量越來越多,程式碼邏輯將會變得越來越複雜而RxJava卻仍舊能保持清晰的邏輯。RxJava的原理就是建立一個Observable物件來幹活,然後使用各種操作符建立起來的鏈式操作,就如同流水線一樣把你想要處理的資料一步一步地加工成你想要的成品然後發射給Subscriber。

RxJava與觀察者模式

RxJava的非同步操作是通過擴充套件的觀察者模式來實現的,不瞭解觀察者模式的可以先看下 設計模式(五)觀察者模式這篇文章Rxjava有四個基本的要素:Observable (被觀察者)、 Observer (觀察者)、 subscribe (訂閱)、event(事件)。Observable (被觀察者) 和 Observer (觀察者)通過 subscribe() 方法實現訂閱關係,Observable就可以在需要的時候來通知Observer。

2.RxJava基本用法

在使用RxJava前請現在Android Studio 配置gradle:

dependencies {
    ...
    compile 'io.reactivex:rxjava:1.1.6'
    compile 'io.reactivex:rxandroid:1.2.1'
}

其中RxAndroid是RxJava的一部分,在普通的RxJava基礎上添加了幾個有用的類,比如特殊的排程器,後文會提到。

RxJava的基本用法分為三個步驟,他們分別是:

建立Observer(觀察者)

決定事件觸發的時候將有怎樣的行為

           Subscriber subscriber=new
Subscriber<String>() { @Override public void onCompleted() { Log.i("wangshu","onCompleted"); } @Override public void onError(Throwable e) { Log.i("wangshu","onError"); } @Override public void onNext(String s) { Log.i("wangshu","onNext"+s); } @Override public void onStart() { Log.i("wangshu","onStart"); } };

其中onCompleted、onError和onNext是必須要實現的方法,他們的含義分別是:

  • onCompleted:事件佇列完結,RxJava 不僅把每個事件單獨處理,還會把它們看做一個佇列。當不會再有新的 onNext發出時,需要觸發 onCompleted() 方法作為完成標誌。
  • onError:事件佇列異常,在事件處理過程中出異常時,onError() 會被觸發,同時佇列自動終止,不允許再有事件發出。
  • onNext:普通的事件,將要處理的事件新增到事件佇列中。
  • onStart:它會在事件還未傳送之前被呼叫,可以用於做一些準備工作。例如資料的清零或重置,這是一個可選方法,預設情況下它的實現為空。

當然如果要實現簡單的功能也可以用到Observer來建立觀察者,Observer是一個介面,而上面用到Subscriber是在Observer基礎上進行了擴充套件,在後文的Subscribe訂閱過程中Observer也會先被轉換為Subscriber來使用。

        Observer<String> observer = new Observer<String>() {
            @Override
            public void onCompleted() {
                Log.i("wangshu", "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i("wangshu", "onError");
            }

            @Override
            public void onNext(String s) {
                Log.i("wangshu", "onNext" + s);
            }
        };

建立 Observable(被觀察者)

它決定什麼時候觸發事件以及觸發怎樣的事件。 RxJava 使用 create() 方法來建立一個 Observable ,併為它定義事件觸發規則:

    Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("楊影楓");
                subscriber.onNext("月眉兒");
                subscriber.onCompleted();
            }
        });

通過呼叫subscriber的方法,不斷的將事件新增到任務佇列中,也可用just來實現:

  Observable observable = Observable.just("楊影楓", "月眉兒");

上述的程式碼會依次呼叫onNext(“楊影楓”)、onNext(“月眉兒”)、onCompleted()。

Subscribe (訂閱)

訂閱比較簡單:

 observable.subscribe(subscriber);

或者也可以呼叫

 observable.subscribe(observer);

執行程式碼檢視log:

com.example.liuwangshu.moonrxjava I/wangshu: onStart
com.example.liuwangshu.moonrxjava I/wangshu: onNext楊影楓
com.example.liuwangshu.moonrxjava I/wangshu: onNext月眉兒
com.example.liuwangshu.moonrxjava I/wangshu: onCompleted

3.不完整定義回撥

上文介紹了回撥的接收主要是依賴subscribe(Observer) 和 subscribe(Subscriber),除此之外RxJava還提供了另一種回撥方式,也就是不完整回撥。再講到不完整回撥之前我們首先要了解Action,檢視RxJava原始碼我們發現提供了一堆Action:
這裡寫圖片描述

我們開啟Action0來看看:

public interface Action0 extends Action {
    void call();
}

再開啟Action1:

public interface Action1<T> extends Action {
    void call(T t);
}

最後看看Action9:

public interface Action9<T1, T2, T3, T4, T5, T6, T7, T8, T9> extends Action {
    void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9);
}

很明顯Action後的數字代表回撥的引數型別數量,上文訂閱也就可以改寫為下面的程式碼:

        Action1<String> onNextAction = new Action1<String>() {
            @Override
            public void call(String s) {
                Log.i("wangshu", "onNext" + s);
            }
        };
        Action1<Throwable> onErrorAction = new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {

            }
        };
        Action0 onCompletedAction = new Action0() {
            @Override
            public void call() {
                Log.d("wangshu", "onCompleted");
            }
        };
        observable.subscribe(onNextAction,onErrorAction,onCompletedAction);

我們定義了onNextAction來處理onNext的回撥,同理我們還定義了onErrorAction和onCompletedAction,最後我們把他傳給subscribe方法。很顯然這樣寫的靈活度很大一些,同時我們也可以只傳一個或者兩個Action:

  observable.subscribe(onNextAction);
  observable.subscribe(onNextAction,onErrorAction);

第一行只定義了onNextAction來處理onNext的回撥,而第二行則定義了onNextAction處理onNext的回撥,onErrorAction來處理onError的回撥。

4.Scheduler

內建的Scheduler

方才我們所做的都是執行在主執行緒的,如果我們不指定執行緒,預設是在呼叫subscribe方法的執行緒上進行回撥的,如果我們想切換執行緒就需要使用Scheduler。RxJava 已經內建了5個 Scheduler:

  • Schedulers.immediate():預設的,直接在當前執行緒執行,相當於不指定執行緒。
  • Schedulers.newThread():總是啟用新執行緒,並在新執行緒執行操作。
  • Schedulers.io():I/O 操作(讀寫檔案、讀寫資料庫、網路資訊互動等)所使用的 Scheduler。行為模式和 newThread() 差不多,區別在於 io() 的內部實現是是用一個無數量上限的執行緒池,可以重用空閒的執行緒,因此多數情況下 io() 比 newThread() 更有效率。
  • Schedulers.computation():計算所使用的 Scheduler,例如圖形的計算。這個 Scheduler 使用的固定的執行緒池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
  • Schedulers.trampoline():當我們想在當前執行緒執行一個任務時,並不是立即時,可以用.trampoline()將它入隊。這個排程器將會處理它的佇列並且按序執行佇列中每一個任務。

另外RxAndroid也提供了一個常用的Scheduler:

  • AndroidSchedulers.mainThread():RxAndroid庫提供的Scheduler,它指定的操作在主執行緒中執行。

控制執行緒

subscribeOn() 和 observeOn() 兩個方法來對執行緒進行控制。
subscribeOn()方法指定 subscribe() 這個方法所在的執行緒,即事件產生的執行緒。observeOn()方法指定 Subscriber 回撥所執行在的執行緒,即事件消費的執行緒。

Action1<String> onNextAction = new Action1<String>() {
            @Override
            public void call(String s) {
                Log.i("wangshu", "onNext" + s);

            }
        };
Observable observable = Observable.just("楊影楓", "月眉兒"); 
            observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()).subscribe(onNextAction);

我們仍舊是用log打印出onNext事件所傳遞過來的字串,只不過這一次事件的產生的執行緒是在io執行緒上,事件回撥的執行緒則是在主執行緒。

5.RxJava基礎應用

好了,講的不是很多,我們來舉一個例子來消化上面的知識。RxJava+Retrofit訪問網路是比較搭的,但是此前我的網路系列並沒有介紹Retrofit,所以我們先準備用RxJava+OKHttp來訪問網路,至於RxJava+Retrofit訪問網路會在此係列的以後的章節做介紹。OKHttp的用法請詳見Android網路程式設計(六)OkHttp3用法全解析這篇文章。
此前我們用OkHttp3訪問網路是這樣做的:

      private void postAsynHttp(int size) {
        mOkHttpClient=new OkHttpClient();
        RequestBody formBody = new FormBody.Builder()
                .add("size", size+"")
                .build();
        Request request = new Request.Builder()
                .url("http://api.1-blog.com/biz/bizserver/article/list.do")
                .post(formBody)
                .build();
        Call call = mOkHttpClient.newCall(request);
        call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {

            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                String str = response.body().string();
                Log.i("wangshu", str);
                runOnUiThread(new Runnable() {
                    @Override
                    public void run() {
                        Toast.makeText(getApplicationContext(), "請求成功", Toast.LENGTH_SHORT).show();
                    }
                });
            }

        });
    }

接下來我們進行改造,首先我們建立Observable(被觀察者):

     private Observable<String> getObservable(final int size){
       Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
           @Override
           public void call(final Subscriber<? super String> subscriber) {
               mOkHttpClient=new OkHttpClient();
               RequestBody formBody = new FormBody.Builder()
                       .add("size",size+"")
                       .build();
               Request request = new Request.Builder()
                       .url("http://api.1-blog.com/biz/bizserver/article/list.do")
                       .post(formBody)
                       .build();
               Call call = mOkHttpClient.newCall(request);
               call.enqueue(new Callback() {
                   @Override
                   public void onFailure(Call call, IOException e) {
                       subscriber.onError(new Exception("error"));
                   }

                   @Override
                   public void onResponse(Call call, Response response) throws IOException {
                       String str = response.body().string();
                       subscriber.onNext(str);
                       subscriber.onCompleted();
                   }
               });
           }
       });
    return observable;
   }

我們將根據Okhttp的回撥(不在主執行緒)來定義事件的規則,呼叫subscriber.onNext來將請求返回的資料新增到事件佇列中。接下來我們來實現觀察者:

private void postAsynHttp(int size){   
getObservable(size).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<String>() {
           @Override
           public void onCompleted() {
               Log.i("wangshu", "onCompleted");
           }

           @Override
           public void onError(Throwable e) {
             Log.i("wangshu", e.getMessage());
           }

           @Override
           public void onNext(String s) {
               Log.i("wangshu", s);
               Toast.makeText(getApplicationContext(), "請求成功", Toast.LENGTH_SHORT).show();
           }
       });
   }

我們將訪問網路回撥設定為主執行緒,所以Toast是能正常顯示的。好了這一篇就講到這裡,關於RxJava的文章後期還會寫,敬請期待。