0
votes

I want to combine multiple Flux of the same type together. At the time of subscription they should be executed in parallel. The subscribing method should be able to limit the number of requests.

I experimented with Flux.merge(..) and Flux.concat(..). The later seems to enforce a sequential request production even when publishing on a parallel scheduler when the first requests all elements in a eager way and only respects the take(n) Method for the onNext Events but not for the onRequest Events.

I prepared a minimal example. Three Flux which shall all execute heavy i/o work on a Schedulers.boundedElastic(). Each logging their onRequest Event.

Flux<String> f1 = Flux.just("1").publishOn(Schedulers.boundedElastic()).flatMap(i -> {
    try {
        //Long running, i/o task
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return Flux.just(i);
}).doOnRequest(s -> LOGGER.info("onRequest {}", 1));
Flux<String> f2 = Flux.just("2").publishOn(Schedulers.boundedElastic()).flatMap(i -> {
    try {
        //Long running, i/o task
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return Flux.just(i);
}).doOnRequest(s -> LOGGER.info("onRequest {}", 2));
Flux<String> f3 = Flux.just("3").publishOn(Schedulers.boundedElastic()).flatMap(i -> {
    try {
        //Long running, i/o task
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return Flux.just(i);
}).doOnRequest(s -> LOGGER.info("onRequest {}", 3));

Approach with with Flux.concat and take(2)

Flux.concat(f1, f2, f3).take(2)
        .doOnNext(i -> LOGGER.info("onNext {}", i))
        .subscribeOn(Schedulers.boundedElastic()).subscribe();

Result:

00:16:11.980   [oundedElastic-1] d.f.B.Application                        : onRequest 1
00:16:13.981   [oundedElastic-2] d.f.B.Application                        : onNext 1
00:16:13.981   [oundedElastic-2] d.f.B.Application                        : onRequest 2
00:16:15.982   [oundedElastic-3] d.f.B.Application                        : onNext 2

Two onRequest and two onNext Events are propagated. The requests are delayed by two seconds but where expected parallel.

Approach with Flux.map and take(2)

Flux.merge(f1, f2, f3).take(2)
        .doOnNext(i -> LOGGER.info("onNext {}", i))
        .subscribeOn(Schedulers.boundedElastic()).subscribe();

Result

00:21:58.301   [oundedElastic-1] d.f.B.Application                        : onRequest 1
00:21:58.302   [oundedElastic-1] d.f.B.Application                        : onRequest 2
00:21:58.302   [oundedElastic-1] d.f.B.Application                        : onRequest 3
00:22:00.303   [oundedElastic-4] d.f.B.Application                        : onNext 3
00:22:00.304   [oundedElastic-3] d.f.B.Application                        : onNext 2

Three onRequest events are emitted even though only two where expected. The take(n) method just limited the emitted onNext events. The execution was done parallel as expected.

Expected Result for unknown approach

00:21:58.302   [oundedElastic-1] d.f.B.Application                        : onRequest 2
00:21:58.302   [oundedElastic-1] d.f.B.Application                        : onRequest 3
00:22:00.303   [oundedElastic-4] d.f.B.Application                        : onNext 3
00:22:00.304   [oundedElastic-3] d.f.B.Application                        : onNext 2

Only two onRequest events are emitted and completed

Question

Is it possible to combine these three Flux in a way where they can be computed simultaneous but limited (take(n)?)in their request behaviour? In my real-world-scenario a request results in a WebClient calling a WebService, starting a time consuming i/o task. So even though my application is just recieving two onNext events, the third emitted onRequest results in a request that goes into nirvana.

1
What is the expected output?123
Also can you post the actual problem you are trying to solve. This looks like it could possibly be an X -> Y issue.123
I am experimenting with the reactive programming approach, so I want to learn it. I updated my question with an expected output.froehli
I can't think of a situation where this would be useful though, is there a real problem, or you just want to know if you can do this? Also in your expected result would you not expect 1,2 1,2 not 2,3 3,2123
I don't need to solve this problem to get a solution for my real problem since I could also create a Flux<String> for each API with their own limit and subscribe them one after another. But this is more a matter of my interest if I have a valid problem to solve or if I misunderstand some of the concepts in project reactor.froehli

1 Answers

0
votes

I am new to project-reactor as well, but I came up with this workaround that some might call a hack:

Flux.fromIterable(Lists.partition(Arrays.asList(f1, f2, f3), 2))
        .take(1)
        .flatMap(partition -> Flux.merge(partition)
                .doOnNext(i -> LOGGER.info("onNext {}", i)))
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe();

It printed out this for me:

2020-04-17 01:07:37.658  INFO 59488 --- [oundedElastic-1] com.example.demo.SomeService1            : onRequest 1
2020-04-17 01:07:37.659  INFO 59488 --- [oundedElastic-1] com.example.demo.SomeService1            : onRequest 2
2020-04-17 01:07:39.664  INFO 59488 --- [oundedElastic-3] com.example.demo.SomeService1            : onNext 2
2020-04-17 01:07:39.667  INFO 59488 --- [oundedElastic-2] com.example.demo.SomeService1            : onNext 1