
I'm trying to process a list of numbers, for example 1 to 10, one by one using Reactor Flux. There is an API /double which simply doubles the incoming Integer (1 -> 2, 4 -> 8, ...); however, this API has a performance issue: it always takes 2 seconds to respond with the result. When using limitRate(1), what I expected is that Reactor processes the requests one after another, like this:

2020-01-01 00:00:02 - 2
2020-01-01 00:00:04 - 4
2020-01-01 00:00:06 - 6
2020-01-01 00:00:08 - 8
2020-01-01 00:00:10 - 10
...

But actually Reactor fires all requests at once:

2020-01-01 00:00:02 - 6
2020-01-01 00:00:02 - 10
2020-01-01 00:00:02 - 2
2020-01-01 00:00:02 - 4
2020-01-01 00:00:02 - 8
...

Here is the code:

Flux.range(1, 10)
        .limitRate(1)
        .flatMap(i -> webClient.get()
                .uri("http://localhost:10001/double?integer={int}", i)
                .exchange()
                .flatMap(resp -> resp.bodyToMono(Integer.class)))
        .subscribe(System.out::println);
Thread.sleep(10000);

It seems limitRate is not working as I expected. What went wrong? Is there any way to process the requests one after another using Reactor? Thanks in advance.


3 Answers

4 votes

.flatMap doesn't work here as it subscribes to the inner streams eagerly - that is, it won't wait for an inner stream to emit an onComplete before subscribing to the next stream. This is why all of your calls are made concurrently. It works in the receive->dispatch->receive->dispatch mode.

Reactor provides an overloaded version of flatMap where you can specify the concurrency factor as .flatMap(innerstream, concurrency). This factor caps the number of inner streams flatMap will subscribe to. If it is, say, 5, flatMap can subscribe to at most 5 inner streams. As soon as this limit is hit, it has to wait for an inner stream to emit onComplete before it subscribes to the next one.

In your case, you can either set it to 1 or use .concatMap(). concatMap() is exactly flatMap with concurrency = 1. It basically works in the receive->dispatch->wait->receive->dispatch->wait mode.
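For example, assuming the same webClient as in your snippet (a rough sketch, not tested against your service), either of these makes the calls one at a time:

// Option 1: the flatMap overload with a concurrency factor of 1
Flux.range(1, 10)
        .flatMap(i -> webClient.get()
                .uri("http://localhost:10001/double?integer={int}", i)
                .exchange()
                .flatMap(resp -> resp.bodyToMono(Integer.class)), 1)
        .subscribe(System.out::println);

// Option 2: concatMap -- same effect, and the results keep the source order
Flux.range(1, 10)
        .concatMap(i -> webClient.get()
                .uri("http://localhost:10001/double?integer={int}", i)
                .exchange()
                .flatMap(resp -> resp.bodyToMono(Integer.class)))
        .subscribe(System.out::println);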

I wrote a post some time back explaining exactly how flatMap works, because I think a lot of people use it without understanding its internals. You can refer to the article here.

3 votes

Consider using concatMap instead:

/**
 * Transform the elements emitted by this {@link Flux} asynchronously into Publishers,
 * then flatten these inner publishers into a single {@link Flux}, sequentially and
 * preserving order using concatenation.
 * <p>
 * There are three dimensions to this operator that can be compared with
 * {@link #flatMap(Function) flatMap} and {@link #flatMapSequential(Function) flatMapSequential}:
 * <ul>
 *     <li><b>Generation of inners and subscription</b>: this operator waits for one
 *     inner to complete before generating the next one and subscribing to it.</li>
 *     <li><b>Ordering of the flattened values</b>: this operator naturally preserves
 *     the same order as the source elements, concatenating the inners from each source
 *     element sequentially.</li>
 *     <li><b>Interleaving</b>: this operator does not let values from different inners
 *     interleave (concatenation).</li>
 * </ul>
 *
 * <p>
 * Errors will immediately short circuit current concat backlog.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/concatMap.svg" alt="">
 *
 * @reactor.discard This operator discards elements it internally queued for backpressure upon cancellation.
 *
 * @param mapper the function to transform this sequence of T into concatenated sequences of V
 * @param <V> the produced concatenated type
 *
 * @return a concatenated {@link Flux}
 */
public final <V> Flux<V> concatMap(Function<? super T, ? extends Publisher<? extends V>>
        mapper) {

Pay attention to the phrase "sequentially and preserving order using concatenation." That seems to be exactly what you are looking for.
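As a quick sketch of the behaviour (simulating the 2-second /double call with a delayed Mono instead of a real HTTP request), concatMap emits one doubled value roughly every two seconds, in the source order:

import java.time.Duration;
import java.time.LocalTime;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ConcatMapDemo {
    public static void main(String[] args) {
        Flux.range(1, 10)
                .concatMap(i -> Mono.just(i * 2)                  // simulate the /double API
                        .delayElement(Duration.ofSeconds(2)))     // each "call" takes ~2 seconds
                .doOnNext(result -> System.out.println(LocalTime.now() + " - " + result))
                .blockLast();                                     // wait for the whole sequence to finish
    }
}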

0 votes

Inspired by Artem Bilan's answer, I found that flatMapSequential is a better fit for my case, since flatMapSequential accepts a second parameter, maxConcurrency, which makes it possible to process messages not only one by one but also two at a time, and so on. Thanks to Artem Bilan and Prashant Pandey for your answers, they really helped.
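A sketch of what that looks like (the concurrency value of 2 is just for illustration, and the webClient setup is the same as in the question):

Flux.range(1, 10)
        .flatMapSequential(i -> webClient.get()
                .uri("http://localhost:10001/double?integer={int}", i)
                .exchange()
                .flatMap(resp -> resp.bodyToMono(Integer.class)),
                2)  // at most 2 requests in flight; results are still emitted in source order
        .subscribe(System.out::println);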