
I have a flow graph with a broadcast and a zip inside. If something fails inside this flow (regardless of what it is), I'd like to drop the problematic element passed to it and resume. I came up with the following solution:

val flow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val dangerousFlow = Flow[Int].map {
    case 5 => throw new RuntimeException("BOOM!")
    case x => x
  }
  val safeFlow = Flow[Int]
  val bcast = builder.add(Broadcast[Int](2))
  val zip = builder.add(Zip[Int, Int])

  bcast ~> dangerousFlow ~> zip.in0
  bcast ~> safeFlow ~> zip.in1

  FlowShape(bcast.in, zip.out)
})

Source(1 to 9)
  .via(flow)
  .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
  .runWith(Sink.foreach(println))

I'd expect it to print:

(1,1)
(2,2)
(3,3)
(4,4)
(5,5)
(6,6)
(7,7)
(8,8)
(9,9)

However, it deadlocks, printing only:

(1,1)
(2,2)
(3,3)
(4,4)

We did some debugging, and it turns out the supervision strategy was applied to the flow's child stages: dangerousFlow carried on after the failure, dropping element 5, and demanded the next element from bcast. But bcast won't emit an element until safeFlow also demands one, which never happens, because safeFlow is waiting for demand from zip, and zip is still waiting for the element from dangerousFlow that was dropped.

Is there a way to resume the graph regardless of what went wrong inside one of the stages?

1 Answer


I think you understood the problem well. When element 5 crashes dangerousFlow, the copy of element 5 travelling through safeFlow must be stopped as well, because once it reaches the zip stage you get the deadlock you describe. I don't know how to solve this between the broadcast and zip stages, but what about pushing the problem further downstream, where it is easier to handle?

Consider using the following dangerousFlow:

import scala.util._
val dangerousFlow = Flow[Int].map {
  case 5 => Failure(new RuntimeException("BOOM!"))
  case x => Success(x)
}
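
In real code the failure usually comes from a function that throws, not from a stage that returns Failure explicitly. Here is a minimal sketch of wrapping such a function in Try so that every input still yields a value. The function name risky and the object name are hypothetical, and plain collections stand in for the stream to keep the example free of dependencies:

```scala
import scala.util.{Success, Try}

object TryWrapping {
  // Hypothetical stand-in for any computation that may throw.
  def risky(x: Int): Int =
    if (x == 5) throw new RuntimeException("BOOM!") else x

  // Try catches non-fatal exceptions and returns them as Failure,
  // so the caller always receives a value for every input.
  def safe(x: Int): Try[Int] = Try(risky(x))

  // Pair each wrapped result with its original input, as zip would.
  val zipped: Seq[(Try[Int], Int)] = (1 to 9).map(x => (safe(x), x))

  // Keep only the pairs whose dangerous side succeeded.
  val kept: Seq[(Int, Int)] = zipped.collect { case (Success(s), i) => s -> i }
}
```

In a stream, the same idea would read Flow[Int].map(x => Try(risky(x))).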

Even when something goes wrong, dangerousFlow still emits an element, so zip never starves. You can then zip as you are doing now (with Zip[Try[Int], Int] instead of Zip[Int, Int]) and only need to add a collect stage as the last step of your graph. On a flow, this would look like:

Flow[(Try[Int], Int)].collect {
  case (Success(s), i) => s -> i
}

If, as you wrote, you really expect it to output the (5,5) tuple as well, use the following instead:

Flow[(Try[Int], Int)].collect {
  case (Success(s), i) => s -> i
  case (_, i)          => i -> i
}
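
Putting the pieces together, the whole graph might look like the sketch below. It assumes Akka 2.6 or later, where an implicit ActorSystem supplies the materializer (older versions need an explicit ActorMaterializer). The names SafeZipExample, zipped, dropFailed, recoverFailed and run are illustrative and not part of the original question:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, Zip}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Success, Try}

object SafeZipExample {
  // Same shape as the graph in the question, but the dangerous branch
  // emits a Try for every element instead of failing the stage.
  val zipped: Flow[Int, (Try[Int], Int), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val dangerousFlow = Flow[Int].map { x =>
        Try(if (x == 5) throw new RuntimeException("BOOM!") else x)
      }
      val safeFlow = Flow[Int]
      val bcast = builder.add(Broadcast[Int](2))
      val zip = builder.add(Zip[Try[Int], Int]())

      bcast ~> dangerousFlow ~> zip.in0
      bcast ~> safeFlow ~> zip.in1

      FlowShape(bcast.in, zip.out)
    })

  // Variant 1: drop the pairs whose dangerous side failed.
  val dropFailed: Flow[(Try[Int], Int), (Int, Int), NotUsed] =
    Flow[(Try[Int], Int)].collect { case (Success(s), i) => s -> i }

  // Variant 2: fall back to the safe side when the dangerous side failed.
  val recoverFailed: Flow[(Try[Int], Int), (Int, Int), NotUsed] =
    Flow[(Try[Int], Int)].collect {
      case (Success(s), i) => s -> i
      case (_, i)          => i -> i
    }

  // Runs 1 to 9 through the graph and the chosen post-processing step.
  def run(post: Flow[(Try[Int], Int), (Int, Int), NotUsed]): Seq[(Int, Int)] = {
    implicit val system: ActorSystem = ActorSystem("safe-zip")
    try Await.result(Source(1 to 9).via(zipped).via(post).runWith(Sink.seq), 10.seconds)
    finally system.terminate()
  }
}
```

Because no stage ever throws, no supervision strategy is needed here. Calling SafeZipExample.run(SafeZipExample.dropFailed) should yield every pair except the one for element 5, while recoverFailed keeps all nine.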