I'm trying to run the following code, based on the Akka Streams quick start guide:
implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()
val songs = Source.fromPublisher(SongsService.stream)
val count: Flow[Song, Int, NotUsed] = Flow[Song].map(_ => 1)
val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)
val counterGraph: RunnableGraph[Future[Int]] =
  songs
    .via(count)
    .toMat(sumSink)(Keep.right)
val sum: Future[Int] = counterGraph.run()
sum.foreach(c => println(s"Total songs processed: $c"))
The problem is that the future never returns a result. The biggest difference from the documentation example is my Source.
I have a Play Enumerator that I convert to a Publisher, and that Publisher is what SongsService.stream returns.
When using a defined list as a Source like:
val songs = Source(list)
it works, but using Source.fromPublisher does not.
The publisher itself does not seem to be the problem, though, because a simple operation on it works:
val songs = Source.fromPublisher(SongsService.stream)
songs.runForeach(println)
It goes through the database, creates the Play Enumerator, converts it to a Publisher, and I can iterate over the elements.
Any ideas?
Does songs.runForeach(println) actually return Done if you await it? – Mullefa
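A rough way to run the check suggested in the comment: Sink.fold only emits its result once the upstream completes, so a source that keeps emitting elements but never signals completion will print fine with runForeach and still leave the fold's future pending forever. The sketch below is not the asker's code. `CompletionCheck`, `countWithin`, and both sources are made-up stand-ins, since SongsService.stream is not shown. Source.maybe is used only to simulate a publisher that never calls onComplete.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical helper object, not part of the original question.
object CompletionCheck {

  // Counts the elements of `src` with Sink.fold and waits at most `timeout`.
  // The fold's future completes only when the source completes, so a
  // never-completing source yields a Failure wrapping a TimeoutException.
  def countWithin[T](src: Source[T, _], timeout: FiniteDuration)(
      implicit mat: Materializer): Try[Int] =
    Try(Await.result(src.runWith(Sink.fold[Int, T](0)((n, _) => n + 1)), timeout))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("CompletionCheck")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // Completes after three elements, like Source(list) in the question.
    val finite = Source(List("a", "b", "c"))

    // Emits the same three elements, then stays open forever. This only
    // simulates a publisher that never signals onComplete.
    val openEnded = finite.concat(Source.maybe[String])

    println(countWithin(finite, 3.seconds))    // Success(3)
    println(countWithin(openEnded, 3.seconds)) // a Failure (timeout)

    system.terminate()
  }
}
```

If Source.fromPublisher(SongsService.stream) behaves like the second case when passed to the same helper, the publisher is probably never signalling completion. One possibility worth checking, which is an assumption because the conversion code is not shown, is whether the underlying Play Enumerator ever sends EOF.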