I am trying to use Akka Streams to accumulate data and process it in batches:

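import akka.NotUsed
import akka.stream.scaladsl.Flow
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Assumed signature: someBatchOperation(records: Seq[String]): Future[Unit]
val myFlow: Flow[String, Unit, NotUsed] = Flow[String]
  .map { record =>
    println(record)
    Future(record)
  }
  .mapAsync(1)(x => x)
  .groupedWithin(3, 30.seconds) // emit after 3 records or 30 seconds
  .mapAsync(10)(records => someBatchOperation(records))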

I expected the code above not to perform any operation until 3 records are ready or 30 seconds have passed. But when I send a request with Source.single("test"), the record is processed immediately, without waiting for other records or for 30 seconds.

How can I make this flow wait for more records to arrive, or for 30 seconds of idle time?

The records come from API requests, one at a time, and I am trying to accumulate them in the flow like this:

Source.single(apiRecord).via(myFlow).runWith(Sink.ignore)

1 Answer

It actually does wait. Consider the following:

Source(Stream.from(1)).throttle(1, 400.millis).groupedWithin(3, 1.second).runWith(Sink.foreach(i => println(s"Done with ${i} ${System.currentTimeMillis}")))

The output of that line, until I killed the process, was:

Done with Vector(1, 2, 3) 1599495716345
Done with Vector(4, 5) 1599495717348
Done with Vector(6, 7, 8) 1599495718330
Done with Vector(9, 10) 1599495719350
Done with Vector(11, 12, 13) 1599495720330
Done with Vector(14, 15) 1599495721350
Done with Vector(16, 17, 18) 1599495722328
Done with Vector(19, 20) 1599495723348
Done with Vector(21, 22, 23) 1599495724330

As we can see, the time from a 3-element batch to the next 2-element batch is a bit more than 1 second. That makes sense: only two elements arrived within the 1-second window, so the group was emitted when the timeout fired, and it took a little longer to reach the printing line.

The time from a 2-element batch to the next 3-element batch is less than a second, because three elements arrived before the timeout, so the group was emitted as soon as it was full.

Why didn't it work in your example?

When you use Source.single, the source completes right after emitting its only element; you can see this in the Akka source code. groupedWithin then knows it won't receive any more elements, so it emits the partial group containing "test" immediately instead of waiting for the timeout. To actually test this flow, use a bigger stream, such as the throttled one above.
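A minimal sketch of the difference, assuming Akka 2.6 (where an implicit ActorSystem provides the Materializer) and a shortened 2-second window so it runs quickly. The object name and the helper firstGroup are illustrative, not part of the original answer. Source.maybe is used only as a convenient source that never completes on its own, so the same single element has to wait for the timeout:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object SingleVsOpenSource extends App {
  implicit val system: ActorSystem = ActorSystem("grouped-within-demo")

  // Hypothetical helper: runs `source` through groupedWithin(3, 2.seconds)
  // and returns the first group plus how many milliseconds it took to arrive.
  def firstGroup(source: Source[String, _]): (Seq[String], Long) = {
    val start = System.nanoTime()
    val group = Await.result(
      source.groupedWithin(3, 2.seconds).runWith(Sink.head),
      10.seconds
    )
    (group, (System.nanoTime() - start) / 1000000)
  }

  // Source.single completes right after "test", so the partial group is
  // flushed on completion, without waiting for the 2-second window.
  val (closedGroup, closedMs) = firstGroup(Source.single("test"))

  // Source.maybe does not complete unless its promise is completed, so the
  // stream stays open and groupedWithin has to wait for the timeout.
  val (openGroup, openMs) =
    firstGroup(Source.single("test").concat(Source.maybe[String]))

  println(s"completed source: $closedGroup after $closedMs ms")
  println(s"open source:      $openGroup after $openMs ms")

  system.terminate()
}
```

The completed source should report its group almost immediately, while the open one should take roughly two seconds; exact timings will vary by machine.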

Likewise, Source(1 to 10) is built on top of Source.single (you can see this in the Akka source code for Source.apply), so it also completes after its last element, and the final partial group is emitted right away.
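A similar sketch for a finite source, under the same Akka 2.6 assumption (the object name is illustrative). Even with a 30-second window it should finish almost immediately: the first three groups close because they are full, and the last one is flushed when the source completes:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object FiniteSourceFlush extends App {
  implicit val system: ActorSystem = ActorSystem("finite-source-demo")

  val start = System.nanoTime()
  // Source(1 to 10) is finite: after 10 is emitted the stream completes,
  // so the trailing partial group is emitted without waiting 30 seconds.
  val groups: Seq[Seq[Int]] = Await.result(
    Source(1 to 10).groupedWithin(3, 30.seconds).runWith(Sink.seq),
    10.seconds
  )
  val elapsedMs = (System.nanoTime() - start) / 1000000

  println(s"$groups after $elapsedMs ms")
  system.terminate()
}
```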