I am facing buffering and backpressure problems while working with Akka Streams.
I have the following code (simplified in order to isolate the problem):
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.duration._

object X extends App {
  implicit val actorSystem = ActorSystem("Actory-System")
  implicit val executionContext = actorSystem.dispatcher

  // Routes even values to output 0 and odd values to output 1.
  def partition2(v: Int): Int = {
    v % 2
  }

  // Flow-shaped partial graph: Partition fans out, MergeLatest fans back in.
  val partialGraphDSL = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val partitioner = builder.add(Partition[Int](2, partition2))
    val mergeLatest = builder.add(MergeLatest[Int](2))
    partitioner.out(0) ~> mergeLatest
    partitioner.out(1) ~> mergeLatest
    FlowShape(partitioner.in, mergeLatest.out)
  }

  val source = Source(1 to 1000)
    .async

  val mainGraph = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val partialGraph = builder.add(partialGraphDSL)
    // Slow stage that is expected to back-pressure everything upstream.
    val delay = Flow[List[Int]]
      .delay(1.seconds, OverflowStrategy.backpressure)
      .map { x => println("After delay."); x }
      .withAttributes(Attributes.inputBuffer(1, 1))
    source ~> Flow[Int].map { x => println(x); x } ~> partialGraph ~> delay ~> Sink.foreach(println)
    ClosedShape
  }

  val runnable = RunnableGraph.fromGraph(mainGraph)
  val materialized = runnable.run()
}
This prints the numbers 1 to 1000 almost instantly, and then prints "After delay." followed by a list once every second:
1
2
...
999
1000
After delay.
List(2, 1)
After delay.
List(2, 3)
I expected the delay flow to back-pressure the source. However, if I modify the code so that it no longer uses partialGraphDSL, the output is exactly what I expected:
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.duration._

object X extends App {
  implicit val actorSystem = ActorSystem("Actory-System")
  implicit val executionContext = actorSystem.dispatcher

  val source = Source(1 to 1000)
    .async

  val mainGraph = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    // Same slow stage as before, now connected directly to the source.
    val delay = Flow[Int]
      .delay(1.seconds, OverflowStrategy.backpressure)
      .map { x => println("After delay."); x }
      .withAttributes(Attributes.inputBuffer(1, 1))
    source ~> Flow[Int].map { x => println(x); x } ~> delay ~> Sink.foreach(println)
    ClosedShape
  }

  val runnable = RunnableGraph.fromGraph(mainGraph)
  val materialized = runnable.run()
}
Output:
1
After delay.
1
2
After delay.
...
It seems that the back pressure is not propagated from the delay through partialGraphDSL towards the source. Why is this happening?
MergeLatest doesn't do backpressure. – vbgd
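If MergeLatest really is the stage that keeps pulling regardless of downstream demand, one possible workaround is to avoid the fan-out/fan-in altogether. For this particular graph, where a single stream is split by partition2 and immediately merged again, the "latest value per partition" can be tracked in a linear flow instead. The following is only a sketch under that assumption: LatestPerPartition and its flow member are hypothetical names, not part of Akka, and the sketch relies on statefulMapConcat, which pulls a new element only when downstream asks for one.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow

// Hypothetical helper, not part of Akka: a linear stand-in for
// Partition ~> MergeLatest as wired in the question.
object LatestPerPartition {
  // Same partitioning rule as in the question (0 = even, 1 = odd).
  def partition2(v: Int): Int = v % 2

  val flow: Flow[Int, List[Int], NotUsed] =
    Flow[Int].statefulMapConcat { () =>
      // One slot per partition, holding the most recent value seen so far.
      val latest = Array.fill[Option[Int]](2)(None)
      v => {
        latest(partition2(v)) = Some(v)
        // Emit only once every partition has produced at least one value,
        // then emit the current snapshot on every new element.
        if (latest.forall(_.isDefined)) List(latest.toList.flatten) else Nil
      }
    }
}
```

Replacing partialGraph with LatestPerPartition.flow in mainGraph should yield the same lists (List(2, 1), List(2, 3), ...), and I would expect the source to be slowed down by the delay stage, although I have only reasoned about this from how statefulMapConcat handles demand.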