I have a flow graph with broadcast and zip inside. If anything fails inside this flow, whatever the cause, I'd like to drop the problematic element passed to it and resume. I came up with the following solution:
val flow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val dangerousFlow = Flow[Int].map {
    case 5 => throw new RuntimeException("BOOM!")
    case x => x
  }
  val safeFlow = Flow[Int]

  val bcast = builder.add(Broadcast[Int](2))
  val zip = builder.add(Zip[Int, Int])

  bcast ~> dangerousFlow ~> zip.in0
  bcast ~> safeFlow ~> zip.in1

  FlowShape(bcast.in, zip.out)
})

Source(1 to 9)
  .via(flow)
  .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
  .runWith(Sink.foreach(println))
I'd expect it to print:
(1,1)
(2,2)
(3,3)
(4,4)
(5,5)
(6,6)
(7,7)
(8,8)
(9,9)
However, it deadlocks, printing only:
(1,1)
(2,2)
(3,3)
(4,4)
We've done some debugging, and it turns out it applied the "resume" strategy to its children, which caused dangerousFlow to resume after failure and thus to demand an element from bcast. bcast won't emit an element until safeFlow demands another element, which actually never happens (because it's waiting for demand from zip).
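For comparison, here is a minimal sketch of one way the two branches might be kept in lockstep, so that zip always receives an element on both inputs. It is not a general answer: it assumes the failing stage is known in advance, and it assumes Akka 2.6+ (where an implicit ActorSystem is enough to materialize a stream). The names LockstepSketch, dangerous and guardedFlow are made up for illustration. Instead of letting the stage throw and be supervised, the failure is captured as a Try value, and the whole pair is dropped after the zip:

```scala
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, Zip}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Success, Try}

object LockstepSketch extends App {
  // Assumption: Akka 2.6+, where the implicit system also provides the materializer.
  implicit val system: ActorSystem = ActorSystem("lockstep-sketch")

  // Same failure as in the question, expressed as a plain function (hypothetical name).
  val dangerous: Int => Int = {
    case 5 => throw new RuntimeException("BOOM!")
    case x => x
  }

  val flow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // The exception is wrapped in a Failure, so this branch still emits
    // exactly one element per input and never gets ahead of safeFlow.
    val guardedFlow = Flow[Int].map(x => Try(dangerous(x)))
    val safeFlow = Flow[Int]

    val bcast = builder.add(Broadcast[Int](2))
    val zip = builder.add(Zip[Try[Int], Int]())

    bcast ~> guardedFlow ~> zip.in0
    bcast ~> safeFlow ~> zip.in1

    FlowShape(bcast.in, zip.out)
  }).collect { case (Success(a), b) => (a, b) } // drop pairs whose left side failed

  val result = Await.result(Source(1 to 9).via(flow).runWith(Sink.seq), 5.seconds)
  result.foreach(println)
  system.terminate()
}
```

With this sketch the stream should complete and print every pair except the one for 5, i.e. (1,1) through (4,4) and (6,6) through (9,9). It only helps when the failure can be caught inside the stage itself, so it does not cover arbitrary failures anywhere in the graph.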
Is there a way to resume the graph regardless of what went wrong inside one of the stages?