
I need some advice. I need to run multiple source graphs in parallel. For example, in this sample code I create 10 graphs and run them in parallel.

Is this the right approach, or should I create multiple sources inside a single graph and run them in parallel within that one graph?

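import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}

implicit val system = ActorSystem("parallel-graphs")
implicit val materializer = ActorMaterializer()

// Builds one closed graph: numbers start..end -> toString -> println with a name prefix
def createGraph(start: Int, end: Int, name: String) = {
  RunnableGraph.fromGraph(GraphDSL.create() {
    implicit builder =>
      import GraphDSL.Implicits._
      val s = Source(start to end)
      val f = Flow[Int].map[String](x => x.toString)
      val sink = Sink.foreach[String](x => println(name + ":" + x))

      val t = builder.add(s)

      val flow1 = builder.add(f)

      t ~> flow1 ~> sink

      ClosedShape
  })
}

// Materializes 10 independent graphs; each one runs as its own stream
(1 to 10).map(x => createGraph(x, x + 10, "g" + x)).map(_.run())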
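If you keep one graph per source, it can help to keep each graph's completion `Future` so you know when all of them have finished and can shut the system down. This is a minimal sketch assuming Akka Streams 2.4/2.5 (`ActorMaterializer`); the object `IndependentGraphs` and the helper `runAll` are hypothetical names for illustration. Passing the sink to `GraphDSL.create(sink)` makes `run()` return the sink's `Future[Done]` instead of `NotUsed`.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object IndependentGraphs {
  // Same shape as createGraph, but the sink's Future[Done] becomes
  // the materialized value of the whole graph
  def createGraph(start: Int, end: Int, name: String): RunnableGraph[Future[Done]] = {
    val sink = Sink.foreach[String](x => println(name + ":" + x))
    RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit builder => sinkShape =>
      import GraphDSL.Implicits._
      val flow1 = builder.add(Flow[Int].map(_.toString))
      Source(start to end) ~> flow1 ~> sinkShape
      ClosedShape
    })
  }

  // Hypothetical helper: runs n independent graphs, waits for all of them,
  // and returns how many completed
  def runAll(n: Int): Int = {
    implicit val system: ActorSystem = ActorSystem("independent-graphs")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher
    try {
      val completions: Seq[Future[Done]] =
        (1 to n).map(x => createGraph(x, x + 10, "g" + x).run())
      Await.result(Future.sequence(completions), 10.seconds).size
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"completed graphs: ${runAll(10)}")
}
```

Each `run()` call still creates a separate stream (an independent "island"); the only change is that its completion is observable.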

Thanks Arun

Why all that code to do the equivalent of: Source(start to end).map(_.toString).runForeach(x => println(s"$name:$x")) - Viktor Klang
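For a linear pipeline like the one in the question, the fluent API expresses the same stream without the GraphDSL, as this comment points out. A sketch assuming Akka Streams 2.4/2.5; `FluentGraphs`, `runRange` and `runAll` are illustrative names. `runForeach` materializes the stream and returns a `Future[Done]`.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object FluentGraphs {
  // Same pipeline as createGraph(start, end, name).run(), using the fluent API;
  // runForeach materializes the stream and returns its completion Future
  def runRange(start: Int, end: Int, name: String)(implicit mat: ActorMaterializer): Future[Done] =
    Source(start to end).map(_.toString).runForeach(x => println(s"$name:$x"))

  // Hypothetical helper: starts n independent streams and returns how many completed
  def runAll(n: Int): Int = {
    implicit val system: ActorSystem = ActorSystem("fluent-graphs")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher
    try {
      val all = Future.sequence((1 to n).map(x => runRange(x, x + 10, "g" + x)))
      Await.result(all, 10.seconds).size
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(s"completed: ${runAll(10)}")
}
```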
I removed some complex flow processing from the main code to protect intellectual property. The question is: if I have multiple sources and need to run a graph for each source, what is the best practice? For example, think of reading multiple Kafka topics, then transforming, processing, and sinking the data into a database. - ASe
The approach looks somewhat odd to me, though I cannot quite say why. I'd separate the time-consuming parts into their own stages, then use Merge and/or Balance to tie them together into a single graph, and run that one graph just once. As it stands, you are creating n separate "islands". - akauppi
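The Merge-plus-Balance layout suggested here can be sketched as one closed graph: several sources are merged, the time-consuming stage is spread over a few parallel workers with `Balance`, and everything ends in one sink. This is a sketch assuming Akka Streams 2.4/2.5; `slowStep` (a `Thread.sleep` stand-in for real work), `balanced`, and `MergeBalanceGraph` are hypothetical. The worker pattern follows the "parallel processing" section of the Akka stream-parallelism docs, where each worker copy is marked `.async`.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, FlowShape}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object MergeBalanceGraph {
  // Hypothetical stand-in for a time-consuming processing step
  val slowStep: Flow[Int, Int, NotUsed] = Flow[Int].map { x => Thread.sleep(10); x * 2 }

  // Spreads elements over `workers` copies of `worker`, each on its own async island,
  // then merges the results back into one stream (output order is not preserved)
  def balanced(worker: Flow[Int, Int, NotUsed], workers: Int): Flow[Int, Int, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val balance = b.add(Balance[Int](workers))
      val merge = b.add(Merge[Int](workers))
      for (_ <- 1 to workers) balance ~> worker.async ~> merge
      FlowShape(balance.in, merge.out)
    })

  // One closed graph: simulated sources merged, processed in parallel, one sink.
  // The materialized value is the sum of all processed elements
  def run(sourceIds: List[Int], workers: Int): Int = {
    implicit val system: ActorSystem = ActorSystem("merge-balance")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val sumSink = Sink.fold[Int, Int](0)(_ + _)
      val graph = RunnableGraph.fromGraph(GraphDSL.create(sumSink) { implicit b => sink =>
        import GraphDSL.Implicits._
        val merge = b.add(Merge[Int](sourceIds.length))
        // Each simulated source feeds its own Merge inlet
        sourceIds.foreach(i => Source(100 * i to 100 * i + 10) ~> merge)
        merge ~> balanced(slowStep, workers) ~> sink
        ClosedShape
      })
      Await.result(graph.run(), 30.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(run(List(1, 2, 3), workers = 4))
}
```

Only one graph is materialized here; parallelism comes from the `.async` worker copies rather than from running many separate graphs.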

1 Answer


I tried the parallelism approach described at http://doc.akka.io/docs/akka/2.4.4/scala/stream/stream-parallelism.html and it works well for my case, where the sources are different but the flow and sink are the same. Each source in the example below is simulated; think of each one as a stream read from some external system:

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}

object TestParallelGraph extends App {

  implicit val system = ActorSystem("test")
  implicit val dispatcher = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val listOfDifferentSource = List(1, 2, 3) // consider we have to read data from various sources

  def createGraph() = {
    RunnableGraph.fromGraph(GraphDSL.create() {
      implicit builder =>
        import GraphDSL.Implicits._

        // One Merge input port per source
        val merge = builder.add(Merge[Int](listOfDifferentSource.length))

        val flow = builder.add(Flow[Int].map(_ + 10)) // just a random flow that adds 10

        // Each source is simulated with a different range (100*i to 100*i+10)
        // to stand in for a different external input; each feeds its own Merge port
        listOfDifferentSource.foreach { i =>
          Source(100 * i to 100 * i + 10) ~> merge
        }

        // The merged stream shares a single flow and sink
        merge ~> flow ~> Sink.foreach[Int](println)

        ClosedShape
    })
  }

  createGraph().run()
}
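Akka Streams also provides `Source.combine`, which builds the same fan-in without writing the GraphDSL by hand. Note that, by default, stages without an explicit asynchronous boundary are fused and run on a single actor, so merged sources in one graph are interleaved rather than processed concurrently unless you add boundaries such as `.async`, as the linked parallelism page describes. This is a sketch assuming Akka 2.4.x/2.5.x; `CombinedSources` and `simulatedSource` are hypothetical names, and `Sink.seq` is used instead of `println` only so the output can be inspected.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Merge, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object CombinedSources {
  val listOfDifferentSource = List(1, 2, 3) // consider we have to read data from various sources

  // Simulated external source for id i; .async puts each source on its own
  // asynchronous island so the sources can run concurrently
  def simulatedSource(i: Int): Source[Int, NotUsed] =
    Source(100 * i to 100 * i + 10).async

  // Source.combine takes at least two sources; Merge(_) is the fan-in strategy
  val merged: Source[Int, NotUsed] = {
    val sources = listOfDifferentSource.map(simulatedSource)
    Source.combine(sources(0), sources(1), sources.drop(2): _*)(Merge(_))
  }

  // Same "+10" flow as the answer; elements are collected instead of printed
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("combined")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try Await.result(merged.map(_ + 10).runWith(Sink.seq), 10.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Because `Merge` emits whichever input is ready first, the order of the combined output is not deterministic.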