
Can someone please explain how the takeUntilOther() method works? I tried to run the following code, but it shows nothing on my console.


        Flux.range(1, 5)

I don't understand why.


2 Answers



I'd suggest you referring to the appropriate part of the project reactor's documentation.

enter image description here

takeUntilOther(Publisher<?> other) Relay values from this Flux until the given Publisher emits.

Meaning, you will be receiving values from the original Flux until given Publisher<?> other starts producing events. In your case, you have a hot publisher just() that interrupts the original Flux immediately (by calling cancel() method).

I will give you one more example. Have a look at the following code snippet:

Flux.range(1, 5) // produces elements from 1 to 5
        .delayElements(Duration.ofSeconds(1)) // delays emission of each element from above for 1 second
                .just(10) // hot publisher. emits one element

                // delays '10' for 3 seconds. meaning that it will only 
                // appears in the original Flux in 3 seconds

The output is:



Add Thread.sleep to wait the main thread (or what ever thread current code is running) so that subscriber thread will continue process. Here is the unit test version.

    public void flux_Skip_Take_Based_On_Other_Streams() throws InterruptedException {
        Flux.range(1, 100) // publisher with elements from 1-100
                .delayElements(Duration.ofSeconds(1)) // Flux delay 1 sec before each element publish
                .skipUntilOther(Mono.just(10).delayElement(Duration.ofSeconds(10))) // skip the elements until inner Mono completes, i.e.  for 10 seconds
                .takeUntilOther(Mono.just(10).delayElement(Duration.ofSeconds(70))) // take the elements until inner mono completes, i.e. till 70 seconds
        Thread.sleep(1000*100); //Sleep the main thread for 100 sec or more to verify the logs


2022-05-27 17:45:33.317  INFO 4180 --- [           main] reactor.Flux.TakeUntilOther.1            : onSubscribe(SerializedSubscriber)
2022-05-27 17:45:33.317  INFO 4180 --- [           main] reactor.Flux.TakeUntilOther.1            : request(unbounded)
2022-05-27 17:45:43.469  INFO 4180 --- [    parallel-12] reactor.Flux.TakeUntilOther.1            : onNext(10)
2022-05-27 17:45:44.485  INFO 4180 --- [     parallel-1] reactor.Flux.TakeUntilOther.1            : onNext(11)
2022-05-27 17:46:42.098  INFO 4180 --- [    parallel-10] reactor.Flux.TakeUntilOther.1            : onNext(68)
2022-05-27 17:46:43.103  INFO 4180 --- [    parallel-11] reactor.Flux.TakeUntilOther.1            : onNext(69)
2022-05-27 17:46:43.306  INFO 4180 --- [     parallel-1] reactor.Flux.TakeUntilOther.1            : onComplete()