
I'm currently trying to build a program using Akka Streams with a pipe-and-filter architecture in Scala. I have a graph that should take one input and fan it out to multiple flows; at the end, the results of all the flows should be combined into one. In my case, the input is all kinds of tweets. The tweets first go through different filters, each of which keeps one type of tweet, and then through a scan that simply counts how many tweets of that type it has seen. Finally, I would like the output to be the values produced by these scans, combined into a tuple.

Right now, I have a graph set up with the Graph DSL that uses Broadcast and ZipWith for this. My code is the following:

val splitStreams =
  Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val bcastTweets = builder.add(Broadcast[Tweet](4))
    val zipTweets = builder.add(ZipWith[Int, Int, Int, Int, (Int, Int, Int, Int)]((a, b, c, d) => (a, b, c, d)))

    bcastTweets.out(0) ~> retweetFlow ~> retweetCount ~> zipTweets.in0
    bcastTweets.out(1) ~> replyFlow ~> replyCount ~> zipTweets.in1
    bcastTweets.out(2) ~> quotedFlow ~> quotedCount ~> zipTweets.in2
    bcastTweets.out(3) ~> normalFlow ~> normalCount ~> zipTweets.in3

    FlowShape(bcastTweets.in, zipTweets.out)
  })

The problem is that when I test this code, the broadcast doesn't seem to emit into any of the flows.

Can anyone tell me what I am doing wrong? I have been looking at this for approximately 2 days and can't figure it out.


2 Answers


The described problem has to do with ZipWith (and Zip) not being able to cope with filtering stages feeding its inputs. My guess is that Akka Streams has no way to properly zip elements when the individual branches drop elements at different rates. ZipWith/Zip would work if the involved Flows were, say, plain 1-to-1 mappings using map.
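To illustrate that last point, here is a minimal sketch (stage names and branch logic are made up for illustration) that broadcasts to two plain map stages and zips the results; since every branch emits exactly one element per input, ZipWith always has a pair to emit:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream._

implicit val system = ActorSystem("zip-maps")

// Broadcast + ZipWith is safe when each branch is a 1-to-1 stage like map:
// every input element yields exactly one element on each branch, so ZipWith
// always gets something to pair up and nothing deadlocks.
val zipMapped = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val bcast = builder.add(Broadcast[Int](2))
  val zip   = builder.add(ZipWith[Int, Int, (Int, Int)]((a, b) => (a, b)))

  bcast.out(0) ~> Flow[Int].map(_ + 1) ~> zip.in0
  bcast.out(1) ~> Flow[Int].map(_ * 2) ~> zip.in1

  FlowShape(bcast.in, zip.out)
})

// Source(1 to 3).via(zipMapped).runForeach(println)
// prints (2,2), (3,4), (4,6)
```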

One workaround for what you need is to substitute Merge together with grouped for ZipWith, as shown in the following simplified example with a number of dummy filtering Flows:

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream._

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()  // Not needed for Akka Stream 2.6+
implicit val ec = system.dispatcher

val n = 4

def filterFlow(i: Int) = Flow[Int].filter(_ % n == i)

val customFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val bcast = builder.add(Broadcast[Int](n))
  val merger = builder.add(Merge[Int](n))

  (0 until n).foreach{ i =>
    bcast.out(i) ~> filterFlow(i) ~> merger.in(i)
  }

  FlowShape(bcast.in, merger.out)
})

Source(0 to 9).via(customFlow).grouped(n).runForeach(println)
// Output:
// Vector(0, 1, 2, 3)
// Vector(4, 5, 6, 7)
// Vector(8, 9)

If the output needs to be tuples, simply apply collect as below (e.g. for n = 4):

val empty = -1  // Default place-holder value

Source(0 to 9).via(customFlow).grouped(n).collect{
    case Vector(a)          => (a, empty, empty, empty)
    case Vector(a, b)       => (a, b, empty, empty)
    case Vector(a, b, c)    => (a, b, c, empty)
    case Vector(a, b, c, d) => (a, b, c, d)
  }.runForeach(println)
// Output:
// (0, 1, 2, 3)
// (4, 5, 6, 7)
// (8, 9, -1, -1)

Here is some background about what is going on:

Zip requires one element to come down each upstream to be zipped into a tuple (it can't make up a value for a position if it hasn't seen one yet) and will not demand more elements on either upstream until it has zipped a tuple and sent downstream.

Broadcast on the other hand can only emit when it has seen demand from all its downstreams so that it safely can emit an element to all of them. Therefore if one of the streams between broadcast and zip drops any element, you will end up with a stuck stream - zip cannot demand more and broadcast cannot emit to all.

You could get out of this deadlock by adding a detach or buffer as the first operator in each of the broadcast branches. You'd have to think carefully about whether that is what you want to achieve, though.
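Here is a sketch of the buffer variant (the two filtering branches are invented for illustration): putting a buffer right after each broadcast outlet lets Broadcast keep pushing while Zip is still waiting on the other branch. Note that a bounded buffer only postpones the deadlock if one branch keeps dropping far more elements than the others:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream._

implicit val system = ActorSystem("buffered-zip")

// Buffer as the first operator in each branch, before the filter, so that
// Broadcast can run ahead even while Zip waits for the slower branch.
val bufferedZip = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val bcast = builder.add(Broadcast[Int](2))
  val zip   = builder.add(Zip[Int, Int]())

  bcast.out(0) ~> Flow[Int].buffer(16, OverflowStrategy.backpressure).filter(_ % 2 == 0) ~> zip.in0
  bcast.out(1) ~> Flow[Int].buffer(16, OverflowStrategy.backpressure).filter(_ % 2 != 0) ~> zip.in1

  FlowShape(bcast.in, zip.out)
})

// Source(1 to 6).via(bufferedZip).runForeach(println)
// prints (2,1), (4,3), (6,5) -- without the buffers this graph would deadlock
// after the first element, exactly as described above.
```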

Merge will simply emit the individual elements coming down any of its upstreams into the downstream, and MergeLatest will emit as soon as it has seen at least one element on every input (meaning it could also deadlock if the first element is filtered out on one of the inputs) and will then potentially repeat values, so both of those are very different from zipping.
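To make the MergeLatest behaviour concrete, here is a sketch (MergeLatest is available in Akka Streams 2.6+; the branch stages are invented for illustration). It emits a List holding the most recent element seen on each input, so a value from a slower input can appear repeatedly:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream._

implicit val system = ActorSystem("merge-latest")

// MergeLatest[Int](2) outputs List[Int]: the latest value seen on each of its
// two inputs. It emits nothing until every input has produced at least one
// element, which is why an input whose first element is filtered out stalls it.
val latest = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val bcast = builder.add(Broadcast[Int](2))
  val merge = builder.add(MergeLatest[Int](2))

  bcast.out(0) ~> Flow[Int].map(_ * 10) ~> merge.in(0)
  bcast.out(1) ~> merge.in(1)

  FlowShape(bcast.in, merge.out)
})

// Source(1 to 3).via(latest).runForeach(println)
// first emits List(10, 1), then emissions repeating the latest value per
// input, ending with List(30, 3)
```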