I'm trying to run the following code, based on the Akka Streams quick start guide:

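import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import scala.concurrent.Future

implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()
import system.dispatcher // ExecutionContext required by sum.foreach below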

val songs = Source.fromPublisher(SongsService.stream)

val count: Flow[Song, Int, NotUsed] = Flow[Song].map(_ => 1)

val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

val counterGraph: RunnableGraph[Future[Int]] =
  songs
    .via(count)
    .toMat(sumSink)(Keep.right)

val sum: Future[Int] = counterGraph.run()

sum.foreach(c => println(s"Total songs processed: $c"))

The problem is that the future never returns a result. The biggest difference from the documentation example is my Source.

I have a Play enumerator, which I'm converting to an Akka Publisher; the result is SongsService.stream.

When I use a predefined list as the Source, like:

val songs = Source(list)

It works, but using Source.fromPublisher does not.

But the problem does not seem to be the publisher itself, because a simple operation on it works:

val songs = Source.fromPublisher(SongsService.stream)
songs.runForeach(println)

It goes through the database, creates the Play enumerator, converts it to a publisher, and I can iterate over the elements.

Any ideas?

Does songs.runForeach(println) actually return Done if you await it? - Mullefa
@Mullefa just realized that it does not. - Thiago Pereira

1 Answer

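Your publisher is likely never completing. Sink.fold only produces its result once the upstream completes, so the Future stays pending. runForeach prints each element as it arrives, which is why it looks like it works even though its own Future never completes either.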
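
A minimal sketch of that behaviour, assuming the same Akka version as the question (one that still uses ActorMaterializer). The names NeverCompletingDemo, openEnded, and run are hypothetical, and Source.maybe is only a stand-in for a publisher that never signals completion; it is not how SongsService.stream is built.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

object NeverCompletingDemo {

  // Returns (did the unbounded fold time out?, total counted from the bounded stream)
  def run(): (Boolean, Int) = {
    implicit val system: ActorSystem = ActorSystem("Demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Hypothetical stand-in for the publisher: emits three elements,
      // then stays open because Source.maybe's promise is never completed.
      val openEnded = Source(1 to 3).concat(Source.maybe[Int])

      val sumSink = Sink.fold[Int, Int](0)(_ + _)

      // The fold result is only emitted on upstream completion,
      // so awaiting this Future runs into the timeout.
      val stuck: Future[Int] = openEnded.map(_ => 1).runWith(sumSink)
      val timedOut = Try(Await.result(stuck, 1.second)).isFailure

      // take(3) completes the stream after three elements,
      // which lets the fold emit its accumulated value.
      val bounded: Future[Int] = openEnded.take(3).map(_ => 1).runWith(sumSink)
      val total = Await.result(bounded, 3.seconds)

      (timedOut, total)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val (timedOut, total) = run()
    println(s"Unbounded fold timed out: $timedOut")
    println(s"Total songs processed: $total")
  }
}
```

In the real code the number of songs is not known in advance, so take(n) is only an illustration. The actual fix is to make the underlying publisher signal completion once the enumerator is exhausted; if that is not possible, bounding the stream by time (for example with takeWithin) is one possible workaround.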