給初學者的RxJava2.0教程(三)(轉)
前言
上一節教程講解了最基本的RxJava2的使用, 在本節中, 我們將學習RxJava強大的線程控制.
正題
還是以之前的例子, 兩根水管:
RxJava
正常情況下, 上遊和下遊是工作在同一個線程中的, 也就是說上遊在哪個線程發事件, 下遊就在哪個線程接收事件.
怎麽去理解呢, 以Android為例, 一個Activity的所有動作默認都是在主線程中運行的, 比如我們在onCreate中打出當前線程的名字:
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Log.d(TAG, Thread.currentThread().getName());
}
結果便是:
D/TAG: main
回到RxJava中, 當我們在主線程中去創建一個上遊Observable來發送事件, 則這個上遊默認就在主線程發送事件.
當我們在主線程去創建一個下遊Observer來接收事件, 則這個下遊默認就在主線程中接收事件, 來看段代碼:
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
Log.d(TAG, "emit 1");
emitter.onNext(1);
}
});
Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
Log.d(TAG, "onNext: " + integer);
}
};
observable.subscribe(consumer);
}
在主線程中分別創建上遊和下遊, 然後將他們連接在一起, 同時分別打印出它們所在的線程, 運行結果為:
D/TAG: Observable thread is : main
D/TAG: emit 1
D/TAG: Observer thread is :main
D/TAG: onNext: 1
這就驗證了剛才所說, 上下遊默認是在同一個線程工作.
這樣肯定是滿足不了我們的需求的, 我們更多想要的是這麽一種情況, 在子線程中做耗時的操作, 然後回到主線程中來操作UI, 用圖片來描述就是下面這個圖片:
thread.png
在這個圖中, 我們用黃色水管表示子線程, 深藍色水管表示主線程.
要達到這個目的, 我們需要先改變上遊發送事件的線程, 讓它去子線程中發送事件, 然後再改變下遊的線程, 讓它去主線程接收事件. 通過RxJava內置的線程調度器可以很輕松的做到這一點. 接下來看一段代碼:
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
Log.d(TAG, "emit 1");
emitter.onNext(1);
}
});
Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
Log.d(TAG, "onNext: " + integer);
}
};
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
}
還是剛才的例子, 只不過我們太添加了一點東西, 先來看看運行結果:
D/TAG: Observable thread is : RxNewThreadScheduler-2
D/TAG: emit 1
D/TAG: Observer thread is :main
D/TAG: onNext: 1
可以看到, 上遊發送事件的線程的確改變了, 是在一個叫 RxNewThreadScheduler-2
的線程中發送的事件, 而下遊仍然在主線程中接收事件, 這說明我們的目的達成了, 接下來看看是如何做到的.
和上一段代碼相比,這段代碼只不過是增加了兩行代碼:
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
作為一個初學者的入門教程, 並不會貼出一大堆源碼來分析, 因此只需要讓大家記住幾個要點, 已達到如何正確的去使用這個目的才是我們的目標.
簡單的來說, subscribeOn()
指定的是上遊發送事件的線程, observeOn()
指定的是下遊接收事件的線程.
多次指定上遊的線程只有第一次指定的有效, 也就是說多次調用subscribeOn()
只有第一次的有效, 其余的會被忽略.
多次指定下遊的線程是可以的, 也就是說每調用一次observeOn()
, 下遊的線程就會切換一次.
舉個例子:
observable.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.subscribe(consumer);
這段代碼中指定了兩次上遊發送事件的線程, 分別是newThread和IO線程, 下遊也指定了兩次線程,分別是main和IO線程. 運行結果為:
D/TAG: Observable thread is : RxNewThreadScheduler-3
D/TAG: emit 1
D/TAG: Observer thread is :RxCachedThreadScheduler-1
D/TAG: onNext: 1
可以看到, 上遊雖然指定了兩次線程, 但只有第一次指定的有效, 依然是在RxNewThreadScheduler
線程中, 而下遊則跑到了RxCachedThreadScheduler
中, 這個CacheThread其實就是IO線程池中的一個.
為了更清晰的看到下遊的線程切換過程, 我們加點log:
observable.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "After observeOn(mainThread), current thread is: " + Thread.currentThread().getName());
}
})
.observeOn(Schedulers.io())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "After observeOn(io), current thread is : " + Thread.currentThread().getName());
}
})
.subscribe(consumer);
我們在下遊線程切換之後, 把當前的線程打印出來, 運行結果:
D/TAG: Observable thread is : RxNewThreadScheduler-1
D/TAG: emit 1
D/TAG: After observeOn(mainThread), current thread is: main
D/TAG: After observeOn(io), current thread is : RxCachedThreadScheduler-2
D/TAG: Observer thread is :RxCachedThreadScheduler-2
D/TAG: onNext: 1
可以看到, 每調用一次observeOn()
線程便會切換一次, 因此如果我們有類似的需求時, 便可知道如何處理了.
在RxJava中, 已經內置了很多線程選項供我們選擇, 例如有
- Schedulers.io() 代表io操作的線程, 通常用於網絡,讀寫文件等io密集型的操作
- Schedulers.computation() 代表CPU計算密集型的操作, 例如需要大量計算的操作
- Schedulers.newThread() 代表一個常規的新線程
- AndroidSchedulers.mainThread() 代表Android的主線程
這些內置的Scheduler已經足夠滿足我們開發的需求, 因此我們應該使用內置的這些選項, 在RxJava內部使用的是線程池來維護這些線程, 所有效率也比較高.
實踐
對於我們Android開發人員來說, 經常會將一些耗時的操作放在後臺, 比如網絡請求或者讀寫文件,操作數據庫等等,等到操作完成之後回到主線程去更新UI, 有了上面的這些基礎, 那麽現在我們就可以輕松的去做到這樣一些操作.
下面來舉幾個常用的場景.
網絡請求
Android中有名的網絡請求庫就那麽幾個, Retrofit能夠從中脫穎而出很大原因就是因為它支持RxJava的方式來調用, 下面簡單講解一下它的基本用法.
要使用Retrofit,先添加Gradle配置:
//retrofit
compile ‘com.squareup.retrofit2:retrofit:2.1.0‘
//Gson converter
compile ‘com.squareup.retrofit2:converter-gson:2.1.0‘
//RxJava2 Adapter
compile ‘com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0‘
//okhttp
compile ‘com.squareup.okhttp3:okhttp:3.4.1‘
compile ‘com.squareup.okhttp3:logging-interceptor:3.4.1‘
隨後定義Api接口:
public interface Api {
@GET
Observable<LoginResponse> login(@Body LoginRequest request);
@GET
Observable<RegisterResponse> register(@Body RegisterRequest request);
}
接著創建一個Retrofit客戶端:
private static Retrofit create() {
OkHttpClient.Builder builder = new OkHttpClient().newBuilder();
builder.readTimeout(10, TimeUnit.SECONDS);
builder.connectTimeout(9, TimeUnit.SECONDS);
if (BuildConfig.DEBUG) {
HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
builder.addInterceptor(interceptor);
}
return new Retrofit.Builder().baseUrl(ENDPOINT)
.client(builder.build())
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
}
發起請求就很簡單了:
Api api = retrofit.create(Api.class);
api.login(request)
.subscribeOn(Schedulers.io()) //在IO線程進行網絡請求
.observeOn(AndroidSchedulers.mainThread()) //回到主線程去處理請求結果
.subscribe(new Observer<LoginResponse>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(LoginResponse value) {}
@Override
public void onError(Throwable e) {
Toast.makeText(mContext, "登錄失敗", Toast.LENGTH_SHORT).show();
}
@Override
public void onComplete() {
Toast.makeText(mContext, "登錄成功", Toast.LENGTH_SHORT).show();
}
});
看似很完美, 但我們忽略了一點, 如果在請求的過程中Activity已經退出了, 這個時候如果回到主線程去更新UI, 那麽APP肯定就崩潰了, 怎麽辦呢, 上一節我們說到了Disposable
, 說它是個開關, 調用它的dispose()
方法時就會切斷水管, 使得下遊收不到事件, 既然收不到事件, 那麽也就不會再去更新UI了. 因此我們可以在Activity中將這個Disposable
保存起來, 當Activity退出時, 切斷它即可.
那如果有多個Disposable
該怎麽辦呢, RxJava中已經內置了一個容器CompositeDisposable
, 每當我們得到一個Disposable
時就調用CompositeDisposable.add()
將它添加到容器中, 在退出的時候, 調用CompositeDisposable.clear()
即可切斷所有的水管.
讀寫數據庫
上面說了網絡請求的例子, 接下來再看看讀寫數據庫, 讀寫數據庫也算一個耗時的操作, 因此我們也最好放在IO線程裏去進行, 這個例子就比較簡單, 直接上代碼:
public Observable<List<Record>> readAllRecords() {
return Observable.create(new ObservableOnSubscribe<List<Record>>() {
@Override
public void subscribe(ObservableEmitter<List<Record>> emitter) throws Exception {
Cursor cursor = null;
try {
cursor = getReadableDatabase().rawQuery("select * from " + TABLE_NAME, new String[]{});
List<Record> result = new ArrayList<>();
while (cursor.moveToNext()) {
result.add(Db.Record.read(cursor));
}
emitter.onNext(result);
emitter.onComplete();
} finally {
if (cursor != null) {
cursor.close();
}
}
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}
好了本次的教程就到這裏吧, 後面的教程將會教大家如何使用RxJava中強大的操作符. 通過使用這些操作符可以很輕松的做到各種吊炸天的效果. 敬請期待.
作者:Season_zlc
鏈接:http://www.jianshu.com/p/8818b98c44e2
來源:簡書
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請註明出處。
給初學者的RxJava2.0教程(三)(轉)