1
votes

I have a set of stream stages (Sources, Flows and Sinks) which I would like to add some MetaData information to.

Therefore rather than Sources producing A -> (A, StreamMetaData). I've managed to do this using custom stream stages whereby on grab(in) the element, I push(out, (elem, StreamMetaData)). In reality it is not 'converting' the existing Source but passing it to a flow to recreate a new source.

Now I'm trying to implement the below MetaStream stage:

1

Therefore given that the source is producing tuples of (A, StreamMetaData), I want to pass the A to an existing Flow for some computation to take place and then merge the output produced 'B' with the StreamMetaData. These will then be passed to a Sink which accepts (B, StreamMetaData).

How would you suggest I go about it. I've been informed partial graphs are the best bet and would help in completing such a task. UniformFanOut and UniformFanIn using Unzip((A streamMetaData), A, StreamMetaData) and Zip(A,B)

 val fanOut = GraphDSL.create() { implicit b =>
    val unzip = b.add(Unzip[T, StreamMetaData]) 
    UniformFanOutShape(unzip.in, unzip.out0, unzip.out1)
  }

  val fanIn = GraphDSL.create() { implicit b =>
    val zip = b.add(Zip[T ,StreamMetaData]()) 
    UniformFanInShape(zip)
  }

How can I connect the fanIn and fanOut so as to achieve the same behavior as in the picture?

I had something like this in mind;

 def metaFlow[T, B, Mat](flow: Flow[T, B, Mat]): Unit = {
   val wrappedFlow =
     Flow.fromGraph(GraphDSL.create(){ implicit b =>
       import GraphDSL.Implicits._

       val unzip: FanOutShape2[(T, StreamMetaData), T, StreamMetaData] = b.add(Unzip[T, StreamMetaData])
       val existingFlow = b.add(flow)
       val zip: FanInShape2[B,StreamMetaData,(B,StreamMetaData)] = b.add(Zip[B, StreamMetaData])

       unzip.out0 ~> existingFlow ~> zip.in0
       unzip.out1 ~> zip.in1

       FlowShape(unzip.in, zip.out)
     })

 }

Thanks in advance.

1

1 Answers

1
votes

This aprox creating a new SourceShape stacking flow graph can work, a little bit different of your flowShape implementation.

  def sourceGraph[A, B](f: A => B, source: Source[(A, StreamMetaData), NotUsed]) = Source.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._

    val unzip = builder.add(Unzip[A, StreamMetaData]())
    val zip   = builder.add(Zip[B, StreamMetaData]())

    val flow0  = builder.add(Flow[A].map { f(_) })

    val flow1 = source ~> unzip.in
                          unzip.out0 ~> flow0 ~> zip.in0
                          unzip.out1          ~> zip.in1


    SourceShape(zip.out)
  })



def flowGraph[A, B](f: A => B) = Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val unzip = builder.add(Unzip[A, StreamMetaData]())
    val zip   = builder.add(Zip[B, StreamMetaData]())

    val flow0  = builder.add(Flow[A].map { f(_) })

    unzip.out0 ~> flow0 ~> zip.in0
    unzip.out1          ~> zip.in1

    FlowShape(unzip.in, zip.out)
  })