4
votes

As per the definition of Mono and Flux, both represent an asynchronous sequence of data, and nothing happens before you subscribe.

And there are two broad families of publishers: hot and cold. Mono and Flux generate data anew for each subscription. If no subscription is created, then data never gets generated.

And Hot publishers, on the other hand, do not depend on any number of subscribers.

Here is my code for the cold stream:

        System.out.println("*********Calling coldStream************");
        Flux<String> source = Flux.fromIterable(Arrays.asList("ram", "sam", "dam", "lam"))
                .doOnNext(System.out::println)
                .filter(s -> s.startsWith("l"))
                .map(String::toUpperCase);

        source.subscribe(d -> System.out.println("Subscriber 1: "+d));
        source.subscribe(d -> System.out.println("Subscriber 2: "+d));
        System.out.println("-------------------------------------");

and here is the output:

*********Calling composeStream************
ram
sam
dam
lam
Subscriber 1: LAM
ram
sam
dam
lam
Subscriber 2: LAM
-------------------------------------

How can i convert the above cold stream into the hot stream?

2

2 Answers

5
votes

You can convert cold stream into hot stream by calling "publish" on the cold stream, it will create a ConnectableFlux. Since it is a hot stream nothing will happen until you call connect method on it, even if you subscribed. try out this example:

   Flux<String> source = Flux.fromIterable(Arrays.asList("ram", "sam", "dam", "lam"))
            .doOnNext(System.out::println)
            .filter(s -> s.startsWith("l"))
            .map(String::toUpperCase);

    ConnectableFlux<String> connectable = source.publish();
    connectable.subscribe(d -> System.out.println("Subscriber 1: "+d));
    connectable.subscribe(d -> System.out.println("Subscriber 2: "+d));
    connectable.connect();

The output is :

ram sam dam lam Subscriber 1: LAM Subscriber 2: LAM

Second example:

 Flux<String> source = Flux.fromIterable(Arrays.asList("ram", "sam", "dam", "lam"))
            .doOnNext(System.out::println)
            .filter(s -> s.startsWith("l"))
            .map(String::toUpperCase);

    ConnectableFlux<String> connectable = source.publish();
    connectable.subscribe(d -> System.out.println("Subscriber 1: "+d));
    connectable.connect();
    connectable.subscribe(d -> System.out.println("Subscriber 2: "+d));

The output is:

ram sam dam lam Subscriber 1: LAM

With these two example you can see that data start flowing from the moment wee call "connect" method

2
votes

Hot publishers do not depend on any number of subscribers. They might start publishing data right away and would continue doing so whenever a new Subscriber comes in (in which case said subscriber would only see new elements emitted after it subscribed). Most other hot publishers in Reactor extend Processor (UnicastProcessor in this case).

Here is how we can achieve this

    System.out.println("*********Calling hotStream************");
    UnicastProcessor<String> hotSource = UnicastProcessor.create();

    Flux<String> hotFlux = hotSource.publish()
                                    .autoConnect()
                                    .map(String::toUpperCase);


    hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));

    hotSource.onNext("ram");
    hotSource.onNext("sam");

    hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d));

    hotSource.onNext("dam");
    hotSource.onNext("lam");
    hotSource.onComplete();
    System.out.println("-------------------------------------");

and here is the output for this:

*********Calling hotStream************
Subscriber 1 to Hot Source: RAM
Subscriber 1 to Hot Source: SAM
Subscriber 1 to Hot Source: DAM
Subscriber 2 to Hot Source: DAM
Subscriber 1 to Hot Source: LAM
Subscriber 2 to Hot Source: LAM
-------------------------------------

Subscriber 1 catches all four colors. Subscriber 2, having been created after the first two colors were produced, catches only the last two colors. The process described by the operators on this Flux runs regardless of when subscriptions have been attached.