
I am facing a buffering and back-pressure problem while working with Akka Streams.

I have the following code (simplified in order to isolate the problem):

import akka.actor.ActorSystem
import akka.stream.{ClosedShape, FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Flow, GraphDSL, MergeLatest, Partition, RunnableGraph, Sink, Source}

import scala.concurrent.duration._

object X extends App {
  implicit val actorSysten = ActorSystem("Actory-System")
  implicit val executionContext = actorSysten.dispatcher

  // Route even values to output 0 and odd values to output 1.
  def partition2(v : Int): Int = {
    v % 2
  }

  // Split the input in two and recombine the branches with MergeLatest.
  val partialGraphDSL = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val partitioner = builder.add(Partition[Int](2, partition2))
    val mergeLatest = builder.add(MergeLatest[Int](2))

    partitioner.out(0) ~> mergeLatest
    partitioner.out(1) ~> mergeLatest

    FlowShape(partitioner.in, mergeLatest.out)
  }

  val source = Source(1 to 1000)

  val mainGraph = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val partialGraph = builder.add(partialGraphDSL)

    val delay = Flow[List[Int]]
      .delay(1.seconds, OverflowStrategy.backpressure)
      .map { x => println("After delay."); x }

    source ~> Flow[Int].map { x => println(x); x } ~> partialGraph ~> delay ~> Sink.foreach(println)

    ClosedShape
  }

  val runnable = RunnableGraph.fromGraph(mainGraph)
  val materialized = runnable.run()
}

This prints the numbers from 1 to 1000 almost instantly, and then prints "After delay." followed by a list once every second:

After delay.
List(2, 1)
After delay.
List(2, 3)

I was actually expecting the source to be back-pressured by the delay flow. However, if I modify the code so that it no longer uses partialGraphDSL, the output is exactly what I expected:

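import akka.actor.ActorSystem
import akka.stream.{ClosedShape, OverflowStrategy}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}

import scala.concurrent.duration._

object X extends App {
  implicit val actorSysten = ActorSystem("Actory-System")
  implicit val executionContext = actorSysten.dispatcher

  val source = Source(1 to 1000)

  val mainGraph = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // Same delay as before, but applied directly to the Int elements.
    val delay = Flow[Int]
      .delay(1.seconds, OverflowStrategy.backpressure)
      .map { x => println("After delay."); x }

    source ~> Flow[Int].map { x => println(x); x } ~> delay ~> Sink.foreach(println)

    ClosedShape
  }

  val runnable = RunnableGraph.fromGraph(mainGraph)
  val materialized = runnable.run()
}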


After delay.
After delay.

It seems like the back pressure is not propagated from the delay through the "partialGraphDSL" towards the source. Why is this happening?

Seems like the problem is that MergeLatest doesn't do backpressure.

1 Answer


The code behaves this way because the data is stored in an internal buffer, so the back pressure never reaches the source.

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, ClosedShape, FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Flow, GraphDSL, MergeLatest, Partition, RunnableGraph, Sink, Source}

import scala.concurrent.duration._

object GraphWithBackPressure extends App {
  implicit val actorSysten = ActorSystem("Actory-System")
  implicit val executionContext = actorSysten.dispatcher

  // Route even values to output 0 and odd values to output 1.
  def partition2(v : Int): Int = {
    v % 2
  }

  val partialGraphDSL = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val partitioner = builder.add(Partition[Int](2, partition2))
    val mergeLatest = builder.add(MergeLatest[Int](2))

    partitioner.out(0) ~> mergeLatest
    partitioner.out(1) ~> mergeLatest

    FlowShape(partitioner.in, mergeLatest.out)
  }

  val source = Source(1 to 1000000)

  val mainGraph = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val partialGraph = builder.add(partialGraphDSL)

    val delay = Flow[List[Int]]
      .delay(1.seconds, OverflowStrategy.backpressure)
      .map { x => println("After delay."); x }

    // Extra delay placed directly after the source, before the partial graph.
    val newDelay = Flow[Int]
      .delay(1.seconds, OverflowStrategy.backpressure)
      .map { x => println("After delay."); x }

    source ~> newDelay ~> Flow[Int].map { x => println(x); x } ~> partialGraph ~> delay ~> Sink.foreach(println)

    ClosedShape
  }

  val runnable = RunnableGraph.fromGraph(mainGraph)
  val materialized = runnable.run()(ActorMaterializer())
}

If we place the delay before the Flow, ahead of the partial graph, then we can see the back pressure.

NOTE: See the newDelay flow.
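
To check where back pressure is lost, one option is to count how many elements the source has emitted by the time the first delayed element reaches the sink. The following is only a rough sketch, not part of the original code: `BackPressureProbe`, `partitionAndMerge`, and `emittedBeforeFirst` are hypothetical names, and it assumes Akka 2.6 or later, where an implicit ActorSystem also provides the materializer. The exact counts depend on buffer sizes and timing, so none are claimed here.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Flow, GraphDSL, MergeLatest, Partition, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object BackPressureProbe {
  // Assumption: Akka 2.6+, so the implicit system is enough to run streams.
  implicit val system: ActorSystem = ActorSystem("probe")

  // The Partition + MergeLatest stage from the question, wrapped as a Flow.
  val partitionAndMerge: Flow[Int, List[Int], NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      val partitioner = builder.add(Partition[Int](2, _ % 2))
      val mergeLatest = builder.add(MergeLatest[Int](2))
      partitioner.out(0) ~> mergeLatest
      partitioner.out(1) ~> mergeLatest
      FlowShape(partitioner.in, mergeLatest.out)
    })

  // Runs source -> stage -> delay and returns how many elements the source
  // had emitted when the first element came out of the delay.
  def emittedBeforeFirst[T](stage: Flow[Int, T, NotUsed]): Int = {
    val emitted = new AtomicInteger(0)
    val first = Source(1 to 1000)
      .map { x => emitted.incrementAndGet(); x }
      .via(stage)
      .delay(1.second, OverflowStrategy.backpressure)
      .runWith(Sink.head)
    Await.result(first, 10.seconds)
    emitted.get
  }

  def main(args: Array[String]): Unit = {
    println(s"plain flow:  ${emittedBeforeFirst(Flow[Int])}")
    println(s"MergeLatest: ${emittedBeforeFirst(partitionAndMerge)}")
    system.terminate()
  }
}
```

If the count for the plain flow stays small while the count for the MergeLatest variant is close to the size of the source, that points to the back pressure being lost inside the partial graph rather than in the delay.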