0
votes

I created a small example graph in which the input elements are passed through to the output and also sent back to a feedback loop that drops everything (using filter).

I expect to get the same behaviour as an identity Flow[T], since the feedback branch drops everything.

What happens instead is that the input elements are emitted as expected but the materialization never completes.

Am I doing something wrong? Is this supposed to happen? Shouldn't the feedback output of the broadcast complete when the input stream completes?

I guess the problem is something similar to the chicken-and-egg scenario described here?

I am using akka-stream-experimental 2.0.3

Cheers

object Test extends App {
  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()

  val flow = Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val dropEverything = b.add(Flow[Int].filter(_ => false))
    val input = b.add(Merge[Int](2))
    val bcast = b.add(Broadcast[Int](2))

    input                         ~> bcast
    input.in(1) <~ dropEverything <~ bcast.out(1)

    FlowShape(input.in(0), bcast.out(0))
  })

  val result = Source.single(42).via(flow).runWith(Sink.foreach(println))

  try {
    // prints 42 but the stream doesn't terminate and the await timeouts
    println(Await.result(result, 5.seconds))
  } finally {
    system.terminate()
  }
}
1

1 Answers

1
votes

This has been answered here. The cycle never completes because Merge and Broadcast are waiting for each other to complete.

You can change this to val input = b.add(Merge[Int](2, eagerComplete = true)) to prevent this.

Alternatively, you can try val dropEverything = b.add(Flow[Int].take(1).filter(_ => false)) where n is the number of elements from input to process, which is 1 in this case.