
I have two flows:

val a: Flow[Input, Data, NotUsed] =...
val b: Flow[Input, Unit, NotUsed] =...

The first flow is a stream of data events I care about; the second is a stream of "signals". That is, I only want to send a Data element downstream when an element is received from b.

I thought of using something like a.zipWith(b)((fromA, fromB) => fromA), but that seems to work only between a Flow and a Source (despite the Akka documentation implying that it supports zipping flows too).

What am I missing?

Thanks


3 Answers


If you look at the signatures of zip and zipWith:

def zip[U](that: Graph[SourceShape[U], _]): Repr[(Out, U)]

def zipWith[Out2, Out3](that: Graph[SourceShape[Out2], _])(combine: (Out, Out2) => Out3): Repr[Out3]

both methods expect a Source.
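To make the signature point concrete, here is a minimal sketch showing that a Flow can be zipped with a Source. It assumes the Akka 2.5-era API used elsewhere in this thread, where an ActorMaterializer is created explicitly (deprecated in Akka 2.6, though still available there); the object and value names are illustrative only.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object ZipWithSourceExample {
  // `that` must be a Graph[SourceShape[_], _], so a Source is accepted here
  val labels: Source[String, NotUsed] = Source(List("a", "b", "c"))

  // A Flow zipped with a Source: each incoming Int is paired with the next label
  val labelled: Flow[Int, String, NotUsed] =
    Flow[Int].zipWith(labels)((n, label) => s"$label$n")

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("zip-with-source")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try Await.result(Source(1 to 5).via(labelled).runWith(Sink.seq), 3.seconds)
    finally system.terminate()
  }
}
```

Because zipWith completes as soon as either side completes, only three elements come out here even though the upstream has five.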

Zipping a Flow with another Flow is not as trivial as one might think: for example, the second Flow might produce multiple elements per input element (with mapConcat), so there is no natural one-to-one pairing between the two outputs.

You could consider building a custom GraphStage, as shown in the following simplified example:

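import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

case class DataIn(id: Int)
case class DataOut(content: String)
case class Signal(s: Int)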

class ZipperFlow extends GraphStage[FlowShape[(DataIn, Signal), DataOut]] {

  val in = Inlet[(DataIn, Signal)]("ZipperFlow.in")
  val out = Outlet[DataOut]("ZipperFlow.out")

  override val shape = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          push(out, DataOut("content-" + grab(in)._1.id))
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          pull(in)
        }
      })
    }
}

Testing ZipperFlow:

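import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher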

val dataSource = Source(1 to 5).map(DataIn(_))
val signalSource = Source(1 to 5).map(Signal(_))

val sink: Sink[DataOut, Future[Done]] = Sink.foreach(println)

dataSource.zip(signalSource).via(new ZipperFlow).runWith(sink)

// DataOut(content-1)
// DataOut(content-2)
// DataOut(content-3)
// DataOut(content-4)
// DataOut(content-5)
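Since ZipperFlow only maps tuples that have already been zipped, the same output can likely be produced without a custom stage by using zipWith and keeping the left element, which is the (fromA, fromB) => fromA idea from the question. A sketch, reusing the case classes above and assuming the same explicit ActorMaterializer setup; the wrapping object name is hypothetical:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object ZipWithSignalsExample {
  case class DataIn(id: Int)
  case class DataOut(content: String)
  case class Signal(s: Int)

  def run(): Seq[DataOut] = {
    implicit val system: ActorSystem = ActorSystem("zip-with-signals")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val dataSource = Source(1 to 5).map(DataIn(_))
      val signalSource = Source(1 to 5).map(Signal(_))

      // A DataOut is emitted only when both a DataIn and a Signal are available;
      // the Signal value itself is discarded
      val result = dataSource
        .zipWith(signalSource)((data, _) => DataOut("content-" + data.id))
        .runWith(Sink.seq)

      Await.result(result, 3.seconds)
    } finally system.terminate()
  }
}
```

Printing each element of the result gives the same five DataOut lines as the ZipperFlow test.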
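import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Zip}

// In Scala 2, an implicit class must be defined inside an object, class or trait
implicit class FlowExt[In, Out, Mat](flow: Flow[In, Out, Mat]) {
  /**
   * Sends every input element to both `flow` and `f` and zips their outputs pairwise.
   *
   * @param f the flow to run alongside `flow`
   * @tparam Out2 the output type of `f`
   * @return a flow emitting `(Out, Out2)` pairs
   */
  def zipFlow[Out2](f: Flow[In, Out2, Mat]): Flow[In, (Out, Out2), NotUsed] = {
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val broadcast = b.add(Broadcast[In](2))
      val zip = b.add(Zip[Out, Out2]())

      // Each branch must pass through its own flow before reaching the zip
      // (~> connects ports; -> would only build a tuple)
      broadcast.out(0) ~> flow ~> zip.in0
      broadcast.out(1) ~> f ~> zip.in1

      FlowShape(broadcast.in, zip.out)
    })
  }
}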

Usage (with FlowExt in scope): val c: Flow[Input, (Data, Unit), NotUsed] = a.zipFlow(b). Then c.map(_._1) emits only the Data values.
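Applied to the question, the same broadcast-and-zip idea can use ZipWith instead of Zip so that only the Data value is kept. This is a sketch: the helper name gated, the wrapping object and the sample flows are hypothetical, and it assumes both flows emit exactly one element per input (a filter or mapConcat in either branch breaks the pairing and can stall the broadcast).

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, ZipWith}

import scala.concurrent.Await
import scala.concurrent.duration._

object GatedFlowExample {
  // Hypothetical helper: every input goes to both flows; an A is emitted only
  // once `signals` has produced a value for the same input, and that value is dropped.
  // Assumes both flows emit exactly one element per input element.
  def gated[In, A, B](data: Flow[In, A, Any], signals: Flow[In, B, Any]): Flow[In, A, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val broadcast = b.add(Broadcast[In](2))
      // Like `(fromA, fromB) => fromA` from the question
      val keepLeft = b.add(ZipWith[A, B, A]((a, _) => a))

      broadcast.out(0) ~> data ~> keepLeft.in0
      broadcast.out(1) ~> signals ~> keepLeft.in1

      FlowShape(broadcast.in, keepLeft.out)
    })

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("gated-flow")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    // Stand-ins for the question's `a` and `b`
    val a: Flow[Int, String, NotUsed] = Flow[Int].map(i => s"data-$i")
    val b: Flow[Int, Unit, NotUsed] = Flow[Int].map(_ => ())
    try Await.result(Source(1 to 3).via(gated(a, b)).runWith(Sink.seq), 3.seconds)
    finally system.terminate()
  }
}
```

Using ZipWith here avoids building the (Data, Unit) tuple only to discard half of it.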


This can be achieved using graphs in akka-streams.

Update: I originally suggested merge, but the correct fan-in stage here is zip.

Example:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, ClosedShape}


object Application extends App {

  implicit val sys: ActorSystem = ActorSystem()
  implicit val mat: ActorMaterializer = ActorMaterializer()

  val flowX: Flow[Int, String, NotUsed] = Flow[Int].map(i => (i + 10).toString)
  val flowY: Flow[Int, Long, NotUsed] = Flow[Int].map(i => (i * 2).toLong)

  RunnableGraph.fromGraph(GraphDSL.create(flowX, flowY)((_, _)) { implicit builder =>
    (flowX, flowY) =>
      import GraphDSL.Implicits._
      val broadcast = builder.add(Broadcast[Int](2))
      val zip = builder.add(Zip[String, Long])
      Source((1 to 10).toList) ~> broadcast.in

      broadcast ~> flowX ~> zip.in0
      broadcast ~> flowY ~> zip.in1
      zip.out ~> Sink.foreach(println)
      ClosedShape
  }).run()
}

flowX and flowY are passed as parameters to graph creation. In the graph-construction block you can see the stream being split and joined again (fan-out + fan-in). Working with graphs is somewhat harder than working with linear flows. It may make sense to build a partial graph with a flow shape (one input, one output), so that users see it as an ordinary Flow with the complexity hidden. Personally, I would suggest avoiding graphs where possible, because they are harder to test (errors and performance degradation are harder to track down), although they are a great feature in some cases.
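A sketch of that suggestion, reusing flowX and flowY from the example above: the same fan-out and fan-in, wrapped with Flow.fromGraph into an ordinary Flow[Int, (String, Long), NotUsed]. The object and value names are illustrative, and the explicit ActorMaterializer follows the Akka 2.5-era style of the example.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, Zip}

import scala.concurrent.Await
import scala.concurrent.duration._

object PartialGraphExample {
  val flowX: Flow[Int, String, NotUsed] = Flow[Int].map(i => (i + 10).toString)
  val flowY: Flow[Int, Long, NotUsed] = Flow[Int].map(i => (i * 2).toLong)

  // Fan-out to both flows, fan-in with Zip, exposed as one input and one output
  val flowXY: Flow[Int, (String, Long), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      val broadcast = builder.add(Broadcast[Int](2))
      val zip = builder.add(Zip[String, Long]())

      broadcast.out(0) ~> flowX ~> zip.in0
      broadcast.out(1) ~> flowY ~> zip.in1

      FlowShape(broadcast.in, zip.out)
    })

  def run(): Seq[(String, Long)] = {
    implicit val system: ActorSystem = ActorSystem("partial-graph")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try Await.result(Source(1 to 3).via(flowXY).runWith(Sink.seq), 3.seconds)
    finally system.terminate()
  }
}
```

Callers can then use flowXY with via like any other Flow, without seeing the Broadcast and Zip inside.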

You can find many overloads of GraphDSL.create that take different numbers of parameters, and those parameters can be different kinds of graphs: sources, flows and sinks.
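For instance, passing both a Source and a Sink to GraphDSL.create exposes their shapes inside the build block, and the combine function (here Keep.right) decides which materialized value the whole graph returns. A minimal sketch with illustrative names, again assuming an explicit ActorMaterializer:

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, RunnableGraph, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object CreateWithParamsExample {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("create-with-params")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val source = Source(1 to 4)
      val sink = Sink.seq[Int]

      // create(source, sink) hands their shapes to the build block;
      // Keep.right makes the sink's Future the materialized value of the whole graph
      val graph = RunnableGraph.fromGraph(GraphDSL.create(source, sink)(Keep.right) { implicit b => (src, snk) =>
        import GraphDSL.Implicits._
        src.out ~> Flow[Int].map(_ * 10) ~> snk.in
        ClosedShape
      })

      Await.result(graph.run(), 3.seconds)
    } finally system.terminate()
  }
}
```

With Keep.left instead, run() would return the Source's NotUsed and the collected elements would no longer be reachable from outside the graph.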