
Today at the office, my colleagues and I were discussing the importance of making sure that computations are done declaratively inside the reactive stream, rather than eagerly and imperatively when a method is called.

For the sake of explanation, let me give a very naive but illustrative example. We agreed the following would be wrong:

Single<Integer> getStream(String s) {
   Integer n = Integer.valueOf(s);
   return Single.just(n);
}

Presumably, the user of the method would write:

getStream("A").onErrorReturn(error -> 0)
              .subscribe(System.out::println, System.err::println);

But this code will fail eagerly inside getStream("A"), before any stream exists, so the stream's error handler is never reached. The user of a method that returns a reactive stream does not expect to wrap the invocation in a try-catch, since the stream offers its own mechanisms for dealing with errors, right?

So, we suggested we should use any of the following instead:

Single.defer(() -> Single.just(Integer.valueOf(s)));
Single.fromCallable(() -> Integer.valueOf(s));
Single.create(emitter -> emitter.onSuccess(Integer.valueOf(s)));
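To make the difference concrete without pulling in RxJava, here is a minimal sketch that uses only JDK types. The `Lazy` class is a hypothetical stand-in for `Single` (it is not part of any library), and `whereDoesItFail` is an illustrative helper that reports whether the failure escaped at the method call or arrived through the error callback.

```java
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Function;

public class EagerVsDeferredDemo {

    /** Hypothetical stand-in for a lazy Single: nothing runs until subscribe(). */
    static final class Lazy<T> {
        private final Callable<T> source;

        private Lazy(Callable<T> source) { this.source = source; }

        static <T> Lazy<T> just(T value) { return new Lazy<>(() -> value); }

        static <T> Lazy<T> fromCallable(Callable<T> source) { return new Lazy<>(source); }

        void subscribe(Consumer<T> onSuccess, Consumer<Throwable> onError) {
            T value;
            try {
                value = source.call();   // the deferred work runs here
            } catch (Exception e) {
                onError.accept(e);       // failures are routed to the error callback
                return;
            }
            onSuccess.accept(value);
        }
    }

    /** Eager variant: parsing happens while the method is being called. */
    static Lazy<Integer> eager(String s) {
        Integer n = Integer.valueOf(s);  // throws before any Lazy exists
        return Lazy.just(n);
    }

    /** Deferred variant: parsing happens only on subscribe(). */
    static Lazy<Integer> deferred(String s) {
        return Lazy.fromCallable(() -> Integer.valueOf(s));
    }

    /** Reports where the outcome surfaced: at the call, in onError, or in onSuccess. */
    static String whereDoesItFail(Function<String, Lazy<Integer>> factory, String input) {
        String[] outcome = new String[1];
        try {
            factory.apply(input).subscribe(
                    v -> outcome[0] = "onSuccess",
                    e -> outcome[0] = "onError");
        } catch (NumberFormatException e) {
            outcome[0] = "thrown at call";
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println(whereDoesItFail(EagerVsDeferredDemo::eager, "A"));
        System.out.println(whereDoesItFail(EagerVsDeferredDemo::deferred, "A"));
    }
}
```

With the input "A", the eager variant reports that the exception was thrown at the call, while the deferred variant delivers it to the error callback, which is the behavior the three alternatives above are meant to achieve.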

But then, in the next code review I found something like this:

Single<Foo> getReactiveFoo(Bar bar, Baz baz, int price) {
     return Single.fromCallable(() -> {
           Objects.requireNonNull(bar, "bar should not be null");
           Objects.requireNonNull(baz, "baz should not be null");
           Preconditions.checkArgument(price > 0, "price should be > 0");
           //...
     });
}

I found this rather interesting, because I have always considered that validations that throw NullPointerException, IllegalArgumentException, IndexOutOfBoundsException, etc., are intended to catch programmer errors, and I usually like these to fail as close as possible to where the error originates. When you throw any of these, you most likely want the program to fail immediately and signal that there is a bug that needs to be fixed, so I wouldn't mind it failing eagerly, as close as possible to where the bug was introduced.

Therefore, despite our initial discussion, I had the feeling that these programmer-error validations belong outside the stream, in the eager method invocation, i.e.

Single<Foo> getReactiveFoo(Bar bar, Baz baz, int price) {
     Objects.requireNonNull(bar, "bar should not be null");
     Objects.requireNonNull(baz, "baz should not be null");
     Preconditions.checkArgument(price > 0, "price should be > 0");
     return Single.fromCallable(() -> {
           //...
     });
}
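The split can be illustrated with JDK types only. In this sketch a plain `Callable` stands in for `Single.fromCallable`, and the names `quote` and `describe` are hypothetical; the point is only that contract violations surface at the call, while data-dependent failures surface when the deferred work runs.

```java
import java.util.Objects;
import java.util.concurrent.Callable;

public class PreconditionPlacementDemo {

    /** Hypothetical method: eager precondition checks, deferred work. */
    static Callable<Integer> quote(String item, int price) {
        // Programmer errors: checked eagerly, fail at the call site.
        Objects.requireNonNull(item, "item should not be null");
        if (price <= 0) {
            throw new IllegalArgumentException("price should be > 0");
        }
        // Data-dependent work: deferred until call() is invoked.
        return () -> Integer.parseInt(item) * price;
    }

    /** Reports whether the request was rejected at the call or failed when run. */
    static String describe(String item, int price) {
        Callable<Integer> deferred;
        try {
            deferred = quote(item, price);
        } catch (NullPointerException | IllegalArgumentException e) {
            return "rejected at call: " + e.getMessage();
        }
        try {
            return "value: " + deferred.call();
        } catch (Exception e) {
            return "failed when run: " + e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(null, 5));
        System.out.println(describe("3", 0));
        System.out.println(describe("x", 5));
        System.out.println(describe("3", 5));
    }
}
```

A null argument or a non-positive price is rejected before any deferred work is created, whereas an unparseable string only fails once the deferred body executes, where a stream's error handling could pick it up.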

I hope this is not an opinion-based question. I would like to understand whether the reactive streams community has a position on how we should deal with this type of exception, and whether such checks belong in the declarative reactive stream or in the imperative method invocation.

Perhaps there are good examples or reference material on the topic that you could suggest I read.

Check out what RxJava is doing in terms of input validation: github.com/ReactiveX/RxJava/blob/2.x/src/main/java/io/reactivex/… - akarnokd
"So, now I found this rather interesting, because I have always considered that this type of validations for NullPointerException, IllegalArgumentException, IndexOutOfBoundException, etc, have the intent to capture programmer errors, and I usually like these to fail as close as possible to where the error originates. When you throw any of these, you most certainly want the program to immediately fail and signal that there's a bug that needs to be handled and I wouldn't mind for this to fail eagerly and as close as possible where the bug was introduced." Agree. - Zsolt Safrany

1 Answer


I think it is reasonable to treat such validations as preconditions, so failing them can be considered a programming mistake. These should fail fast and fail hard, so throwing is fine here in my opinion. That is what both RxJava and Reactor do.