7
votes

I have several Flows in my program, that I would like to process in parallel. After all are completed, I would like to trigger some action.

One way of doing it would be to send a message to an Actor after each completion, and when the Actor verifies that all flows are ready, then it can trigger the action.

I was wondering if there was anything within the akka-streams Scala DSL that I may be overlooking that would make it even simpler.

EDIT: Converting a Flow to a future would not work because, as the documentation mentions, the Future is completed right after the first event that happens in the stream. Running the following code:

implicit val system = ActorSystem("Sys")
val fm = FlowMaterializer(MaterializerSettings())

def main(args: Array[String]): Unit = {
  val fut = Flow(1 second, {() => println("tick")}).toFuture(fm)

  fut.onComplete{ _ =>
    println("future completed")
  }
}

Prints "tick", followed by "future completed", and then an infinite sequence of "tick"s.

2
Couldn't you just use the ask pattern to get futures for those flows then throw them all in a for comprehension that triggers the action in the yield?Tim Gautier
@TimGautier I don't think the actors underlying the flows are exposed for me to be able to ask anything from them. Also, I am looking for something that is wholly within the Flow DSL.Eduardo
According to the Flow API it seems you can convert the Flow into a future, then should be easy to map to a single future: flow.toFuture(...). doc.akka.io/api/akka-stream-experimental/0.2/…Julio
@Julio: yes, but the documentation says the future is completed as soon as there is any action in the stream (not when the stream is completed).Eduardo
@Eduardo, I actually think Julio is correct. Each transformation on the Flow produces another Flow. If you perform several transforms and then use the final Flow instance to call toFuture, that Future should only be satisfied when all of the preceding transforms have been completed.cmbaxter

2 Answers

9
votes

As mentioned in the comment, I believe @Eduardo is right about converting the Flow to a Future. Consider this example:

implicit val system = ActorSystem("Sys")
import system.dispatcher

val text1 = 
  """hello1world
  foobar""".stripMargin

val text2 = 
  """this1is
  a1test""".stripMargin

def flowFut(text:String) = Flow(text.split("\\s").toVector)
  .map(_.toUpperCase())
  .map(_.replace("1", ""))
  .toFuture(FlowMaterializer(MaterializerSettings()))    


val fut1 = flowFut(text1)    
val fut2 = flowFut(text2)    
val fut3 = for{
  f1 <- fut1
  f2 <- fut2
} yield {
  s"$f1, $f2"
}

fut3 foreach {println(_)}

Here, I run two separate transforms on each set of text lines, converting to upper and removing the #1 from any text. I then force the result of this Flow to a Future so I can compose the results into a new Future which I then print out.

0
votes

Oh I see. If the flow processes multiple elements, the future will complete right after the first one.

I think you can use the flow.onComplete to complete some promises. e.g.

val promise1 = Promise[Unit]()
val promise2 = Promise[Unit]()

val flow1 = Flow(Iterator(1,2,3,4)).map(println)
val flow2 = Flow(Iterator('a,'b,'c,'d)).map(println)


flow1.onComplete(FlowMaterializer(MaterializerSettings())){
  case Success(_) =>  promise1.success()
  case Failure(e) => promise1.failure(e)
}
flow2.onComplete(FlowMaterializer(MaterializerSettings())){
  case Success(_) =>  promise2.success()
  case Failure(e) => promise2.failure(e)
}

for {
  e1<- promise1.future
  e2<- promise2.future
}{
  println(s"completed!")
}

If on the other hand want to do something after every tuple of elements have been completed processing, you can probably use the flow1.zip(flow2) to combine them.