Given the following piece of code:

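import java.time.Duration;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DemoApp {

    public static void main(String[] args) {

        long start = System.currentTimeMillis();

        // Infinite source: each element is the time in ms since start.
        Flux.<Long>generate(s -> s.next(System.currentTimeMillis() - start))
                .flatMap(DemoApp::delayedAction)
                .doOnNext(l -> System.out.println(l + " -- " + (System.currentTimeMillis() - start)))
                .blockLast(Duration.ofSeconds(3));
    }

    // Stand-in for the real work: emits the element again after one second.
    private static Publisher<? extends Long> delayedAction(Long l) {
        return Mono.just(l).delayElement(Duration.ofSeconds(1));
    }
}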

One can see from the output that a large number of events are "processed" in delayedAction concurrently. In this example, 256 events are generated within a few milliseconds and then wait about a second before they are emitted again.

I want to limit this number to, e.g., 10. How can I do this?

The solution should be independent of what happens inside delayedAction.

Background

What really happens in delayedAction is that I make HTTP requests, and starting an unlimited (or very large) number of requests doesn't sound like a good idea.

Can you post your solution? - Ashok Koyi
You mentioned this at the end of your question: "I already found the solution. But thinking this might help others. Will post an answer." It sounds like you have a different answer than what's accepted. Can you please correct it? - Ashok Koyi
The accepted answer is my answer. I put the part you quoted into the question so that others wouldn't start typing an answer and then get frustrated when, upon saving it, they noticed I had already written my own. I removed the quoted part since it now seems to cause confusion and it has fulfilled its purpose. - Jens Schauder

1 Answer

There is already a method for this: Flux.flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int concurrency)

From its documentation:

The concurrency argument allows to control how many Publisher can be subscribed to and merged in parallel.
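
Applied to the code from the question, this could look roughly like the sketch below. The class name LimitedFlatMapDemo, the helper runLimited, the configurable delay and the take(count) call are assumptions added here so that the example terminates; they are not part of the original code. As far as I know, the 256 seen in the question matches the default concurrency that flatMap uses when no value is passed.

```java
import java.time.Duration;
import java.util.List;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class LimitedFlatMapDemo {

    // Same stand-in for the HTTP call as in the question, with a configurable delay
    // (the delay parameter is an addition for this sketch).
    static Publisher<Long> delayedAction(Long l, Duration delay) {
        return Mono.just(l).delayElement(delay);
    }

    // Runs `count` elements through flatMap with at most `concurrency` inner
    // publishers subscribed at the same time, and returns the emitted values.
    static List<Long> runLimited(int concurrency, int count, Duration delay) {
        long start = System.currentTimeMillis();
        return Flux.<Long>generate(s -> s.next(System.currentTimeMillis() - start))
                // The second argument is the concurrency limit from the answer above.
                .flatMap(l -> delayedAction(l, delay), concurrency)
                // Stop after `count` results so the infinite source completes.
                .take(count)
                .doOnNext(l -> System.out.println(l + " -- " + (System.currentTimeMillis() - start)))
                .collectList()
                .block(Duration.ofSeconds(30));
    }

    public static void main(String[] args) {
        // At most 10 delayed actions in flight, as asked for in the question.
        runLimited(10, 30, Duration.ofSeconds(1));
    }
}
```

With a limit of 10 and a one-second delay, the output should arrive in groups of roughly ten lines per second instead of 256 at once. Because the limit is applied by flatMap itself, it does not depend on what delayedAction does internally.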