2
votes

I have a queue need to be broadcasted and merged using akka stream graphs. enter image description here

I found the graph demo and queue demo. and don't know how to combine them. Can anyone help me out? Thanks

Here is the graph demo

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: 
GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._
  val in = Source(1 to 10)
  val out = Sink.ignore

  val bcast = builder.add(Broadcast[Int](2))
  val merge = builder.add(Merge[Int](2))

  val f1, f2, f3, f4 = Flow[Int].map(_ + 10)

  in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
  bcast ~> f4 ~> merge
  ClosedShape
})

And here is the Queue Demo

val bufferSize = 5
val elementsToProcess = 3

val queue = Source
  .queue[Int](bufferSize, OverflowStrategy.backpressure)
  .throttle(elementsToProcess, 3.second)
  .map(x ⇒ x * x)
  .toMat(Sink.foreach(x ⇒ println(s"completed $x")))(Keep.left)
  .run()

val source = Source(1 to 10)

implicit val ec = system.dispatcher
source.mapAsync(1)(x ⇒ {
  queue.offer(x).map {
    case QueueOfferResult.Enqueued    ⇒ println(s"enqueued $x")
    case QueueOfferResult.Dropped     ⇒ println(s"dropped $x")
    case QueueOfferResult.Failure(ex) ⇒ println(s"Offer failed 
${ex.getMessage}")
    case QueueOfferResult.QueueClosed ⇒ println("Source Queue closed")
  }
}).runWith(Sink.ignore)

I want to run a graph that return a queue,so that I can offer element to it. Thanks

1

1 Answers

0
votes

Your val queue is the result of the "queue" (which became a RunnableGraph via the toMat combinator) being run. Your g is also a RunnableGraph (which you can call run on). Offering an element to such a graph means defining a Source which passes elements downstream. What you can combine are the different components making up such a runnable graph. It requires a Source and a Sink and may have an arbitrary amount of Flow components in between. I'd suggest you go through the official documentation for akka streams in order to understand how they work in general and look at the custom graphs section in particular.