I ran this:

Mono<Void> mono = Mono.empty();

System.out.println("mono.block: " + mono.block());

and it produces:

mono.block: null

as expected. In other words, block() subscribes to the Mono and returns immediately (with null) when the Mono completes without emitting a value.

Another example, resembling the real-world scenario. I have a source flux, e.g.:

Flux<Integer> ints = Flux.range(0, 2);

I make a connectable flux that I will use to allow multiple subscribers:

ConnectableFlux<Integer> publish = ints.publish();

For this example, let's say there's a single real-work subscriber:

publish
   .doOnComplete(() -> System.out.println("publish completed"))
   .subscribe();

and another subscriber that just produces the element count:

Mono<Long> countMono = publish
   .doOnComplete(() -> System.out.println("countMono completed"))
   .count();

countMono.subscribe();

I connect the connectable flux and print the element count:

publish.connect();

System.out.println("block");

long count = countMono.block();

System.out.println("count: " + count);

This prints:

publish completed
countMono completed
block

In other words, both subscribers subscribe successfully and complete, but then countMono.block() blocks indefinitely.

Why is that and how do I make this work? My end goal is to get the count of the elements.

1 Answer

You can get this to work by using autoConnect or refCount instead of manually calling connect().

For example:

        Flux<Integer> ints = Flux.range(0, 2);
        Flux<Integer> publish = ints.publish()
                .autoConnect(2);  // new: connects to the upstream once 2 subscribers have subscribed
        publish
                .doOnComplete(() -> System.out.println("publish completed"))
                .subscribe();
        Mono<Long> countMono = publish
                .doOnComplete(() -> System.out.println("countMono completed"))
                .count();
        // countMono.subscribe();  // removed: block() below acts as the second subscriber
        long count = countMono.block();
        System.out.println("count: " + count);
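
The refCount variant mentioned above looks almost identical. This is a minimal sketch, assuming reactor-core is on the classpath; the class and method names (RefCountExample, countWithRefCount) are made up for illustration and are not part of the original code.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RefCountExample {

    // Hypothetical helper: returns the element count of the shared sequence.
    static long countWithRefCount() {
        Flux<Integer> shared = Flux.range(0, 2)
                .publish()
                .refCount(2); // connect to the upstream once 2 subscribers are present

        // First subscriber: the "real work" consumer. Nothing flows yet.
        shared
                .doOnComplete(() -> System.out.println("publish completed"))
                .subscribe();

        // Second subscriber: block() subscribes, which triggers the connection.
        Mono<Long> countMono = shared.count();
        return countMono.block();
    }

    public static void main(String[] args) {
        System.out.println("count: " + countWithRefCount());
    }
}
```

As with autoConnect(2), the important part is that block() is one of the subscribers that are present when the connection is made.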

Why does your example not work?

Here is what I think is happening in your example... but this is based on my limited knowledge, and I'm not 100% sure it is correct.

  1. .publish() turns the upstream source into a hot stream
  2. You then subscribe twice (but these don't start the flow yet, since the connectable flux is not connected to the upstream yet)
  3. .connect() subscribes to the upstream, and starts the flow
  4. The upstream and the two subscriptions that were registered before connect() all complete right away (since everything runs synchronously on the main thread)
  5. At this point the ConnectableFlux is no longer connected to the upstream, because the upstream has completed (The reactor docs are light on details on what happens to a ConnectableFlux when new subscriptions arrive after the upstream source completes, so this is what I'm not 100% certain about.)
  6. block() creates a new subscription.
  7. But since the ConnectableFlux is no longer connected, no data is flowing to that new subscription, so block() waits forever
  8. If you were to call connect() again (from another thread, since the main thread is blocked), data would flow again, and the block() would complete. However, this would be a new sequence (not the original sequence that completed in step 4)
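
The steps above can be observed without hanging the program by giving block() a timeout. This is a minimal sketch under the same caveat as the explanation (it reflects the behaviour reported in the question, not a documented guarantee); the class and method names and the 500 ms timeout are arbitrary choices for illustration.

```java
import java.time.Duration;

import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class LateSubscriberExample {

    // Hypothetical helper: returns true if a block() issued after the
    // upstream has completed runs into the timeout.
    static boolean lateBlockTimesOut() {
        ConnectableFlux<Integer> publish = Flux.range(0, 2).publish();
        Mono<Long> countMono = publish.count();

        // Steps 2 to 4: subscribe, connect, and the sequence completes synchronously.
        countMono.subscribe();
        publish.connect();

        try {
            // Step 6: block() creates a brand new subscription after completion.
            countMono.block(Duration.ofMillis(500));
            return false;
        } catch (IllegalStateException e) {
            // block(Duration) signals a timeout with an IllegalStateException.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("late block timed out: " + lateBlockTimesOut());
    }
}
```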

Why does my example work?

Only two subscriptions are created (instead of 3 in your example), one from a .subscribe() call, and one from .block(). The ConnectableFlux auto connects after 2 subscriptions, and therefore the block() subscription completes. Both subscriptions share the same upstream sequence.
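
If you would rather keep the manual connect(), another option (not what I used above, just an alternative sketch) is to cache the count so that the later block() gets the stored result instead of re-subscribing to the ConnectableFlux. Mono.cache() is a standard Reactor operator; the class and method names here are made up for illustration.

```java
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CachedCountExample {

    // Hypothetical helper: counts the elements while still calling connect() manually.
    static long countWithCache() {
        ConnectableFlux<Integer> publish = Flux.range(0, 2).publish();

        // The "real work" subscriber.
        publish.subscribe();

        // cache() remembers the result for later subscribers.
        Mono<Long> countMono = publish.count().cache();

        // Register the count subscriber before connecting.
        countMono.subscribe();

        // Start the flow; with Flux.range this completes synchronously.
        publish.connect();

        // block() subscribes to the cached Mono, not to the ConnectableFlux.
        return countMono.block();
    }

    public static void main(String[] args) {
        System.out.println("count: " + countWithCache());
    }
}
```

The trade-off is that the count is held in memory for as long as the cached Mono is referenced, which is negligible for a single Long.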