If you look at the signatures of zip and zipWith:
def zip[U](that: Graph[SourceShape[U], _]): Repr[(Out, U)]
def zipWith[Out2, Out3](that: Graph[SourceShape[Out2], _])(combine: (Out, Out2) => Out3): Repr[Out3]
Both methods expect a Source (more precisely, any Graph with a SourceShape), not a Flow.
Zipping a Flow with another Flow is less trivial than one might think: for example, the second Flow might emit multiple elements per input element via mapConcat, in which case its output no longer lines up one-to-one with that of the first Flow.
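For the simple case where both flows emit exactly one element per input element, one possible sketch is to fan out with Broadcast and fan back in with Zip using the GraphDSL. The names ZipFlowsSketch and zipFlows are hypothetical and chosen only for illustration; the one-to-one assumption is essential, since a flow that filters or uses mapConcat would misalign the pairs or stall the stream.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, Zip}

object ZipFlowsSketch {
  // Hypothetical helper: runs every input element through both flows and pairs the results.
  // Assumes each flow emits exactly one output per input.
  def zipFlows[In, A, B](f1: Flow[In, A, _], f2: Flow[In, B, _]): Flow[In, (A, B), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val bcast = builder.add(Broadcast[In](2)) // duplicate each input element
      val zip   = builder.add(Zip[A, B]())      // pair up the two results

      bcast.out(0) ~> f1 ~> zip.in0
      bcast.out(1) ~> f2 ~> zip.in1

      FlowShape(bcast.in, zip.out)
    })

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("zip-flows-sketch")
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    val doubled = Flow[Int].map(_ * 2)
    val labeled = Flow[Int].map(i => s"item-$i")

    Source(1 to 3)
      .via(zipFlows(doubled, labeled))
      .runWith(Sink.foreach(println))
      .onComplete(_ => system.terminate())
    // (2,item-1)
    // (4,item-2)
    // (6,item-3)
  }
}
```

This only covers the one-to-one case; anything beyond that needs explicit handling of how elements are matched up.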
You could consider building a custom GraphStage that takes the already-zipped (DataIn, Signal) pairs as input, as shown in the following simplified example:
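import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

case class DataIn(id: Int)
case class DataOut(content: String)
case class Signal(s: Int)

// Consumes already-zipped (DataIn, Signal) pairs and emits one DataOut per pair.
class ZipperFlow extends GraphStage[FlowShape[(DataIn, Signal), DataOut]] {
  val in = Inlet[(DataIn, Signal)]("ZipperFlow.in")
  val out = Outlet[DataOut]("ZipperFlow.out")

  override val shape = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          // Only the DataIn half is used here; the Signal (._2) is ignored in this simplified example.
          push(out, DataOut("content-" + grab(in)._1.id))
        }
      })

      setHandler(out, new OutHandler {
        // Propagate downstream demand upstream.
        override def onPull(): Unit = {
          pull(in)
        }
      })
    }
}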
Testing ZipperFlow:
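import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
val dataSource = Source(1 to 5).map(DataIn(_))
val signalSource = Source(1 to 5).map(Signal(_))
val sink: Sink[DataOut, Future[Done]] = Sink.foreach(println)
// Zip the two sources first, then feed the pairs through the custom stage.
dataSource.zip(signalSource).via(new ZipperFlow).runWith(sink)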
// DataOut(content-1)
// DataOut(content-2)
// DataOut(content-3)
// DataOut(content-4)
// DataOut(content-5)