I am using the following code to test the behaviour of Akka Streams' Flow.batch, but I can't figure out why the result is not what I expect:
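```scala
import akka.stream.scaladsl.{Sink, Source}
// (an implicit ActorSystem / Materializer is assumed to be in scope)

Source(1 to 20)
  .map { x =>
    println(s"received: $x")
    x
  }
  .batch(max = 3, first => first.toString) { (batch, elem) =>
    batch + "," + elem
  }
  .runWith(Sink.foreach { x =>
    Thread.sleep(4000) // simulate a slow consumer
    println("Out:" + x)
  })
```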
And here is the output:
```
received: 1
received: 2
received: 3
received: 4
Out:1,2,3
received: 5
Out:4
received: 6
Out:5
received: 7
Out:6
received: 8
Out:7
received: 9
Out:8
received: 10
Out:9
received: 11
Out:10
received: 12
Out:11
... and so on ...
received: 19
Out:18
received: 20
Out:19
Out:20
```
There are a few points I don't understand:
- Firstly, my Sink is much slower than the source, so I expect elements to be batched together before being emitted downstream, e.g. Out: 1,2,3; Out: 4,5,6; Out: 7,8; Out: 9,10,11 and so on. Instead, they are batched only once (1,2,3); every subsequent element is emitted on its own.
- Why do I receive 4 items (received: 1 to received: 4) right at the beginning, when I only set max=3 (batch(max=3))?
- Since the source is much faster than the sink, I expect elements to be received much faster, e.g. received: 7, received: 8, received: 9, then Out:7,8,9. Instead, they are received sporadically, one at a time, and only after the Sink's println has executed.
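A likely explanation is operator fusion: by default Akka Streams runs the source, map, batch and the Sink.foreach in a single actor, so while the sink is inside Thread.sleep nothing upstream can run, and batch never gets a chance to aggregate. The fourth "received" line appears to fit the same picture: batch pulls one element past max to find out the batch is full and holds it back as the seed of the next batch. The sketch below assumes Akka 2.6+; the names BatchAsyncDemo, run and sleepMillis are hypothetical. It adds an async boundary with .async after batch. Because that boundary has an input buffer (16 elements by default, enough to queue most of these 20 elements one by one), the sketch also shrinks the sink's buffer to 1 with Attributes.inputBuffer so that batching becomes visible.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import akka.actor.ActorSystem
import akka.stream.Attributes
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object BatchAsyncDemo {
  // Hypothetical helper: the question's pipeline plus an async boundary.
  // Returns the batches in the order the sink received them.
  def run(sleepMillis: Long = 4000)(implicit system: ActorSystem): Seq[String] = {
    val seen = new ConcurrentLinkedQueue[String]()

    // The slow sink from the question; it also records what it received.
    val slowSink = Sink
      .foreach[String] { x =>
        Thread.sleep(sleepMillis)
        println("Out:" + x)
        seen.add(x)
      }
      // Shrink the boundary's input buffer (default 16) so that elements pile up
      // in batch instead of being queued one by one in front of the sink.
      .addAttributes(Attributes.inputBuffer(initial = 1, max = 1))

    val done = Source(1 to 20)
      .map { x => println(s"received: $x"); x }
      .batch(max = 3, first => first.toString)((batch, elem) => batch + "," + elem)
      // Async boundary: source, map and batch run in their own actor, so the
      // sink's Thread.sleep no longer stops them from pulling and aggregating.
      .async
      .runWith(slowSink)

    Await.result(done, 5.minutes)
    seen.toArray(Array.empty[String]).toSeq
  }

  def main(args: Array[String]): Unit = {
    // Assumes Akka 2.6+, where an implicit ActorSystem provides the Materializer.
    implicit val system: ActorSystem = ActorSystem("batch-async-demo")
    try run() finally system.terminate()
  }
}
```

With the boundary in place, the "received" lines should run ahead of the "Out:" lines and most Out: lines should carry two or three elements; the exact grouping depends on timing.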
I have tried changing map to mapAsync, but the behaviour is still not what I am looking for:
```scala
.mapAsync(1) { x =>
  println(s"received: $x")
  Future.successful(x)
}
```
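mapAsync with Future.successful probably cannot help here: the future is already completed, so mapAsync handles it synchronously and the stage stays in the same fused actor as the blocking sink. A different option, sketched below under the same Akka 2.6+ assumption (BatchMapAsyncDemo, run and sleepMillis are hypothetical names), keeps the stream fused but moves the slow work itself into the Future returned by mapAsync. The stream's own thread is then never blocked, so batch can keep pulling and aggregating while the previous batch is being processed. Sleeping on a global ExecutionContext thread is for demonstration only.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{blocking, Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BatchMapAsyncDemo {
  // Hypothetical helper: runs the pipeline with the slow work moved into mapAsync
  // and returns the batches in the order they were processed.
  def run(sleepMillis: Long = 4000)(implicit system: ActorSystem): Seq[String] = {
    val seen = new ConcurrentLinkedQueue[String]()

    val done = Source(1 to 20)
      .map { x => println(s"received: $x"); x }
      .batch(max = 3, first => first.toString)((batch, elem) => batch + "," + elem)
      // The slow work runs inside a Future on another thread, so the stream's own
      // thread stays free and batch keeps aggregating while the Future sleeps.
      .mapAsync(parallelism = 1) { x =>
        Future {
          blocking(Thread.sleep(sleepMillis)) // simulates a slow consumer
          println("Out:" + x)
          seen.add(x)
        }
      }
      .runWith(Sink.ignore)

    Await.result(done, 5.minutes)
    seen.toArray(Array.empty[String]).toSeq
  }

  def main(args: Array[String]): Unit = {
    // Assumes Akka 2.6+, where an implicit ActorSystem provides the Materializer.
    implicit val system: ActorSystem = ActorSystem("batch-mapasync-demo")
    try run() finally system.terminate()
  }
}
```

With parallelism = 1, mapAsync only requests the next batch after the current Future completes, so elements should accumulate in batch (up to three at a time) during each sleep.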
Thanks.