6
votes

I have created a parallet flux from iterable. And on each iterable I have to make a rest call. But while executing even if any of the request fails , all the remaining requests also fail. I want all the requests to be executed irrespective of failure or success.

I am currently using Flux.fromIterable and using runOn operator

Flux.fromIterable(actions)
.parallel()
.runOn(Schedulars.elastic())
.flatMap(request -> someRemoteCall)     
.sequential()
.subscribe();

I want all the requests in iterable to be executed , irrespective of the failure or success. But as of now some gets executed and some gets failed.

4
Have added sequential just as part of hit and trial.naval jain
This is what I see in logs. {"date":"2019-08-17T08:41:48.043+00:00","loglevel":"ERROR","logger_name":"reactor.core.publisher.Operators","thread_name":"reactor-http-client-epoll-11","message":"Operator called default onErrorDropped","stack_trace":"org.springframework.web.reactive.function.client.WebClientResponseException: ClientResponse has erroneous status code: 404 Not Found\n\tatnaval jain

4 Answers

4
votes

There's three possible ways I generally use to achieve this:

  • Use the 3 argument version of flatMap(), the second of which is a mapperOnError -eg. .flatMap(request -> someRemoteCall(), x->Mono.empty(), null);
  • Use onErrorResume(x -> Mono.empty()) as a separate call to ignore any error;
  • Use .onErrorResume(MyException.class, x -> Mono.empty())) to just ignore errors of a certain type.

The second is what I tend to use by default, as I find that clearest.

2
votes

Because of .parallel().runOn(...) usage you can't use onErrorContinue as below:

.parallel()
.runOn(...)
.flatMap(request -> someRemoteCall)
.onErrorContinue(...)

but you might be able to use it like this:

.parallel().runOn(...)
.flatMap(request -> someRemoteCall
        .onErrorContinue((t, o) -> log.error("Skipped error: {}", t.getMessage()))
)

provided that someRemoteCall is a Mono or Flux not itself run on .parallel().runOn(...) rails.

But when you don't have a someRemoteCall you can do the trick below (see NOT_MONO_AND_NOT_FLUX) to ignore the unsafe processing run on .parallel().runOn(...) rails:

Optional<List<String>> foundImageNames = 
    Flux.fromStream(this.fileStoreService.walk(path))
        .parallel(cpus, cpus)
        .runOn(Schedulers.newBoundedElastic(cpus, Integer.MAX_VALUE, "import"), 1)

        .flatMap(NOT_MONO_AND_NOT_FLUX -> Mono
            .just(NOT_MONO_AND_NOT_FLUX)
            .map(path -> sneak(() -> unsafeLocalSimpleProcessingReturningString(path)))
            .onErrorContinue(FileNotFoundException.class,
                (t, o) -> log.error("File missing:\n{}", t.getMessage()))
        )

        .collectSortedList(Comparator.naturalOrder())
        .blockOptional();
0
votes

I'm still in the process of learning WebFlux and Reactor, but try one of the onErrorContinue directly after flatMap (REST call) to drop (and potentially log) errors.

0
votes

There are delay error operators in Reactor. You could write your code as follows:

Flux.fromIterable(actions)
    .flatMapDelayError(request -> someRemoteCall(request).subscribeOn(Schedulers.elastic()), 256, 32)
    .doOnNext(System.out::println)
    .subscribe();

Note that this will still fail your flux in case of any inside publisher emits error, however, it will wait for all inner publishers to finish before doing that.

These operators also require to specify the concurrency and prefetch parameters. In the example I've set them to their default values which is used in regular flatMap calls.