according to the retryWhen() doc:
Returns an Observable that emits the same values as the source observable with the exception of an onError. An onError notification from the source will result in the emission of a Throwable item to the Observable provided as an argument to the notificationHandler function. If that Observable calls onComplete or onError then retry will call onCompleted or onError on the child subscription. Otherwise, this Observable will resubscribe to the source Observable.
when the count1 value reached 3,subscriber.onCompleted() will be invoked;In my option,child subscriber's onComplete() method will be invoked after subscriber.onCompleted() is invoked and "child_onCompleted" will show up in the output,but it isn`t;Why is that happen?
count1 = 0;
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
System.out.println("call_onErr ");
subscriber.onError(new Throwable("gg!"));
}
}).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
System.out.println("fun_call+" + observable);
return observable.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
if (count1 < 3) {
count1 = count1 + 1;
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
System.out.println("fun_call_onNext " + " count=" + count1);
subscriber.onNext(1000);
}
});
} else
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
System.out.println("fun_call_onCompleted " + " " + count1);
subscriber.onCompleted();//this is the subscriber!!!!!!!
}
});
}
});
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("child_onCompleted ");
}
@Override
public void onError(Throwable e) {
System.out.println("child_err ");
}
@Override
public void onNext(Integer integer) {
System.out.println("child_onNext " + integer + " ");
}
});
the output is:
System.out﹕ fun_call+rx.Observable@3ecb7134
System.out﹕ call_onErr
System.out﹕ fun_call_onNext count=1
System.out﹕ call_onErr
System.out﹕ fun_call_onNext count=2
System.out﹕ call_onErr
System.out﹕ fun_call_onNext count=3
System.out﹕ call_onErr
System.out﹕ fun_call_onCompleted 3