I want to combine multiple Flux of the same type. At subscription time they should be executed in parallel, and the subscriber should be able to limit the number of requests.
I experimented with Flux.merge(..) and Flux.concat(..). The latter seems to enforce sequential request production, even when publishing on a parallel scheduler, whereas the former requests all elements eagerly and respects take(n) only for the onNext events, not for the onRequest events.
I prepared a minimal example: three Flux, each of which performs heavy I/O work on Schedulers.boundedElastic() and logs its onRequest event.
Flux<String> f1 = Flux.just("1").publishOn(Schedulers.boundedElastic()).flatMap(i -> {
    try {
        // Long-running I/O task
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return Flux.just(i);
}).doOnRequest(s -> LOGGER.info("onRequest {}", 1));

Flux<String> f2 = Flux.just("2").publishOn(Schedulers.boundedElastic()).flatMap(i -> {
    try {
        // Long-running I/O task
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return Flux.just(i);
}).doOnRequest(s -> LOGGER.info("onRequest {}", 2));

Flux<String> f3 = Flux.just("3").publishOn(Schedulers.boundedElastic()).flatMap(i -> {
    try {
        // Long-running I/O task
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return Flux.just(i);
}).doOnRequest(s -> LOGGER.info("onRequest {}", 3));
Approach with Flux.concat and take(2)
Flux.concat(f1, f2, f3).take(2)
.doOnNext(i -> LOGGER.info("onNext {}", i))
.subscribeOn(Schedulers.boundedElastic()).subscribe();
Result:
00:16:11.980 [oundedElastic-1] d.f.B.Application : onRequest 1
00:16:13.981 [oundedElastic-2] d.f.B.Application : onNext 1
00:16:13.981 [oundedElastic-2] d.f.B.Application : onRequest 2
00:16:15.982 [oundedElastic-3] d.f.B.Application : onNext 2
Two onRequest and two onNext events are propagated. The second request is issued only after the first element has been emitted, two seconds later, although the requests were expected to run in parallel.
Approach with Flux.merge and take(2)
Flux.merge(f1, f2, f3).take(2)
.doOnNext(i -> LOGGER.info("onNext {}", i))
.subscribeOn(Schedulers.boundedElastic()).subscribe();
Result:
00:21:58.301 [oundedElastic-1] d.f.B.Application : onRequest 1
00:21:58.302 [oundedElastic-1] d.f.B.Application : onRequest 2
00:21:58.302 [oundedElastic-1] d.f.B.Application : onRequest 3
00:22:00.303 [oundedElastic-4] d.f.B.Application : onNext 3
00:22:00.304 [oundedElastic-3] d.f.B.Application : onNext 2
Three onRequest events are emitted even though only two were expected. The take(n) method limited only the emitted onNext events. The execution ran in parallel, as expected.
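The observation above, that limiting the consumed results does not limit the work that gets started, can be reproduced without Reactor. The following is only a JDK-only analogy, not Reactor code: the class name EagerStartDemo, its method, and the 200 ms sleep are hypothetical stand-ins for the three Flux and their I/O calls.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical analogy: every task is started eagerly, only `limit` results are consumed.
class EagerStartDemo {

    /** Returns how many tasks were started although only `limit` results were read. */
    static int startedWhenLimitingResults(int taskCount, int limit) {
        AtomicInteger started = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(taskCount);
        ExecutorCompletionService<String> completion = new ExecutorCompletionService<>(pool);
        try {
            for (int i = 1; i <= taskCount; i++) {
                String id = String.valueOf(i);
                completion.submit(() -> {
                    started.incrementAndGet(); // plays the role of the onRequest log line
                    Thread.sleep(200);         // stand-in for the slow I/O call
                    return id;
                });
            }
            // Consume only the first `limit` results (the analogue of take(n) on onNext).
            for (int i = 0; i < Math.min(limit, taskCount); i++) {
                completion.take().get();
            }
            // Let the remaining tasks finish so the counter is final before reading it.
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return started.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("started=" + startedWhenLimitingResults(3, 2));
    }
}
```

With three tasks and a limit of two, the counter still reaches three, because the limit is applied after the work has already been triggered.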
Expected Result for unknown approach
00:21:58.302 [oundedElastic-1] d.f.B.Application : onRequest 2
00:21:58.302 [oundedElastic-1] d.f.B.Application : onRequest 3
00:22:00.303 [oundedElastic-4] d.f.B.Application : onNext 3
00:22:00.304 [oundedElastic-3] d.f.B.Application : onNext 2
Only two onRequest events are emitted, and both complete.
Question
Is it possible to combine these three Flux so that they are computed simultaneously but limited (with take(n)?) in their request behaviour? In my real-world scenario, a request results in a WebClient calling a web service, which starts a time-consuming I/O task. So even though my application receives only two onNext events, the third emitted onRequest triggers a request whose result is never used.
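To make the desired behaviour concrete, here is a minimal JDK-only sketch in which the limit is applied to the list of sources before anything is started, so the third task never runs. It is an analogy under stated assumptions, not Reactor code: LimitedStartDemo, runOnlyFirst, and the 200 ms sleep are hypothetical. In Reactor terms this may roughly correspond to applying take(n) to a Flux of the sources before flattening them, but that mapping is an assumption and is not verified here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical analogy: the limit decides how many tasks are started at all.
class LimitedStartDemo {

    /** Counts started tasks; plays the role of the onRequest log lines. */
    final AtomicInteger started = new AtomicInteger();

    /** Starts only the first `limit` of `taskCount` tasks in parallel and collects their results. */
    List<String> runOnlyFirst(int taskCount, int limit) {
        int toStart = Math.min(taskCount, limit);
        // A fixed pool needs at least one thread, even if nothing is submitted.
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, toStart));
        try {
            List<Future<String>> running = new ArrayList<>();
            for (int i = 1; i <= toStart; i++) {
                String id = String.valueOf(i);
                running.add(pool.submit(() -> {
                    started.incrementAndGet();
                    Thread.sleep(200); // stand-in for the slow I/O call
                    return id;
                }));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : running) {
                results.add(future.get()); // results are collected in submission order
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        LimitedStartDemo demo = new LimitedStartDemo();
        System.out.println(demo.runOnlyFirst(3, 2) + " started=" + demo.started.get());
    }
}
```

The trade-off in this sketch is that the choice of which sources run is fixed up front (the first n), rather than whichever n happen to finish first.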
Flux<String> for each API with their own limit and subscribe them one after another. But this is more a matter of interest to me: do I have a valid problem to solve, or do I misunderstand some of the concepts in Project Reactor? – froehli