I have a function that return the input as a Mono:
public static Mono<Integer> emitter(int param){
return Mono.just(param)
.delayElement(Duration.ofMillis(100)); //delay to simulate http response
}
I'd like to call the emitter once with an initial value of 3 but then have it repeat until certain size is reached. This repeat logic should be in the main method so I can't modify the emitter()
.
public static void main(String[] args){
int maxSize = 5;
int initial = 3;
Mono<Integer> response = emitter(initial);
response
.doOnNext(s -> {
System.out.println("need more!");
})
.subscribe();
}
One naive solution would be:
public static void main(String[] args){
int maxSize = 5;
int initial = 3;
for(int i = 0; i < 999; i++) {
Mono<Integer> response = emitter(initial+i);
Mono<Boolean> isDone = response
.flatMap(elem -> {
if(elem < maxSize) {
System.out.println("need more!");
return Mono.just(false);
} else {
System.out.println("ok done!");
return Mono.just(true);
}
});
if(isDone.block())
break;
}
}
Basically, I'm trying to create another Mono with dynamic parameters based on the result of the previous Mono. I know that Mono/Flux are immutable...
Is there a neat and reactive way of doing this?
I've tried things like Flux.range(0, Integer.MAX_VALUE).zipWith(myMono)
to try and feed parameters into the emitter but couldn't make it work.
PS. I know my example doesn't make much sense. I've tried to simplify my real world scenario that involves lists and spring WebFlux(emitter).
Thanks!
---EDIT
Ok, here's what I came up with:
public static void main(String[] args) throws InterruptedException {
int maxSize = 5;
int initial = 3;
Flux.range(initial, 10)
.delayElements(Duration.ofSeconds(1))
.flatMap(param -> emitter(param))
.flatMap(it -> {
if(it < maxSize) {
System.out.println("need more!: " + it);
return Mono.just(false);
} else {
System.out.println("done!: " + it);
return Mono.just(true);
}
})
.takeUntil(Boolean::booleanValue)
.subscribe();
Thread.sleep(6000);
}
need more!: 3
need more!: 4
done!: 5
One problem is that if I don't delay Flux.range
, the execution is not done in order and it's possible for more or less print statements to be output than the 3 lines expected.
flatMap
+takeUntil
with a singletakeUntil
, right? – Simon Baslé