We have a multi-component application which offers a Reactive Streams API between the components. Some components are implemented using Akka Streams, others are using e.g. Reactor.
In one component we noticed that an Akka Streams graph did not process any messages, although the Publisher it consumes from does offer records. I narrowed the issue down to the following situation:
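    // Producing component: expose an Akka Streams Source as a Reactive Streams Publisher
    Publisher<String> stringPublisher = Source
        .from(Lists.newArrayList("Hello", "World", "!"))
        .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);

    // Consuming component: feed the Publisher into a BroadcastHub
    Source<String, NotUsed> allMessages = Source
        .fromPublisher(stringPublisher)
        .toMat(BroadcastHub.of(String.class, 256), Keep.right())
        .run(materializer);

    // Attach one consumer to the hub and block until the stream completes
    allMessages
        .runForeach(System.out::println, materializer)
        .toCompletableFuture()
        .get();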
One component provides a Publisher (it has to be a Publisher because the API between components is Reactive Streams, not Akka Streams). This Publisher is created from another Akka Streams Source and turned into a Publisher using Sink.asPublisher.
When we now materialize the stream starting from that Publisher with a BroadcastHub, no record is processed at all.
I tried the same with a Reactor Publisher:
    Publisher<String> stringPublisher = Flux.fromIterable(Lists.newArrayList("Hello", "World", "!"));
This works as expected. Unfortunately, I cannot rule out that another component creates its Publisher from an Akka Streams Source.
Does anyone have an idea what goes wrong?