
We have a multi-component application that uses the Reactive Streams API between its components. Some components are implemented with Akka Streams, others with, e.g., Reactor.

In one component we noticed that streams did not process any messages, although the provided Publisher offers records. I narrowed the issue down to the following situation:

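import akka.NotUsed;
import akka.stream.javadsl.AsPublisher;
import akka.stream.javadsl.BroadcastHub;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.google.common.collect.Lists;
import org.reactivestreams.Publisher;

// `materializer` is an existing akka.stream.Materializer.
// Component A exposes a plain Reactive Streams Publisher backed by an Akka Source.
Publisher<String> stringPublisher = Source
    .from(Lists.newArrayList("Hello", "World", "!"))
    .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);

// Component B consumes that Publisher and fans it out through a BroadcastHub.
Source<String, NotUsed> allMessages = Source
    .fromPublisher(stringPublisher)
    .toMat(BroadcastHub.of(String.class, 256), Keep.right())
    .run(materializer);

// A consumer attached to the hub afterwards: usually nothing is printed.
allMessages
    .runForeach(System.out::println, materializer)
    .toCompletableFuture()
    .get();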

One component provides a Publisher (it has to be a Publisher because the API between the components is the Reactive Streams API, not the Akka Streams API). This Publisher is created from another Akka Streams Source and turned into a Publisher using Sink.asPublisher.

When we then materialize a stream that starts from this Publisher and ends in a BroadcastHub, no record is processed at all.

I tried the same with a Reactor Publisher:

Publisher<String> stringPublisher = Flux.fromIterable(Lists.newArrayList("Hello", "World", "!"));

This works as expected. Unfortunately, I cannot rule out that another component creates its Publisher from an Akka Streams Source.

Does anyone have an idea what goes wrong?

I investigated further. That it worked with Flux seems to have been luck: when I repeat the experiment a few hundred times it does not always work, it just works more often than with the Akka Streams publisher. It looks like I got something wrong with BroadcastHub. - cokeSchlumpf

1 Answer


I now know how to solve it: it works if I start consuming the Source produced by the BroadcastHub within mapMaterializedValue:

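Publisher<String> stringPublisher = Source
    .from(Lists.newArrayList("Hello", "World", "!"))
    .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);

Source
    .fromPublisher(stringPublisher)
    .alsoToMat(BroadcastHub.of(String.class, 256), Keep.right())
    // Attach the consumer to the hub's Source as part of materializing this graph.
    // Sink.foreach takes only the procedure; the materializer goes to runWith.
    .mapMaterializedValue(source -> source
        .runWith(Sink.foreach(System.out::println), materializer))
    // Keep.left() keeps the consumer's CompletionStage, so get() waits for the printing.
    .toMat(Sink.ignore(), Keep.left())
    .run(materializer)
    .toCompletableFuture()
    .get();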
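As a rough, JDK-only analogy for why attaching the consumer up front helps, the sketch below uses java.util.concurrent.SubmissionPublisher as a stand-in for the hub. This is an assumption made for illustration only: it is not Akka's BroadcastHub and does not model its buffer or backpressure, and the class names EarlySubscriberSketch and Collector are hypothetical. When the subscriber is registered before anything is published, it receives every element and then the completion signal.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// JDK-only analogy (not Akka): SubmissionPublisher stands in for the hub.
final class EarlySubscriberSketch {

    // Hypothetical helper: requests everything, records each element, signals when the stream ends.
    static final class Collector implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch finished = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription subscription) { subscription.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) { received.add(item); }
        @Override public void onError(Throwable error) { finished.countDown(); }
        @Override public void onComplete() { finished.countDown(); }
    }

    // Attach the consumer first, then publish and complete.
    static List<String> run() {
        Collector consumer = new Collector();
        try (SubmissionPublisher<String> hub = new SubmissionPublisher<>()) {
            hub.subscribe(consumer);                            // registered before any element exists
            List.of("Hello", "World", "!").forEach(hub::submit);
        }                                                       // close(): completion follows the submitted items
        try {
            if (!consumer.finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("stream did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(consumer.received);
    }
}
```

Calling EarlySubscriberSketch.run() returns [Hello, World, !].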

Edit: TL;DR: The explanation is given in the Lightbend Forum:

What happens here is that the main stream is already completed when you attach the other stream. Sometimes it might be quick enough to see a few elements before completion.
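The situation described in that quote can be sketched with plain JDK classes, assuming java.util.concurrent.SubmissionPublisher as a stand-in for the hub. Unlike BroadcastHub, SubmissionPublisher does not hold elements back for absent subscribers, so this only models what a late consumer ends up observing, not the hub's internals; the names LateSubscriberSketch and Collector are hypothetical. The publisher publishes and completes before anyone subscribes, and the late subscriber then only receives onComplete.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// JDK-only analogy (not Akka): a consumer that attaches after the upstream has completed.
final class LateSubscriberSketch {

    // Hypothetical helper: requests everything, records each element, signals when the stream ends.
    static final class Collector implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch finished = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription subscription) { subscription.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) { received.add(item); }
        @Override public void onError(Throwable error) { finished.countDown(); }
        @Override public void onComplete() { finished.countDown(); }
    }

    static List<String> run() {
        SubmissionPublisher<String> hub = new SubmissionPublisher<>();
        List.of("Hello", "World", "!").forEach(hub::submit); // nobody is subscribed yet
        hub.close();                                          // the "main stream" has completed
        Collector late = new Collector();
        hub.subscribe(late);                                  // a closed publisher only signals onComplete
        try {
            if (!late.finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("no completion signal received");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(late.received);
    }
}
```

Calling LateSubscriberSketch.run() returns an empty list: the late consumer completes without seeing any element, much like the foreach on the hub's Source in the question.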

---

So it looks as if the BroadcastHub drops the elements that arrive before a consumer is attached to the Source created by the BroadcastHub.

The documentation says it does not drop elements:

If there are no subscribers attached to this hub then it will not drop any elements but instead backpressure the upstream producer until subscribers arrive.

https://doc.akka.io/docs/akka/current/stream/stream-dynamic.html

And this is indeed true in most cases, but I found some cases where it does not behave as documented:

public void testBH3() throws ExecutionException, InterruptedException {
    Publisher<String> stringPublisher = Source
        .from(Lists.newArrayList("Hello", "World", "!"))
        .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);

    Source<String, NotUsed> allMessages = Source
        .fromPublisher(stringPublisher)
        .toMat(BroadcastHub.of(String.class, 256), Keep.right())
        .run(materializer);

    allMessages
        .runForeach(System.out::println, materializer)
        .toCompletableFuture()
        .get();
}

public void repeat() throws ExecutionException, InterruptedException {
    for (int i = 0; i < 100; i++) {
        testBH3();
        System.out.println("------");
    }
}

This prints the elements in only about 3 of the 100 runs. The following variant, however, works in all runs (I only added a throttle so that elements are produced more slowly):

public void testBH3() throws ExecutionException, InterruptedException {
    Publisher<String> stringPublisher = Source
        .from(Lists.newArrayList("Hello", "World", "!"))
        .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);

    Source<String, NotUsed> allMessages = Source
        .fromPublisher(stringPublisher)
        // emit at most one element per second (Duration is java.time.Duration)
        .throttle(1, Duration.ofSeconds(1))
        .toMat(BroadcastHub.of(String.class, 256), Keep.right())
        .run(materializer);

    allMessages
        .runForeach(System.out::println, materializer)
        .toCompletableFuture()
        .get();
}

Thus it looks to me as if the BroadcastHub sometimes drops elements when no Sink is connected yet.