
I am using the following code to test the behaviour of Akka Streams' Flow.batch, but I can't figure out why the result is not what I expect:

  Source(1 to 20)
  .map(x => {
    println(s"received: ${x}")
    x
  })
  .batch(max=3, first => first.toString) {(batch, elem) => {
    batch + "," + elem
  }}
  .runWith(Sink.foreach(x=>{
    Thread.sleep(4000)
    println("Out:" + x)
  }))

And here is the output:

  received: 1
  received: 2
  received: 3
  received: 4
  Out:1,2,3
  received: 5
  Out:4
  received: 6
  Out:5
  received: 7
  Out:6
  received: 8
  Out:7
  received: 9
  Out:8
  received: 10
  Out:9
  received: 11
  Out:10
  received: 12
  Out:11
  .... so on ....
  received: 19
  Out:18
  received: 20
  Out:19
  Out:20

There are a few points I don't understand here:

  • Firstly, my Sink is much slower than the source, so I expect items to be batched together before being emitted downstream, such as: Out: 1,2,3; Out: 4,5,6; Out: 7, 8; Out: 9,10,11 and so on. Instead, only the first batch (1,2,3) is formed, and every subsequent element is emitted one by one.
  • Why do I receive 4 items (received: 1, ..., received: 4) right at the beginning when I set max=3 (batch(max=3))?
  • Because the source is much faster than the sink, I expect elements to arrive from upstream much faster, such as: received: 7, received: 8, received: 9; then Out:7,8,9. In fact, they arrive only one at a time, and only after the Sink's println has executed.

I also tried changing map to mapAsync, but the behaviour is still not what I am looking for:

  .mapAsync(1)(x => {
    println(s"received: ${x}")
    Future.successful(x)
  })

Thanks.


1 Answer


There is no asynchronous boundary anywhere in your code, so the whole stream runs on a single thread. While your Thread.sleep() executes, nothing else in this setup can make progress, so batching cannot happen: the thread that would do the batching is blocked in Thread.sleep. If this is really your setup, you can simply use grouped() instead of batch(), or maybe groupedWithin(). If you still want to try out batch(), use a throttle stage instead of a sleep. Throttle does not block the thread, so upstream progress (batching) is not affected.
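The two suggestions above could look roughly like the sketch below. It assumes Akka 2.6 or later, where the materializer is derived from the implicit ActorSystem; the object name BatchSketch, the method names groupedInThrees and batchedWithThrottle, and the 200 ms throttle rate are made up for illustration. The sizes of the batches produced by batch() depend on timing, so they are not guaranteed.

```scala
import akka.actor.ActorSystem
import akka.stream.ThrottleMode
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical helper object, not part of Akka.
object BatchSketch {

  // grouped(3) emits fixed-size groups (the last one may be shorter),
  // independent of how fast the sink consumes them.
  def groupedInThrees(range: Range)(implicit system: ActorSystem): Future[Seq[String]] =
    Source(range)
      .grouped(3)
      .map(_.mkString(","))
      .runWith(Sink.seq)

  // batch() aggregates only while downstream is backpressuring.
  // throttle slows the downstream without blocking the thread,
  // unlike Thread.sleep inside the sink.
  def batchedWithThrottle(range: Range)(implicit system: ActorSystem): Future[Seq[String]] =
    Source(range)
      .batch(3, first => first.toString)((batch, elem) => batch + "," + elem)
      .throttle(1, 200.millis, 1, ThrottleMode.Shaping) // assumed rate: 1 element per 200 ms
      .runWith(Sink.seq)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("batch-sketch")
    try {
      Await.result(groupedInThrees(1 to 20), 10.seconds).foreach(x => println("Grouped:" + x))
      // Batch sizes here depend on timing, so the exact lines may vary between runs.
      Await.result(batchedWithThrottle(1 to 20), 30.seconds).foreach(x => println("Out:" + x))
    } finally system.terminate()
  }
}
```

grouped() is the simpler choice when fixed-size groups are all you need; batch() is meant for absorbing a slow consumer, so it only aggregates when the downstream actually signals backpressure.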