I am looking for a way to easily reuse akka-stream flows.
I treat the Flow I intend to reuse as a function, so I would like to keep its signature as:
Flow[Input, Output, NotUsed]
Now, when I use this flow, I would like to be able to 'call' it and keep the result aside for further processing.
So I want to start with a Flow emitting [Input], apply my flow, and proceed with a Flow emitting [(Input, Output)].
Example:
val s: Source[Int, NotUsed] = Source(1 to 10)
val stringIfEven = Flow[Int].filter(_ % 2 == 0).map(_.toString)
val via: Source[(Int, String), NotUsed] = ???
Now this is not possible in a straightforward way, because combining the flows with .via() would give me a Flow emitting just [Output]:
val via: Source[String, NotUsed] = s.via(stringIfEven)
An alternative is to make my reusable flow emit [(Input, Output)], but this requires every flow to push its input through all the stages, which makes my code look bad.
So I came up with a combiner like this:
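import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Zip}

def tupledFlow[In, Out](flow: Flow[In, Out, _]): Flow[In, (In, Out), NotUsed] = {
  Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    // send every input element down two branches
    val broadcast = b.add(Broadcast[In](2))
    // join one element from each branch into a tuple
    val zip = b.add(Zip[In, Out])
    broadcast.out(0) ~> zip.in0          // branch 0: the input itself
    broadcast.out(1) ~> flow ~> zip.in1  // branch 1: the input passed through `flow`
    FlowShape(broadcast.in, zip.out)
  })
}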
It broadcasts the input both to the flow and, in a parallel branch, directly onward; both branches end in the 'Zip' stage, where I join the values into a tuple. It can then be applied elegantly:
val tupled: Source[(Int, String), NotUsed] = s.via(tupledFlow(stringIfEven))
Everything works great, but when the given flow does a 'filter' operation, this combiner gets stuck and stops processing further events.
I guess this is due to the behaviour of 'Zip', which requires both branches to emit in step: in my case one branch passes each element through directly, so the other branch cannot drop that element with .filter(). When it does, the stream stops, because Zip keeps waiting for a push.
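The guess above can be illustrated with plain Scala collections, no Akka involved. A zip pairs elements purely by position, so once one branch drops elements, the pairs no longer line up with the input that produced them. In the fused stream it does not even get that far: Broadcast backpressures when any of its outputs backpressures, and Zip does not pull its first input again until it has emitted a tuple, which is consistent with the stall described above. The object name ZipByPosition is only for illustration.

```scala
object ZipByPosition {
  // the "direct" branch: every input element
  val inputs: Seq[Int] = 1 to 10
  // the "filter" branch: only even numbers survive, rendered as strings
  val filtered: Seq[String] = inputs.filter(_ % 2 == 0).map(_.toString)
  // zip pairs by position and stops at the shorter side,
  // so 1 ends up paired with "2", 2 with "4", and so on
  val zipped: Seq[(Int, String)] = inputs.zip(filtered)

  def main(args: Array[String]): Unit =
    zipped.foreach(println)
}
```

Even if buffering let such a graph keep running, positional pairing would attach outputs to the wrong inputs.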
Is there a better way to achieve flow composition? Is there anything I can do in my tupledFlow to get the desired behaviour when 'flow' drops elements with 'filter'?
Flow[T, U, ...] is not a function. For each input element it may return 0, 1, or more output elements. It may even keep back input elements and use them only later when more data is available. For this reason it is impossible to provide this feature generically if the wrapped flow doesn't support it itself. It can work generically if it is strictly enforced that wrappedFlow is a one-to-one flow that actually works like a function (but then filter doesn't work). Usually, using mapAsync in such cases is a simpler way. – jrudolph
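A minimal sketch of the mapAsync route, assuming Akka 2.6+ (where an implicit ActorSystem supplies the Materializer) and assuming the reusable step can be rewritten as a function In => Future[Option[Out]], with None standing in for "filtered out". The names MapAsyncTupled, tupledAsync and the function version of stringIfEven are illustrative, not library API. Because mapAsync emits results in input order and each result is computed from exactly one input, every output stays attached to the input that produced it.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object MapAsyncTupled {
  // Hypothetical reusable step written as a function instead of a Flow;
  // None plays the role of an element dropped by filter.
  def stringIfEven(i: Int): Future[Option[String]] =
    Future.successful(if (i % 2 == 0) Some(i.toString) else None)

  // Pairs each input with the function's result; mapAsync preserves input order,
  // and collect drops the inputs for which the step produced nothing.
  def tupledAsync[In, Out](f: In => Future[Option[Out]])(
      implicit ec: ExecutionContext): Flow[In, (In, Out), NotUsed] =
    Flow[In]
      .mapAsync(parallelism = 4)(in => f(in).map(_.map(out => (in, out))))
      .collect { case Some(pair) => pair }

  def run(): Seq[(Int, String)] = {
    implicit val system: ActorSystem = ActorSystem("tupled-async")
    import system.dispatcher
    try Await.result(
      Source(1 to 10).via(tupledAsync[Int, String](stringIfEven)).runWith(Sink.seq),
      5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    run().foreach(println)
}
```

The trade-off is that the reusable piece is no longer a Flow, so it cannot contain multi-element stages; in exchange the pairing needs no Broadcast or Zip at all.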
If the wrapped Flow outputs only 0 or 1 element for every input element, that would allow writing a Zip operator with different semantics, one that zips with the input only if wrappedFlow outputs an element and skips the input if wrappedFlow does not push any element. – Tomasz Bartczak
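The 0-or-1 semantics from this comment can be approximated without writing a custom Zip stage: run the wrapped flow on one element at a time with flatMapConcat and tag whatever comes out with that element. This is a swapped-in technique, sketched under assumptions: Akka 2.6+, and a wrapped flow that emits at most one element per input and keeps no state across elements (filter, map and collect qualify). PerElementTupled and tupledFlow01 are hypothetical names.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object PerElementTupled {
  // Assumption: `flow` emits at most one element per input and keeps no state
  // between elements; take(1) discards any extra output.
  def tupledFlow01[In, Out](flow: Flow[In, Out, Any]): Flow[In, (In, Out), NotUsed] =
    Flow[In].flatMapConcat { in =>
      // a fresh copy of `flow` is materialized for every element;
      // if it filters `in` out, this sub-source simply completes empty
      Source.single(in).via(flow).take(1).map(out => (in, out))
    }

  val stringIfEven: Flow[Int, String, NotUsed] =
    Flow[Int].filter(_ % 2 == 0).map(_.toString)

  def run(): Seq[(Int, String)] = {
    implicit val system: ActorSystem = ActorSystem("tupled-per-element")
    try Await.result(
      Source(1 to 10).via(tupledFlow01(stringIfEven)).runWith(Sink.seq),
      5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    run().foreach(println)
}
```

The design cost is one materialization of the wrapped flow per element, which is slower than a fused stage and resets any state the flow keeps, so stages such as grouped or fold will not behave as they would on the whole stream.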