1. 程式人生 > >RxJava 學習筆記(九) --- Error Handling 錯誤處理操作

RxJava 學習筆記(九) --- Error Handling 錯誤處理操作

1. onErrorReturn —> 指示Observable在遇到錯誤時發射一個特定的資料

onErrorReturn方法返回一個映象原有Observable行為的新Observable,後者會忽略前者的onError呼叫,不會將錯誤傳遞給觀察者,作為替代,它會發發射一個特殊的項並呼叫觀察者的onCompleted方法。

            enter image description here

示例程式碼:

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
public void call(Subscriber<? super Integer> subscriber) { if (subscriber.isUnsubscribed()) return; //迴圈輸出數字 try { for (int i = 0; i < 10; i++) { if (i == 4) { throw
new Exception("this is number 4 error!"); } subscriber.onNext(i); } subscriber.onCompleted(); } catch (Exception e) { subscriber.onError(e); } } }); observable.onErrorReturn(new
Func1<Throwable, Integer>() { @Override public Integer call(Throwable throwable) { return 1004; } }).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("Sequence complete."); } @Override public void onError(Throwable e) { System.err.println("Error: " + e.getMessage()); } @Override public void onNext(Integer value) { System.out.println("Next:" + value); } });

輸出:

Next:0 
Next:1 
Next:2 
Next:3 
Next:1004 
Sequence complete.

2. onErrorResumeNext —> 指示Observable在遇到錯誤時發射一個數據序列

            enter image description here

onErrorResumeNext方法返回一個映象原有Observable行為的新Observable,後者會忽略前者的onError呼叫,不會將錯誤傳遞給觀察者,作為替代,它會開始映象另一個,備用的Observable

示例程式碼:

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                if (subscriber.isUnsubscribed()) return;
                //迴圈輸出數字
                try {
                    for (int i = 0; i < 10; i++) {
                        if (i == 4) {
                            throw new Exception("this is number 4 error!");
                        }
                        subscriber.onNext(i);
                    }
                    subscriber.onCompleted();
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            }
        });

        observable.onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() {
            @Override
            public Observable<? extends Integer> call(Throwable throwable) {
                return Observable.just(100,101, 102);
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }

            @Override
            public void onError(Throwable e) {
                System.err.println("Error: " + e.getMessage());
            }

            @Override
            public void onNext(Integer value) {
                System.out.println("Next:" + value);
            }
        });

輸出:

Next:0 
Next:1 
Next:2 
Next:3 
Next:100 
Next:101 
Next:102 
Sequence complete.

3. onExceptionResumeNext —> 指示Observable遇到錯誤時繼續發射資料

onErrorResumeNext類似,onExceptionResumeNext方法返回一個映象原有Observable行為的新Observable,也使用一個備用的Observable,不同的是,如果onError收到的Throwable不是一個Exception,它會將錯誤傳遞給觀察者的onError方法,不會使用備用的Observable

            enter image description here

示例程式碼:

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                if (subscriber.isUnsubscribed()) return;
                //迴圈輸出數字
                try {
                    for (int i = 0; i < 10; i++) {
                        if (i == 4) {
                            throw new Exception("this is number 4 error!");
                        }
                        subscriber.onNext(i);
                    }
                    subscriber.onCompleted();
                } catch (Throwable e) {
                    subscriber.onError(e);
                }
            }
        });

        observable.onExceptionResumeNext(Observable.just(100, 101, 102)).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }

            @Override
            public void onError(Throwable e) {
                System.err.println("Error: " + e.getMessage());
            }

            @Override
            public void onNext(Integer value) {
                System.out.println("Next:" + value);
            }
        });

輸出:

Next:0 
Next:1 
Next:2 
Next:3 
Next:100 
Next:101 
Next:102 
Sequence complete.

4. retry —> 指示Observable遇到錯誤時重試

如果原始Observable遇到錯誤,重新訂閱它期望它能正常終止

            enter image description here

Retry操作符不會將原始ObservableonError通知傳遞給觀察者,它會訂閱這個Observable,再給它一次機會無錯誤地完成它的資料序列。Retry總是傳遞onNext通知給觀察者,由於重新訂閱,可能會造成資料項重複,如上圖所示。

