A couple of points I'll volunteer up front:
- I'm new to Flink (working with it for about a month now)
- I'm using Kinesis Analytics (AWS's hosted Flink solution). By all accounts this doesn't really limit Flink's versatility or the options for fault tolerance, but I'll call it out anyway.
We have a fairly straightforward sliding window application. A keyed stream organizes events by a particular key, IP address for example, and then processes them in a ProcessFunction. We mostly use this to keep track of counts of things: for example, how many logins for a particular IP address in the last 24 hours. Every 30 seconds we count the events in the window, per key, and save that value to an external data store. State is also updated to reflect the events in that window, so that old events expire and stop taking up memory.
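For concreteness, here is roughly what that shape looks like. This is a sketch, not our actual code: the event type, field names, and the exact timer alignment are my own stand-ins.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Hypothetical event type; the real application keys the stream on IP address.
case class LoginEvent(ip: String, eventID: String, timestamp: Long)

// One ValueState per key, holding every event still inside the 24-hour window.
class SlidingCountFunction extends KeyedProcessFunction[String, LoginEvent, (String, Int)] {

  private var recentEvents: ValueState[List[LoginEvent]] = _

  override def open(parameters: Configuration): Unit = {
    recentEvents = getRuntimeContext.getState(
      new ValueStateDescriptor[List[LoginEvent]](
        "recent-events", createTypeInformation[List[LoginEvent]]))
  }

  override def processElement(
      event: LoginEvent,
      ctx: KeyedProcessFunction[String, LoginEvent, (String, Int)]#Context,
      out: Collector[(String, Int)]): Unit = {
    // Append the new event; for a hot key this single list grows very large.
    recentEvents.update(event :: Option(recentEvents.value()).getOrElse(Nil))

    // Emit a count on the next 30-second boundary.
    val nextFire = (ctx.timerService().currentProcessingTime() / 30000 + 1) * 30000
    ctx.timerService().registerProcessingTimeTimer(nextFire)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, LoginEvent, (String, Int)]#OnTimerContext,
      out: Collector[(String, Int)]): Unit = {
    val cutoff = timestamp - 24 * 60 * 60 * 1000L
    // Expire old events, then emit the 24-hour count for this key.
    val live = Option(recentEvents.value()).getOrElse(Nil).filter(_.timestamp >= cutoff)
    recentEvents.update(live)
    out.collect((ctx.getCurrentKey, live.size))
  }
}
```

The detail that matters later is that all of a key's events live inside one ValueState value, which gets rewritten and checkpointed as a single, ever-growing blob.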
Interestingly enough, cardinality is not the issue: if 200k distinct users log in over a 24-hour period, everything is fine. Things start to get hairy when one IP logs in 200k times in 24 hours. At that point, checkpoints start to take longer and longer. An average checkpoint takes 2-3 seconds, but with this user behaviour the checkpoints climb to 5 minutes, then 10, then 15, then 30, then 40, and so on.
Surprisingly, the application can run in this condition for a while, perhaps 10 or 12 hours. But sooner or later checkpoints fail completely, our max iterator age starts to spike, and no new events are processed.
I've tried a few things at this point:
1. Throwing more metal at the problem (auto scaling turned on as well)
2. Fussing with CheckpointInterval and MinPauseBetweenCheckpoints (https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CheckpointConfiguration.html)
3. Refactoring to reduce the footprint of the state we store
(1) didn't really do much. (2) appeared to help, but then a much larger traffic spike than anything we'd seen before wiped out the benefits. (3) is unclear; I think our application's memory footprint is fairly small compared to what you'd imagine from a Yelp or an Airbnb, who both run Flink clusters for massive applications, so I can't imagine that my state is really problematic.
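For reference, the knobs in (2) correspond to Flink's own checkpointing settings. On Kinesis Analytics they're set through the CheckpointConfiguration API linked above rather than in application code, but the self-managed equivalent looks roughly like this (the values are examples, not recommendations):

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Trigger a checkpoint every 60 seconds...
env.enableCheckpointing(60000L)
// ...but never start one less than 30 seconds after the previous one completed.
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(30000L)
```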
I'll say I'm hoping we don't have to fundamentally change what the application outputs; this sliding window is a really valuable piece of data.
EDIT: Somebody asked what my state looks like. It's a ValueState[FooState]:
```scala
case class FooState(
  entityType: String,
  entityID: String,
  events: List[BarStateEvent],
  tableName: String,
  baseFeatureName: String
)

case class BarStateEvent(target: Double, eventID: String, timestamp: Long)
```
EDIT: I want to highlight something that user David Anderson said in the comments:
> One approach sometimes used for implementing sliding windows is to use MapState, where the keys are the timestamps for the slices, and the values are lists of events.
This was essential. For anybody else trying to walk this path: I couldn't find a workable solution that didn't bucket events into some time slice. My final solution buckets events into 30-second slices and writes those into MapState, as David suggested. This seems to do the trick. During our high-load periods, checkpoints stay around 3 MB and always finish in under a second.
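For anyone else who lands here, a rough sketch of that final shape, reusing the BarStateEvent type from above. This is my reconstruction rather than our production code, and it assumes the stream has already been keyed (by IP) and mapped to BarStateEvent:

```scala
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// MapState keyed by 30-second slice start; each slice holds only its own events,
// so a hot key is spread across many small entries instead of one huge list.
class SlicedCountFunction extends KeyedProcessFunction[String, BarStateEvent, (String, Int)] {

  private val SliceMillis  = 30 * 1000L
  private val WindowMillis = 24 * 60 * 60 * 1000L

  private var slices: MapState[Long, List[BarStateEvent]] = _

  override def open(parameters: Configuration): Unit = {
    slices = getRuntimeContext.getMapState(
      new MapStateDescriptor[Long, List[BarStateEvent]](
        "event-slices",
        createTypeInformation[Long],
        createTypeInformation[List[BarStateEvent]]))
  }

  override def processElement(
      event: BarStateEvent,
      ctx: KeyedProcessFunction[String, BarStateEvent, (String, Int)]#Context,
      out: Collector[(String, Int)]): Unit = {
    // Bucket the event into its 30-second slice; only that one entry is touched.
    val slice = event.timestamp - (event.timestamp % SliceMillis)
    slices.put(slice, event :: Option(slices.get(slice)).getOrElse(Nil))

    // Emit the rolling count on the next 30-second boundary.
    val nextFire = (ctx.timerService().currentProcessingTime() / SliceMillis + 1) * SliceMillis
    ctx.timerService().registerProcessingTimeTimer(nextFire)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, BarStateEvent, (String, Int)]#OnTimerContext,
      out: Collector[(String, Int)]): Unit = {
    val cutoff = timestamp - WindowMillis

    // Expire whole slices that have fallen out of the 24-hour window.
    val expired = slices.keys().asScala.filter(_ < cutoff).toList
    expired.foreach(k => slices.remove(k))

    // Count what's left and emit per key.
    val count = slices.entries().asScala.map(_.getValue.size).sum
    out.collect((ctx.getCurrentKey, count))
  }
}
```

The property that fixes the hot-key problem is that an incoming event only touches its own 30-second slice, and expiry drops whole slices at once, so no single state entry grows in proportion to a key's total event count.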