I have a Flux that emits some Date. This Date is mapped to 1024 simulated HTTP requests that I'm running on some Executor.
What I'd like to do is wait for all 1024 HTTP requests to complete before emitting the next Date.
Currently, when running, onNext() is called many times, and then it stabilises at some steady rate.
How can I change this behaviour?
P.S. I'm willing to change the architecture, if needed.
private void run() throws Exception {
    Executor executor = Executors.newFixedThreadPool(2);
    Flux<Date> source = Flux.generate(emitter ->
        emitter.next(new Date())
    );
    source
        .log()
        .limitRate(1)
        .doOnNext(date -> System.out.println("on next: " + date))
        .map(date -> Flux.range(0, 1024))
        .flatMap(i -> Mono.fromCallable(Pipeline::simulateHttp)
            .subscribeOn(Schedulers.fromExecutor(executor)))
        .subscribe(s -> System.out.println(s));
    Thread.currentThread().join();
}
HTTP request simulation:
private static String simulateHttp() {
    try {
        System.out.println("start http call");
        Thread.sleep(3_000);
    } catch (Exception e) {}
    return "HTML content";
}
EDIT: adapted code from answer:
- First, I had a bug in my code (another flatMap was needed).
- Second, I added a concurrency parameter of 1 to both flatMap calls (it seems that both are needed).

Executor executor = Executors.newSingleThreadExecutor();
Flux<Date> source = Flux.generate(emitter -> {
    System.out.println("emitter called!");
    emitter.next(new Date());
});
source
    .limitRate(1)
    .map(date -> Flux.range(0, 16))
    .flatMap(Function.identity(), 1) // concurrency = 1
    .flatMap(i -> Mono.fromCallable(Pipeline::simulateHttp)
        .subscribeOn(Schedulers.fromExecutor(executor)), 1) // concurrency = 1
    .subscribe(s -> System.out.println(s));
Thread.currentThread().join();
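
For comparison, here is a minimal sketch of the behaviour I'm after (finish a whole batch, then produce the next Date), written with plain java.util.concurrent instead of Reactor. The class name BatchBarrierSketch, the method runBatches and the shortened 10 ms sleep are made up for illustration. It only shows the intended ordering and is not a drop-in replacement for the pipeline above.

```java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch (no Reactor): a batch must finish before the next Date is created.
class BatchBarrierSketch {

    // Stand-in for simulateHttp() from the question, with a much shorter sleep.
    static String simulateHttp() throws InterruptedException {
        Thread.sleep(10);
        return "HTML content";
    }

    // Runs `batches` rounds; each round creates one Date and waits for all of its requests.
    static List<String> runBatches(int batches, int requestsPerBatch, int threads)
            throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        List<String> events = new ArrayList<>();
        try {
            for (int b = 0; b < batches; b++) {
                Date date = new Date(); // the next Date is only created here
                System.out.println("on next: " + date);
                events.add("date " + b);

                List<Callable<String>> calls = new ArrayList<>();
                for (int i = 0; i < requestsPerBatch; i++) {
                    calls.add(BatchBarrierSketch::simulateHttp);
                }

                // invokeAll blocks until every task of this batch has completed.
                List<Future<String>> results = executor.invokeAll(calls);
                for (Future<String> result : results) {
                    result.get(); // rethrows if a request failed
                }
                events.add("batch " + b + " done: " + results.size());
            }
        } finally {
            executor.shutdown();
        }
        return events;
    }

    public static void main(String[] args) throws Exception {
        runBatches(3, 16, 2).forEach(System.out::println);
    }
}
```

Within a batch the requests still run in parallel on the pool, and the loop acts as the barrier between batches. In Reactor terms, the closest analogue is probably concatMap on the outer flux with the parallel flatMap kept inside it, but that is an assumption I have not verified here.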