I have an application which listens to a stream of events. These events tend to come in chunks: 10 to 20 of them within the same second, with minutes or even hours of silence between them. These events are processed and result in an aggregate state, and this updated state is sent further downstream.
In pseudocode, it looks something like this:
kafkaSource()
.mapAsync(1)((entityId, event) => entityProcessor(entityId).process(event)) // yields entityState
.mapAsync(1)(entityState => submitStateToExternalService(entityState))
.runWith(kafkaCommitterSink)
The thing is that the downstream submitStateToExternalService has no use for 10-20 updated states per second. It would be far more efficient to emit only the last one and handle just that.
With that in mind, I started looking into whether I could hold back the state after processing instead of emitting it immediately, and wait a little while to see if more events come in.
In a way, it's similar to conflate, but that emits elements as soon as the downstream stops backpressuring, and my processing is actually fast enough to keep up with the events coming in, so I can't rely on backpressure.
I came across groupedWithin, but this emits elements whenever the window ends (or the max number of elements is reached). What I would ideally want is a time window where the waiting time before emitting downstream is reset by each new element in the group.
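To make the desired semantics concrete, here is a minimal plain-Scala sketch (no Akka involved) that works on a list of already timestamped events. The names Timed, debounce and lastPerFixedWindow are made up for illustration, the input is assumed to be sorted by arrival time, and the fixed-window function is only a rough stand-in for the time-based side of groupedWithin, not its actual implementation.

```scala
import scala.concurrent.duration._

// Hypothetical wrapper: arrival time in milliseconds plus the computed state.
final case class Timed[A](atMillis: Long, value: A)

object DebounceSketch {

  // Desired behaviour: keep only the last element of each burst.
  // A burst ends once `quiet` has passed without a new element,
  // so every new element resets the waiting time.
  def debounce[A](events: Seq[Timed[A]], quiet: FiniteDuration): Vector[Timed[A]] = {
    val gap = quiet.toMillis
    events.foldLeft(Vector.empty[Timed[A]]) { (acc, e) =>
      acc.lastOption match {
        // Arrived within the quiet period of the previous element:
        // replace it, which restarts the wait from this element.
        case Some(prev) if e.atMillis - prev.atMillis < gap => acc.init :+ e
        // First element, or the quiet period has elapsed: start a new burst.
        case _ => acc :+ e
      }
    }
  }

  // For contrast (simplified): windows of fixed length aligned to time 0,
  // keeping the last element per window. The window is never reset.
  def lastPerFixedWindow[A](events: Seq[Timed[A]], window: FiniteDuration): Vector[Timed[A]] =
    events
      .groupBy(_.atMillis / window.toMillis)
      .toVector
      .sortBy(_._1)
      .map(_._2.last)
}
```

With a burst that straddles a window boundary, for example Seq(Timed(900, "s1"), Timed(950, "s2"), Timed(1050, "s3"), Timed(1100, "s4"), Timed(5000, "s5")) and a duration of 1.second, debounce keeps s4 and s5, while lastPerFixedWindow keeps s2, s4 and s5. That extra emission in the middle of a burst is what I would like to avoid.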
Before I implement something to do this myself, I wanted to make sure that I didn't just overlook a way of doing this that is already present in akka-streams, because this seems like a fairly common thing to do.