6
votes

I have a PublishSubject and a Subscriber which I use to process a (possibly) infinite stream of preprocessed data. The problem is that some of the elements might contain some error. I'd like to ignore them and continue processing. How can I do so? I've tried something like this:

    val subject = PublishSubject.create<String>()
    subject.retry().subscribe({
        println("next: $it")
    }, {
        println("error")
    }, {
        println("complete")
    })

    subject.onNext("foo")
    subject.onNext("bar")
    subject.onError(RuntimeException())
    subject.onNext("wom")
    subject.onComplete()

My problem is that none of the error handling methods help me out here:

onErrorResumeNext(): instructs an Observable to emit a sequence of items if it encounters an error

onErrorReturn(): instructs an Observable to emit a particular item when it encounters an error

onExceptionResumeNext(): instructs an Observable to continue emitting items after it encounters an exception (but not another variety of throwable)

retry(): if a source Observable emits an error, resubscribe to it in the hopes that it will complete without error

retryWhen(): if a source Observable emits an error, pass that error to another Observable to determine whether to resubscribe to the source

I tried retry() for example but it hangs my process after the error indefinitely.

I also tried onErrorResumeNext() but it does not work as expected:

    val backupSubject = PublishSubject.create<String>()
    val subject = PublishSubject.create<String>()
    var currentSubject = subject
    subject.onErrorResumeNext(backupSubject).subscribe({
        println("next: $it")
    }, {
        println("error")
        currentSubject = backupSubject
    }, {
        println("complete")
    })

    backupSubject.subscribe({
        println("backup")
    }, {
        println("backup error")
    })

    currentSubject.onNext("foo")
    currentSubject.onNext("bar")
    currentSubject.onError(RuntimeException())
    currentSubject.onNext("wom")
    currentSubject.onComplete()

This only prints foo and bar.

1
Have you tried onErrorResumeNext? The documentation as you've shown is a little weird, but that sure sounds like it does what you describe wanting it to do... I think the disconnect is that the API assumes, if an Observable has failed, that it may be unable to advance to the next element; so it's written to let you provide a new Observable that continues the sequence; so you just need one that picks up where the error caused you to leave off, don't you? - Mark Adelsberger
onErrorResumeNext has a parameter and if I call it with the same subject I still can't see the remaining next values and the onComplete. If I add a new Subject it obviously won't have the subscriber I added to the previous one. - Adam Arold
Ok, to you it's "obvious" it wouldn't have the subscriber, and to me if the error-handling function doesn't take care of that, then what exactly does the error-handling function do? It is both named and documented as the solution to your problem, so again have you tried it? - Mark Adelsberger
Yes I tried but it does not work. retry() just hangs the process. - Adam Arold
I didn't ask if you tried retry(). I asked if you tried passing a new observable to onErrorResumeNext(). But clearly you don't want help if it contradicts your assumptions, so good luck. - Mark Adelsberger

1 Answer

12
votes

If you want to continue processing after an error, then your error is a value just like your Strings and should go through onNext. In Rx, onError is a terminal event: once a subject has received it, the sequence is over and later onNext calls are dropped. To keep type safety in this case, use some form of wrapper that can hold either a regular value or an error; for example, io.reactivex.Notification<T> is available in RxJava 2:

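    import io.reactivex.Notification;
    import io.reactivex.subjects.PublishSubject;

    PublishSubject<Notification<String>> subject = PublishSubject.create();

    // Errors travel through onNext as ordinary values, so the subject never terminates
    subject.subscribe(System.out::println);

    subject.onNext(Notification.createOnNext("Hello"));
    subject.onNext(Notification.<String>createOnError(new RuntimeException("oops")));
    subject.onNext(Notification.createOnNext("World"));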
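
On the consuming side, the subscriber has to check which kind of item it received. With Notification that would be isOnError() / getError() and isOnNext() / getValue(). Below is a dependency-free sketch of the same idea, so it runs without RxJava on the classpath. The Result class and the process method are hypothetical names made up for illustration; they only stand in for Notification<T> and for the subscriber's onNext handler, and are not part of RxJava.

```java
import java.util.ArrayList;
import java.util.List;

public class ErrorAsValueSketch {

    /** Hypothetical stand-in for Notification<T>: holds either a value or an error. */
    static final class Result<T> {
        private final T value;
        private final Throwable error;

        private Result(T value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        static <T> Result<T> ok(T value) { return new Result<>(value, null); }
        static <T> Result<T> failed(Throwable error) { return new Result<>(null, error); }

        boolean isError() { return error != null; }
        T getValue() { return value; }
        Throwable getError() { return error; }
    }

    /** Handles every item in order; a failed item is recorded and skipped, later items are still handled. */
    static List<String> process(List<Result<String>> items) {
        List<String> log = new ArrayList<>();
        for (Result<String> item : items) {
            if (item.isError()) {
                log.add("skipped: " + item.getError().getMessage());
            } else {
                log.add("next: " + item.getValue());
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // Same sequence as in the question, with the error sent as a value
        List<Result<String>> items = new ArrayList<>();
        items.add(Result.ok("foo"));
        items.add(Result.ok("bar"));
        items.add(Result.failed(new RuntimeException("oops")));
        items.add(Result.ok("wom"));

        process(items).forEach(System.out::println);
    }
}
```

Because the failure is just another item, "wom" is still processed after it. In the RxJava version the same branching would live inside the onNext lambda passed to subscribe.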