0
votes

Can someone please explain how the takeUntilOther() method works? I tried to run the following code, but it shows nothing on my console.

     Mono.just(10)
                .subscribe();

        Flux.range(1, 5)
                .takeUntilOther(Mono.just(10))
                .subscribe(System.out::println);

I don't understand why.

2

2 Answers

1
votes

Kirill,

I'd suggest you referring to the appropriate part of the project reactor's documentation.

enter image description here

takeUntilOther(Publisher<?> other) Relay values from this Flux until the given Publisher emits.

Meaning, you will be receiving values from the original Flux until given Publisher<?> other starts producing events. In your case, you have a hot publisher just() that interrupts the original Flux immediately (by calling cancel() method).

I will give you one more example. Have a look at the following code snippet:

Flux.range(1, 5) // produces elements from 1 to 5
        .delayElements(Duration.ofSeconds(1)) // delays emission of each element from above for 1 second
        .takeUntilOther(Mono
                .just(10) // hot publisher. emits one element

                // delays '10' for 3 seconds. meaning that it will only 
                // appears in the original Flux in 3 seconds
                .delayElement(Duration.ofSeconds(3)) 
        )
        .subscribe(System.out::print);

The output is:

12

0
votes

Add Thread.sleep to wait the main thread (or what ever thread current code is running) so that subscriber thread will continue process. Here is the unit test version.

    @Test
    public void flux_Skip_Take_Based_On_Other_Streams() throws InterruptedException {
        
        Flux.range(1, 100) // publisher with elements from 1-100
                .delayElements(Duration.ofSeconds(1)) // Flux delay 1 sec before each element publish
                .skipUntilOther(Mono.just(10).delayElement(Duration.ofSeconds(10))) // skip the elements until inner Mono completes, i.e.  for 10 seconds
                .takeUntilOther(Mono.just(10).delayElement(Duration.ofSeconds(70))) // take the elements until inner mono completes, i.e. till 70 seconds
                .log()
                .subscribe();
    
        Thread.sleep(1000*100); //Sleep the main thread for 100 sec or more to verify the logs
}

output:

2022-05-27 17:45:33.317  INFO 4180 --- [           main] reactor.Flux.TakeUntilOther.1            : onSubscribe(SerializedSubscriber)
2022-05-27 17:45:33.317  INFO 4180 --- [           main] reactor.Flux.TakeUntilOther.1            : request(unbounded)
2022-05-27 17:45:43.469  INFO 4180 --- [    parallel-12] reactor.Flux.TakeUntilOther.1            : onNext(10)
2022-05-27 17:45:44.485  INFO 4180 --- [     parallel-1] reactor.Flux.TakeUntilOther.1            : onNext(11)
.
.
.
2022-05-27 17:46:42.098  INFO 4180 --- [    parallel-10] reactor.Flux.TakeUntilOther.1            : onNext(68)
2022-05-27 17:46:43.103  INFO 4180 --- [    parallel-11] reactor.Flux.TakeUntilOther.1            : onNext(69)
2022-05-27 17:46:43.306  INFO 4180 --- [     parallel-1] reactor.Flux.TakeUntilOther.1            : onComplete()