1. 程式人生 > >Rxjava初步從Rxjava1到Rxjava2--響應式程式設計

Rxjava初步從Rxjava1到Rxjava2--響應式程式設計

Rxjava概述

a library for composing asynchronous and event-based programs using observable sequences for the Java VM

翻譯一下大概是,一個在JVM上為了構建(非同步、基於事件、並使用了觀察者序列的)程式的庫。

響應式程式設計是一種面向資料流和變化傳播的程式設計正規化。

Eventbus也是基於觀察者模式的,Rxjava更加簡潔,目前基本可以用Rxjava代替。

主要介面

Rxjava1主要介面:

被觀察者:Observable

觀察者:Observer/Subscriber

中間處理者:Operator

Rxjava2主要介面:

被觀察者:Publisher

觀察者:Subscriber,Flowable

Rxjava1的簡單應用

被觀察者Observable:

Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        //傳遞資料流
        subscriber.onNext("***");
        //執行資料流處理
        subscriber.onCompleted();
    }
});

觀察者Observer或者Subscriber(多幾個回撥函式):

Subscriber<String> subscriber = new Subscriber<String>() {
    //事件處理完成回撥
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    //接收資料流,事件處理
    @Override
    public void onNext(String s) {
    }
};

被觀察者訂閱觀察者:

/*
 * 被觀察者執行在io執行緒,效率高
 * 觀察者執行在安卓主執行緒
 */
observable.subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);

Rxjava2

官方已終止rxjava1的維護。

Rxjava2在Rxjava1的基礎上有了很多變化,我們按順序來了解Rxjava2的所有特性。

被觀察者:

Flowable 0-n個數據流 支援響應式流 支援背壓
Observable 0-n個數據流 不支援背壓
Single 1個數據流 一個物件或者error
Completable 1個數據流 僅一個成功或失敗標誌
Maybe 1個數據流 一個物件或者error

背壓是指在觀察者模式中,被觀察者傳送資料的速度比觀察者處理速度快很多的情況下,告知上游被觀察者放慢速度的策略。

組裝時間(Assembly time):

這是一個觀察者與被觀察者的中間過程,通過一些運算子來對資料流進行簡單處理。組裝有可能會和被觀察者在一個執行緒中非併發執行。

操作符:map,filter

排程器Schedulers:

Schedulers.io() 執行io或阻塞操作的執行緒

Schedulers.computation() 需要大量計算的執行緒

Schedulers.newThread() 常規執行緒

Schedulers.trampoline() 測試用常規執行緒

AndroidSchedulers.mainThread() Android主執行緒

操作符:

flatMap逐層依賴(concatMap,switchMap)

Flowable.range(1, 10)
  .flatMap(v ->
      Flowable.just(v)
        .subscribeOn(Schedulers.computation())
        .map(w -> w * w)
  )
  .blockingSubscribe(System.out::println);

concat(flowable,biflowable):實現被觀察者事件的順序執行。

zip:合併被觀察者,共同更新UI

訂閱:

訂閱方式和Rxjava1類似

Flowable.just("Hello world")
  .subscribe(new Consumer<String>() {
      @Override public void accept(String s) {
          System.out.println(s);
      }
  });

也可以直接在Subscription直接新增被觀察者

private CompositeSubscription subscription;
subscription.add(flowable);