0
votes

I have a function that return the input as a Mono:

public static Mono<Integer> emitter(int param){
    return Mono.just(param)
            .delayElement(Duration.ofMillis(100)); //delay to simulate http response
}

I'd like to call the emitter once with an initial value of 3 but then have it repeat until certain size is reached. This repeat logic should be in the main method so I can't modify the emitter().

public static void main(String[] args){
    int maxSize = 5;
    int initial = 3;
    Mono<Integer> response = emitter(initial);

    response
        .doOnNext(s -> {
            System.out.println("need more!");
        })
        .subscribe();
}

One naive solution would be:

public static void main(String[] args){
    int maxSize = 5;
    int initial = 3;

    for(int i = 0; i < 999; i++) {
        Mono<Integer> response = emitter(initial+i);
        Mono<Boolean> isDone = response
                .flatMap(elem -> {
                    if(elem < maxSize) {
                        System.out.println("need more!");
                        return Mono.just(false);
                    } else {
                        System.out.println("ok done!");
                        return Mono.just(true);
                    }
                });
        if(isDone.block())
            break;
    }
}

Basically, I'm trying to create another Mono with dynamic parameters based on the result of the previous Mono. I know that Mono/Flux are immutable... Is there a neat and reactive way of doing this? I've tried things like Flux.range(0, Integer.MAX_VALUE).zipWith(myMono) to try and feed parameters into the emitter but couldn't make it work.

PS. I know my example doesn't make much sense. I've tried to simplify my real world scenario that involves lists and spring WebFlux(emitter).

Thanks!

---EDIT

Ok, here's what I came up with:

public static void main(String[] args) throws InterruptedException {
    int maxSize = 5;
    int initial = 3;

    Flux.range(initial, 10)
            .delayElements(Duration.ofSeconds(1))  
            .flatMap(param -> emitter(param))
            .flatMap(it -> {
                if(it < maxSize) {
                    System.out.println("need more!: " + it);
                    return Mono.just(false);
                } else {
                    System.out.println("done!: " + it);
                    return Mono.just(true);
                }
             })
            .takeUntil(Boolean::booleanValue)
            .subscribe();

    Thread.sleep(6000);
}
need more!: 3
need more!: 4
done!: 5

One problem is that if I don't delay Flux.range, the execution is not done in order and it's possible for more or less print statements to be output than the 3 lines expected.

1
you should be able to replace that second flatMap + takeUntil with a single takeUntil, right?Simon Baslé
Yep, works too!GbGbGbGbEB

1 Answers

4
votes

You can use the expand function of Publisher, which acts like recursion e.g.

emitter(initial)
    .expand(i -> i < maxSize ? emitter(i + 1) : Mono.empty())
    .doOnNext(i -> System.out.println("i = " + i))
    .subscribe();