I'm trying to write a program in Scala using Akka Streams with a pipe-and-filter architecture. I have a graph that should take one input and send it to multiple flows; at the end, the results of all the flows should be combined into one. In my case, the input is a stream of tweets of all kinds. Each tweet first goes to several filters, each of which keeps only one type of tweet, and then to a scan that simply counts how many tweets of that type it has seen. Finally, I would like the output to combine the values emitted by these scans into a tuple.
Right now, I have a graph built with the GraphDSL that uses Broadcast and ZipWith to do this. My code is the following:
    val splitStreams =
      Flow.fromGraph(GraphDSL.create() { implicit builder =>
        import GraphDSL.Implicits._

        val bcastTweets = builder.add(Broadcast[Tweet](4))
        val zipTweets = builder.add(ZipWith[Int, Int, Int, Int, (Int, Int, Int, Int)]((a, b, c, d) => (a, b, c, d)))

        bcastTweets.out(0) ~> retweetFlow ~> retweetCount ~> zipTweets.in0
        bcastTweets.out(1) ~> replyFlow ~> replyCount ~> zipTweets.in1
        bcastTweets.out(2) ~> quotedFlow ~> quotedCount ~> zipTweets.in2
        bcastTweets.out(3) ~> normalFlow ~> normalCount ~> zipTweets.in3

        FlowShape(bcastTweets.in, zipTweets.out)
      })
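
For context, one filter-and-count branch such as `retweetFlow ~> retweetCount` might look roughly like the sketch below. The `Tweet` case class, its `isRetweet` field, and the `runBranch` helper are hypothetical stand-ins, since the real definitions are not shown in this post. The sketch also assumes Akka 2.6 or later, where an implicit `ActorSystem` supplies the materializer.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stand-in for the real Tweet type, which is not shown in the post.
final case class Tweet(text: String, isRetweet: Boolean)

object BranchSketch {
  // Filter stage: only retweets pass; every other tweet is dropped.
  val retweetFlow: Flow[Tweet, Tweet, NotUsed] =
    Flow[Tweet].filter(_.isRetweet)

  // Counting stage: scan emits its initial value (0) first,
  // then the running count for each element it receives.
  val retweetCount: Flow[Tweet, Int, NotUsed] =
    Flow[Tweet].scan(0)((count, _) => count + 1)

  // Hypothetical helper: runs this single branch on its own,
  // without Broadcast or ZipWith, and collects everything it emits.
  def runBranch(tweets: List[Tweet]): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("branch-sketch")
    try Await.result(
      Source(tweets).via(retweetFlow).via(retweetCount).runWith(Sink.seq),
      5.seconds
    )
    finally system.terminate()
  }
}
```

Running a branch in isolation like this is one way to check what it emits before wiring it into the graph. Under these assumptions, a branch emits one more element than the number of tweets that pass its filter, so different branches can emit different numbers of elements for the same input.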
The problem is that when I test this code, the elements from the broadcast don't seem to reach any of the flows.
Can anyone tell me what I am doing wrong? I have been looking at this for about two days and can't figure it out.