I have an application which listens to a stream of events. These events tend to come in chunks: 10 to 20 of them within the same second, with minutes or even hours of silence between them. These events are processed and result in an aggregate state, and this updated state is sent further downstream.

In pseudocode, it would look something like this:

kafkaSource()
  .mapAsync(1)((entityId, event) => entityProcessor(entityId).process(event)) // yields entityState
  .mapAsync(1)(entityState => submitStateToExternalService(entityState))
  .runWith(kafkaCommitterSink)

The thing is that the downstream submitStateToExternalService has no use for 10-20 updated states per second - it would be far more efficient to just emit the last one and only handle that one.

With that in mind, I started looking into whether I could hold back the state after processing, rather than emitting it immediately, and wait a little while to see if more events come in.

In a way, this is similar to conflate, but conflate emits elements as soon as the downstream stops backpressuring. My processing is fast enough to keep up with the incoming events, so I can't rely on backpressure.

I came across groupedWithin, but this emits elements whenever the window ends (or the max number of elements is reached). What I would ideally want is a time window in which each new element in the group resets the waiting time before emitting downstream.

Before I implement something to do this myself, I wanted to make sure that I didn't just overlook a way of doing this that is already present in akka-streams, because this seems like a fairly common thing to do.

1 Answer

Honestly, I would make entityProcessor into a cluster-sharded persistent actor.

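import akka.cluster.sharding.ClusterSharding
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

case class ProcessEvent(entityId: String, evt: EntityEvent)

// The ask (?) needs an implicit timeout in scope; 5 seconds is an arbitrary choice
implicit val askTimeout: Timeout = 5.seconds

val entityRegion = ClusterSharding(system).shardRegion("entity")

kafkaSource()
  .mapAsync(parallelism) { case (entityId, event) =>
    // The future completes when the entity replies, i.e. after it has handled the event
    entityRegion ? ProcessEvent(entityId, event)
  }
  .runWith(kafkaCommitterSink)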

With this, you can safely increase the parallelism so that you can handle events for multiple entities simultaneously without fear of mis-ordering the events for any particular entity.

Your entity actors would then update their state in response to the process commands and persist the events using a suitable persistence plugin, sending a reply to complete the ask pattern. One way to get the compaction effect you're looking for is for them to schedule the update of the external service after some period of time (after cancelling any previously scheduled update).
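To make the "cancel the previous schedule, then schedule again" idea concrete, here is a minimal sketch written against the JDK scheduler rather than Akka, so it can be run on its own. The Debouncer class and its method names are made up for illustration; inside an actor, the actor's own timer facilities would take the place of the executor.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, Executors, ScheduledFuture, TimeUnit}
import scala.concurrent.duration._

// Hypothetical helper, not part of Akka: calls `publish` with the most recent
// state once no newer state has been submitted for `quietPeriod`.
final class Debouncer[S](quietPeriod: FiniteDuration)(publish: S => Unit) {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  private var pending: Option[ScheduledFuture[_]] = None

  // Call this for every updated state; each call restarts the quiet period.
  def submit(state: S): Unit = synchronized {
    pending.foreach(_.cancel(false)) // drop the previously scheduled update, if any
    val task: Runnable = new Runnable { def run(): Unit = publish(state) }
    pending = Some(scheduler.schedule(task, quietPeriod.toMillis, TimeUnit.MILLISECONDS))
  }

  def shutdown(): Unit = scheduler.shutdown()
}

// Usage: a burst of 15 states followed by silence.
val published = new CopyOnWriteArrayList[Int]()
val debouncer = new Debouncer[Int](200.millis)(s => published.add(s))
(1 to 15).foreach(debouncer.submit)
Thread.sleep(800) // wait well past the quiet period
debouncer.shutdown()
```

As long as the burst arrives faster than the quiet period, only the last state of the burst should reach publish. The 200 ms value is arbitrary and would need tuning against how far apart events within a chunk actually arrive.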

There is one potential pitfall with this scheme (it's also a potential issue with a homemade Akka Stream solution to allow n > 1 events to be processed before updating the state): what happens if the service fails between updating the local view of state and updating the external service?

One way you can deal with this is to encode whether the entity is dirty (has state which hasn't propagated to the external service) in the entity's state and at startup build a list of entities and run through them to have dirty entities update the external state.
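As a rough sketch of what encoding "dirty" in the state could look like, the model below tracks a version counter alongside the last version known to have reached the external service, so a late publish confirmation cannot hide a newer change. All type and field names here are assumptions for illustration, not taken from the question.

```scala
// Hypothetical state shape: the entity is dirty while the external service
// has not yet seen the latest version.
final case class EntityState(value: Long, version: Long, publishedVersion: Long) {
  def dirty: Boolean = version > publishedVersion
}

sealed trait EntityEvent
final case class ValueChanged(newValue: Long) extends EntityEvent
final case class Published(version: Long) extends EntityEvent

// (State, Event) => State: a change bumps the version, a confirmed publish
// records which version the external service has seen.
def applyEvent(state: EntityState, event: EntityEvent): EntityState = event match {
  case ValueChanged(v) => state.copy(value = v, version = state.version + 1)
  case Published(ver)  => state.copy(publishedVersion = math.max(state.publishedVersion, ver))
}

// At startup: the ids of entities that still owe the external service an update.
def entitiesToRepublish(states: Map[String, EntityState]): Set[String] =
  states.collect { case (id, s) if s.dirty => id }.toSet

// Example: entity "a" changed twice but only its first version was published.
val initial = EntityState(value = 0, version = 0, publishedVersion = 0)
val a = List(ValueChanged(1), ValueChanged(2), Published(1)).foldLeft(initial)(applyEvent)
val b = List(ValueChanged(7), Published(1)).foldLeft(initial)(applyEvent)
val toRepublish = entitiesToRepublish(Map("a" -> a, "b" -> b))
```

Persisting the Published event is what lets the flag survive a restart; how the list of entity ids is obtained at startup depends on the persistence plugin in use.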

If the entities are doing more than just tracking state for publishing to a single external datastore, it might be useful to use Akka Persistence Query to build a full-fledged read-side view to update the external service. In this case, though, since the read-side view's (State, Event) => State transition would be the same as the entity processor's, it might not make sense to go this way.

A midway alternative would be to offload the scheduling etc. to a different actor or set of actors which get told "this entity updated its state" and then schedule an ask of the entity for its current state, along with a timestamp of when the state was locally updated. When the response is received, the external service is updated only if the timestamp is newer than that of the last update.
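The timestamp check in that last step could look roughly like the following. This is plain Scala with made-up names (StateSnapshot, PublishTracker), showing only the bookkeeping and none of the actor messaging around it.

```scala
import java.time.Instant

// Hypothetical reply from an entity: its current state plus when it was last updated locally.
final case class StateSnapshot(entityId: String, payload: String, updatedAt: Instant)

// Remembers the timestamp of the last published snapshot per entity and
// skips any snapshot that is not newer than that.
final class PublishTracker(publish: StateSnapshot => Unit) {
  private var lastPublished = Map.empty[String, Instant]

  // Returns true if the snapshot was forwarded to the external service.
  def onSnapshot(snapshot: StateSnapshot): Boolean = {
    val isNewer = lastPublished.get(snapshot.entityId).forall(t => snapshot.updatedAt.isAfter(t))
    if (isNewer) {
      publish(snapshot)
      lastPublished += (snapshot.entityId -> snapshot.updatedAt)
    }
    isNewer
  }
}

// Example: the same snapshot arriving twice is only published once.
var sent = Vector.empty[StateSnapshot]
val tracker = new PublishTracker(s => sent = sent :+ s)
val t0 = Instant.parse("2020-01-01T00:00:00Z")
val first = tracker.onSnapshot(StateSnapshot("a", "v1", t0))
val duplicate = tracker.onSnapshot(StateSnapshot("a", "v1", t0))
val newer = tracker.onSnapshot(StateSnapshot("a", "v2", t0.plusSeconds(1)))
```

The tracker here is not thread-safe; that is acceptable if it lives inside a single actor, which processes one message at a time.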