I am trying to use akka streams to accumulate data and use as batch:
val myFlow: Flow[String, Unit, NotUsed] = Flow[String].collect {
case record =>
println(record)
Future(record)
}.mapAsync(1)(x => x).groupedWithin(3, 30 seconds)
.mapAsync(10)(records =>
someBatchOperation(records))
)
My expectation from code above was not make any operation until 3 records are ready, or 30 seconds pass. But when I send some request with Source.single("test")
, it is processing this record without waiting for others or 30 seconds.
How can I use this flow to wait for other records to came or 30 seconds idle?
Record is coming from an API request one by one and I am trying to accumulate this data in flow like:
Source.single(apiRecord).via(myFlow).runWith(Sink.ignore)