1
votes

I am facing buffering and backpressure problems while working with Akka Streams.

I have the following code (simplified in order to isolate the problem):

object X extends App {
  implicit val actorSysten = ActorSystem("Actory-System")
  implicit val executionContext = actorSysten.dispatcher

  /** Partition output port for `v`: `v % 2`, i.e. 0 for even and 1 for odd non-negative values. */
  def partition2(v: Int): Int = v % 2

  /** Splits the stream by parity and rejoins both halves with `MergeLatest`.
    *
    * Exposed as a flow from the partitioner's inlet to the merge's outlet. Judging by the
    * observed output (1 to 1000 printed immediately), `MergeLatest` keeps consuming its
    * inputs even while the downstream `delay` is slow — NOTE(review): confirm against the
    * Akka version in use.
    */
  val partialGraphDSL = GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val byParity = b.add(Partition[Int](2, partition2))
    val latest = b.add(MergeLatest[Int](2))

    // Port i of the partitioner feeds port i of MergeLatest.
    byParity.out(0) ~> latest
    byParity.out(1) ~> latest

    FlowShape(byParity.in, latest.out)
  }

  val source = Source(1 to 1000).async

  /** source -> log each Int -> parity split/merge -> 1 s delay -> print. */
  val mainGraph = GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val parityMerge = b.add(partialGraphDSL)
    val logEach = Flow[Int].map { n => println(n); n }

    // Holds each List for 1 second; with a 1-element input buffer and the backpressure
    // overflow strategy it is expected to slow its upstream while an element is pending.
    val delay = Flow[List[Int]]
      .delay(1.seconds, OverflowStrategy.backpressure)
      .map { xs => println("After delay."); xs }
      .withAttributes(Attributes.inputBuffer(1, 1))

    source ~> logEach ~> parityMerge ~> delay ~> Sink.foreach(println)

    ClosedShape
  }

  val runnable = RunnableGraph.fromGraph(mainGraph)
  val materialized = runnable.run()
}

This prints 1 to 1000 almost instantly, and then prints "After delay." once every second:

1
2
...
999
1000
After delay.
List(2, 1)
After delay.
List(2, 3)

I actually expected the source to be backpressured by the delay flow. However, if I modify the code so that it no longer uses partialGraphDSL, the output is exactly what I expected:

object X extends App {
  implicit val actorSysten = ActorSystem("Actory-System")
  implicit val executionContext = actorSysten.dispatcher

  val source = Source(1 to 1000).async

  /** The question's pipeline without the Partition/MergeLatest sub-graph:
    * source -> log each Int -> 1 s delay -> print.
    *
    * With this wiring the reported output interleaves the logged inputs with
    * "After delay.", i.e. the delay does slow the source down.
    */
  val mainGraph = GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val logEach = Flow[Int].map { n => println(n); n }
    val delay = Flow[Int]
      .delay(1.seconds, OverflowStrategy.backpressure)
      .map { n => println("After delay."); n }
      .withAttributes(Attributes.inputBuffer(1, 1))

    source ~> logEach ~> delay ~> Sink.foreach(println)

    ClosedShape
  }

  val runnable = RunnableGraph.fromGraph(mainGraph)
  val materialized = runnable.run()
}

Output:

1
After delay.
1
2
After delay.
...

It seems that backpressure is not propagated from the delay through partialGraphDSL back to the source. Why is this happening?

1
It seems the problem is that MergeLatest doesn't apply backpressure.

1 Answer

2
votes

The code behaves like that because the data is stored in an internal buffer. Because of that, the source does not receive backpressure.

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, ClosedShape, FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Flow, GraphDSL, MergeLatest, Partition, RunnableGraph, Sink, Source}

import scala.concurrent.duration._

/** The answer's variant of the pipeline: an extra 1-second `newDelay` stage is placed
  * directly after the source, ahead of the Partition/MergeLatest sub-graph.
  *
  * NOTE(review): `newDelay` throttles the source to roughly one element per second; it
  * does not make `MergeLatest` propagate backpressure. Judging by the question's output,
  * `MergeLatest` keeps pulling its inputs regardless of downstream demand, so the
  * downstream `delay` only keeps up because the upstream rate now matches its own.
  * TODO: consider computing "latest value per partition" with a single `scan` over the
  * stream instead of Partition + MergeLatest, which would backpressure naturally.
  */
object GraphWithBackPressure extends App {
  implicit val actorSysten: ActorSystem = ActorSystem("Actory-System")
  implicit val executionContext: scala.concurrent.ExecutionContextExecutor =
    actorSysten.dispatcher

  /** Partition output port for `v`: `v % 2`, i.e. 0 for even and 1 for odd non-negative values. */
  def partition2(v: Int): Int = v % 2

  /** Splits the stream by parity and rejoins both halves with `MergeLatest`.
    *
    * Per the question's output it emits lists such as `List(2, 1)`, `List(2, 3)`: the
    * latest element from port 0 followed by the latest from port 1.
    */
  val partialGraphDSL = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val partitioner = builder.add(Partition[Int](2, partition2))
    val mergeLatest = builder.add(MergeLatest[Int](2))

    partitioner.out(0) ~> mergeLatest
    partitioner.out(1) ~> mergeLatest

    FlowShape(partitioner.in, mergeLatest.out)
  }

  val source = Source(1 to 1000000)
    .async

  /** source -> newDelay -> log -> parity split/merge -> delay -> print. */
  val mainGraph = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val partialGraph = builder.add(partialGraphDSL)

    // Delays each merged List by 1 second after the Partition/MergeLatest stage.
    val delay = Flow[List[Int]]
      .delay(1.seconds, OverflowStrategy.backpressure)
      .map { x => println("After delay."); x }
      .withAttributes(Attributes.inputBuffer(1, 1))

    // Delays each raw source element by 1 second before it reaches the sub-graph.
    // It logs a different line than `delay` so the two stages can be told apart.
    val newDelay = Flow[Int]
      .delay(1.seconds, OverflowStrategy.backpressure)
      .map { x => println("After source delay."); x }
      .withAttributes(Attributes.inputBuffer(1, 1))

    source ~> newDelay ~> Flow[Int].map { x => println(x); x } ~>
      partialGraph ~> delay ~> Sink.foreach(println)

    ClosedShape
  }

  val runnable = RunnableGraph.fromGraph(mainGraph)
  // Relies on the Materializer derived implicitly from the ActorSystem (Akka 2.6+, as the
  // question's code already does); the explicit `ActorMaterializer()` is deprecated there.
  val materialized = runnable.run()
}

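If we put a delay before the Flow, we can see the backpressure. (Strictly speaking, newDelay throttles the source to about one element per second; MergeLatest itself still appears not to propagate backpressure to its inputs.)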

NOTE: See the newDelay flow.