RxJava 學習筆記(九) --- Error Handling 錯誤處理操作
1. onErrorReturn
—> 指示Observable在遇到錯誤時發射一個特定的資料
onErrorReturn
方法返回一個映象原有Observable
行為的新Observable
,後者會忽略前者的onError
呼叫,不會將錯誤傳遞給觀察者,作為替代,它會發發射一個特殊的項並呼叫觀察者的onCompleted
方法。
示例程式碼:
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if (subscriber.isUnsubscribed()) return;
//迴圈輸出數字
try {
for (int i = 0; i < 10; i++) {
if (i == 4) {
throw new Exception("this is number 4 error!");
}
subscriber.onNext(i);
}
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
});
observable.onErrorReturn(new Func1<Throwable, Integer>() {
@Override
public Integer call(Throwable throwable) {
return 1004;
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Integer value) {
System.out.println("Next:" + value);
}
});
輸出:
Next:0
Next:1
Next:2
Next:3
Next:1004
Sequence complete.
2. onErrorResumeNext
—> 指示Observable在遇到錯誤時發射一個數據序列
onErrorResumeNext
方法返回一個映象原有Observable
行為的新Observable
,後者會忽略前者的onError
呼叫,不會將錯誤傳遞給觀察者,作為替代,它會開始映象另一個,備用的Observable
。
示例程式碼:
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if (subscriber.isUnsubscribed()) return;
//迴圈輸出數字
try {
for (int i = 0; i < 10; i++) {
if (i == 4) {
throw new Exception("this is number 4 error!");
}
subscriber.onNext(i);
}
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
});
observable.onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() {
@Override
public Observable<? extends Integer> call(Throwable throwable) {
return Observable.just(100,101, 102);
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Integer value) {
System.out.println("Next:" + value);
}
});
輸出:
Next:0
Next:1
Next:2
Next:3
Next:100
Next:101
Next:102
Sequence complete.
3. onExceptionResumeNext
—> 指示Observable遇到錯誤時繼續發射資料
和onErrorResumeNext
類似,onExceptionResumeNext
方法返回一個映象原有Observable
行為的新Observable
,也使用一個備用的Observable
,不同的是,如果onError
收到的Throwable
不是一個Exception
,它會將錯誤傳遞給觀察者的onError
方法,不會使用備用的Observable
。
示例程式碼:
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if (subscriber.isUnsubscribed()) return;
//迴圈輸出數字
try {
for (int i = 0; i < 10; i++) {
if (i == 4) {
throw new Exception("this is number 4 error!");
}
subscriber.onNext(i);
}
subscriber.onCompleted();
} catch (Throwable e) {
subscriber.onError(e);
}
}
});
observable.onExceptionResumeNext(Observable.just(100, 101, 102)).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Integer value) {
System.out.println("Next:" + value);
}
});
輸出:
Next:0
Next:1
Next:2
Next:3
Next:100
Next:101
Next:102
Sequence complete.
4. retry
—> 指示Observable遇到錯誤時重試
如果原始Observable
遇到錯誤,重新訂閱它期望它能正常終止
Retry
操作符不會將原始Observable
的onError
通知傳遞給觀察者,它會訂閱這個Observable
,再給它一次機會無錯誤地完成它的資料序列。Retry
總是傳遞onNext
通知給觀察者,由於重新訂閱,可能會造成資料項重複,如上圖所示。
RxJava
中的實現為retry
和retryWhen
。
無論收到多少次onError
通知,無引數版本的retry
都會繼續訂閱併發射原始Observable
。
接受單個count
引數的retry
會最多重新訂閱指定的次數,如果次數超了,它不會嘗試再次訂閱,它會把最新的一個onError
通知傳遞給它的觀察者。
還有一個版本的retry
接受一個謂詞函式作為引數,這個函式的兩個引數是:重試次數和導致發射onError
通知的Throwable
。這個函式返回一個布林值,如果返回true
,retry
應該再次訂閱和映象原始的Observable
,如果返回false
,retry
會將最新的一個onError
通知傳遞給它的觀察者。
retry
操作符預設在trampoline
排程器上執行。
示例程式碼:retry(long)
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if (subscriber.isUnsubscribed()) return;
//迴圈輸出數字
try {
for (int i = 0; i < 10; i++) {
if (i == 4) {
throw new Exception("this is number 4 error!");
}
subscriber.onNext(i);
}
subscriber.onCompleted();
} catch (Throwable e) {
subscriber.onError(e);
}
}
});
observable.retry(2).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Integer value) {
System.out.println("Next:" + value);
}
});
輸出:
Next:0
Next:1
Next:2
Next:3
Next:0
Next:1
Next:2
Next:3
Next:0
Next:1
Next:2
Next:3
Error: this is number 4 error!
示例程式碼:retry(Func2)
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if (subscriber.isUnsubscribed()) return;
//迴圈輸出數字
try {
for (int i = 0; i < 10; i++) {
if (i == 4) {
throw new Exception("this is number 4 error!");
}
subscriber.onNext(i);
}
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
});
observable.retry(new Func2<Integer, Throwable, Boolean>() {
@Override
public Boolean call(Integer integer, Throwable throwable) {
return integer < 3;
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Integer value) {
System.out.println("Next--:" + value);
}
});
輸出:
Next--:0
Next--:1
Next--:2
Next--:3
Next--:0
Next--:1
Next--:2
Next--:3
Next--:0
Next--:1
Next--:2
Next--:3
Error: this is number 4 error!
5. retryWhen
—> 指示Observable遇到錯誤時,將錯誤傳遞給另一個Observable來決定是否要重新給訂閱這個Observable
retryWhen
和retry
類似,區別是,retryWhen
將onError
中的Throwable
傳遞給一個函式,這個函式產生另一個Observable
,retryWhen
觀察它的結果再決定是不是要重新訂閱原始的Observable
。如果這個Observable
發射了一項資料,它就重新訂閱,如果這個Observable
發射的是onError
通知,它就將這個通知傳遞給觀察者然後終止。
retryWhen
預設在trampoline
排程器上執行,你可以通過引數指定其它的排程器。
示例程式碼:
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
System.out.println("subscribing");
subscriber.onError(new RuntimeException("always fails"));
}
});
observable.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
@Override
public Integer call(Throwable throwable, Integer integer) {
return integer;
}
}).flatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(Integer integer) {
System.out.println("delay retry by " + integer + " second(s)");
//每一秒中執行一次
return Observable.timer(integer, TimeUnit.SECONDS);
}
});
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Integer value) {
System.out.println("Next:" + value);
}
});
輸出:
subscribing
delay retry by 1 second(s)
subscribing
delay retry by 2 second(s)
subscribing
delay retry by 3 second(s)
subscribing
Sequence complete.