According to the retryWhen() documentation:

Returns an Observable that emits the same values as the source observable with the exception of an onError. An onError notification from the source will result in the emission of a Throwable item to the Observable provided as an argument to the notificationHandler function. If that Observable calls onComplete or onError then retry will call onCompleted or onError on the child subscription. Otherwise, this Observable will resubscribe to the source Observable.

When count1 reaches 3, subscriber.onCompleted() is invoked. In my opinion, the child subscriber's onCompleted() method should then be invoked as well, and "child_onCompleted" should show up in the output, but it doesn't. Why does that happen?

  count1 = 0;  

      Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                System.out.println("call_onErr ");
                subscriber.onError(new Throwable("gg!"));
            }
        }).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
            @Override
            public Observable<?> call(Observable<? extends Throwable> observable) {

                System.out.println("fun_call+" + observable);
                return observable.flatMap(new Func1<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> call(Throwable throwable) {

                        if (count1 < 3) {
                            count1 = count1 + 1;
                            return Observable.create(new Observable.OnSubscribe<Integer>() {
                                @Override
                                public void call(Subscriber<? super Integer> subscriber) {
                                    System.out.println("fun_call_onNext " + "  count=" + count1);
                                    subscriber.onNext(1000);
                                }
                            });
                        } else
                            return Observable.create(new Observable.OnSubscribe<Integer>() {
                                @Override
                                public void call(Subscriber<? super Integer> subscriber) {
                                    System.out.println("fun_call_onCompleted " + "   " + count1);
                                    subscriber.onCompleted();//this is the subscriber!!!!!!!
                                }
                            });
                    }
                });


            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("child_onCompleted ");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("child_err ");

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("child_onNext " + integer + " ");
            }
        });

The output is:

System.out﹕ fun_call+rx.Observable@3ecb7134
System.out﹕ call_onErr
System.out﹕ fun_call_onNext   count=1
System.out﹕ call_onErr
System.out﹕ fun_call_onNext   count=2
System.out﹕ call_onErr
System.out﹕ fun_call_onNext   count=3
System.out﹕ call_onErr
System.out﹕ fun_call_onCompleted    3
1 Answer

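The problem is that flatMap does not complete just because one of its inner Observables completes: it completes only once its upstream and all of its inner Observables have completed. In this example the Throwable stream passed to the notification handler does not complete, so the empty Observable returned from flatMap never reaches the child subscriber. Instead, you can emit special values from inside flatMap and use a takeWhile operator to trigger completion: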

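// Assumes RxJava 1.x (rx.Observable) and java.util.concurrent.atomic.AtomicInteger.
Observable.error(new Throwable("gg!"))
.retryWhen(o -> {
    // Counts the errors seen so far.
    AtomicInteger counter = new AtomicInteger();
    return o.flatMap(e -> {
        if (counter.getAndIncrement() < 3) {
            return Observable.just(0); // 0 = retry: resubscribe to the source
        }
        return Observable.just(1);     // 1 = stop: marker checked by takeWhile
    })
    // Completes the notification Observable on the first non-zero value,
    // which in turn completes the child subscriber.
    .takeWhile(v -> v == 0);
})
.subscribe(...);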
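
To see why an inner onCompleted() on its own is not enough, here is a toy model of the completion rule a merging operator such as flatMap follows. It is plain Java and deliberately does not use RxJava; the class MergeCompletionModel and its methods are hypothetical names made up for illustration, not part of any library, and the real operator is considerably more involved.

```java
// Toy model (NOT RxJava's implementation) of when a merged stream completes:
// only after the outer source AND every inner source have completed.
final class MergeCompletionModel {
    private boolean outerDone = false; // has the outer (upstream) source completed?
    private int activeInners = 0;      // inner sources subscribed but not yet completed
    private boolean completed = false; // has the merged stream signalled completion?

    // An inner source returned from the mapping function was subscribed.
    void innerStarted() {
        activeInners++;
    }

    // An inner source called onCompleted().
    void innerCompleted() {
        activeInners--;
        tryComplete();
    }

    // The outer source called onCompleted().
    void outerCompleted() {
        outerDone = true;
        tryComplete();
    }

    boolean isCompleted() {
        return completed;
    }

    // Completion is only signalled when nothing at all is left running.
    private void tryComplete() {
        if (!completed && outerDone && activeInners == 0) {
            completed = true;
        }
    }

    public static void main(String[] args) {
        MergeCompletionModel m = new MergeCompletionModel();
        m.innerStarted();
        m.innerCompleted(); // like the inner Observable in the question calling onCompleted()
        System.out.println("after inner completion: " + m.isCompleted()); // false
        m.outerCompleted();
        System.out.println("after outer completion: " + m.isCompleted()); // true
    }
}
```

In the question, the outer source is the Throwable stream handed to the retryWhen handler. It stays open, so the model remains in the first state and "child_onCompleted" is never printed. takeWhile sidesteps this by completing downstream of flatMap as soon as it sees the stop marker.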