RxJava中的實現為retryretryWhen

無論收到多少次onError通知,無引數版本的retry都會繼續訂閱併發射原始Observable

接受單個count引數的retry會最多重新訂閱指定的次數,如果次數超了,它不會嘗試再次訂閱,它會把最新的一個onError通知傳遞給它的觀察者。

還有一個版本的retry接受一個謂詞函式作為引數,這個函式的兩個引數是:重試次數和導致發射onError通知的Throwable。這個函式返回一個布林值,如果返回trueretry應該再次訂閱和映象原始的Observable,如果返回falseretry會將最新的一個onError通知傳遞給它的觀察者。

retry操作符預設在trampoline排程器上執行。

示例程式碼:retry(long)

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                if (subscriber.isUnsubscribed()) return;
                //迴圈輸出數字
                try {
                    for (int i = 0; i < 10; i++) {
                        if (i == 4) {
                            throw new Exception("this is number 4 error!");
                        }
                        subscriber.onNext(i);
                    }
                    subscriber.onCompleted();
                } catch (Throwable e) {
                    subscriber.onError(e);
                }
            }
        });

        observable.retry(2).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }

            @Override
            public void onError(Throwable e) {
                System.err.println("Error: " + e.getMessage());
            }

            @Override
            public void onNext(Integer value) {
                System.out.println("Next:" + value);
            }
        });

輸出:

Next:0 
Next:1 
Next:2 
Next:3

Next:0 
Next:1 
Next:2 
Next:3

Next:0 
Next:1 
Next:2 
Next:3 
Error: this is number 4 error

示例程式碼:retry(Func2)

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                if (subscriber.isUnsubscribed()) return;
                //迴圈輸出數字
                try {
                    for (int i = 0; i < 10; i++) {
                        if (i == 4) {
                            throw new Exception("this is number 4 error!");
                        }
                        subscriber.onNext(i);
                    }
                    subscriber.onCompleted();
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            }
        });

        observable.retry(new Func2<Integer, Throwable, Boolean>() {
            @Override
            public Boolean call(Integer integer, Throwable throwable) {
                return integer < 3;
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }

            @Override
            public void onError(Throwable e) {
                System.err.println("Error: " + e.getMessage());
            }

            @Override
            public void onNext(Integer value) {
                System.out.println("Next--:" + value);
            }
        });

輸出:

Next--:0
Next--:1
Next--:2
Next--:3
Next--:0
Next--:1
Next--:2
Next--:3
Next--:0
Next--:1
Next--:2
Next--:3
Error: this is number 4 error!

5. retryWhen —> 指示Observable遇到錯誤時,將錯誤傳遞給另一個Observable來決定是否要重新給訂閱這個Observable

retryWhenretry類似,區別是,retryWhenonError中的Throwable傳遞給一個函式,這個函式產生另一個ObservableretryWhen觀察它的結果再決定是不是要重新訂閱原始的Observable。如果這個Observable發射了一項資料,它就重新訂閱,如果這個Observable發射的是onError通知,它就將這個通知傳遞給觀察者然後終止。

retryWhen預設在trampoline排程器上執行,你可以通過引數指定其它的排程器。

            enter image description here

示例程式碼:

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                System.out.println("subscribing");
                subscriber.onError(new RuntimeException("always fails"));
            }
        });

        observable.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
            @Override
            public Observable<?> call(Observable<? extends Throwable> observable) {
                return observable.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
                    @Override
                    public Integer call(Throwable throwable, Integer integer) {
                        return integer;
                    }
                }).flatMap(new Func1<Integer, Observable<?>>() {
                    @Override
                    public Observable<?> call(Integer integer) {
                        System.out.println("delay retry by " + integer + " second(s)");
                        //每一秒中執行一次
                        return Observable.timer(integer, TimeUnit.SECONDS);
                    }
                });
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }

            @Override
            public void onError(Throwable e) {
                System.err.println("Error: " + e.getMessage());
            }

            @Override
            public void onNext(Integer value) {
                System.out.println("Next:" + value);
            }
        });

輸出:

subscribing 
delay retry by 1 second(s) 
subscribing 
delay retry by 2 second(s) 
subscribing 
delay retry by 3 second(s) 
subscribing 
Sequence complete.