After adding some managed state to our processing, we noticed a worrying growth in checkpoint size and duration, despite using incremental checkpointing with RocksDB.

To isolate the problem, we created a simple topology with a source, a map operator, and a sink.

The source generates an arbitrary number of events in memory at a throughput of 1 event per second. Every event has a unique id that is used to partition the stream (using the keyBy operator), and each event goes through a map function that adds around 100 kB to managed state (a ValueState is used). The events are then simply passed to a sink that does nothing.

Using the setup described above, we sent 1200 events with the checkpointing interval and the minimum pause between checkpoints both set to 5 seconds. As the events arrived at a constant rate, each adding an equal amount of state, we expected the checkpoint size to be more or less constant. However, we observed linearly growing peaks in checkpoint size (the last peak was almost 120 MB, close to the size of the whole expected managed state), with small checkpoints in between. For monitoring we used the metrics exposed by Flink, together with Prometheus and Grafana; please see the checkpoint charts.
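For reference, the sizes we expected come from simple arithmetic. The snippet below is only a back-of-the-envelope check; it assumes exactly 100 kB of state per event and decimal units (1 MB = 1000 kB), and the variable names are just for illustration.

```python
# Back-of-the-envelope check of the expected sizes (assumed round numbers).
events_per_sec = 1
state_per_event_kb = 100        # "around 100 kB" per event, taken as exact here
checkpoint_interval_sec = 5
total_events = 1200

# State added between two consecutive checkpoints.
expected_checkpoint_kb = events_per_sec * checkpoint_interval_sec * state_per_event_kb

# Total managed state after all events have been processed.
total_state_mb = total_events * state_per_event_kb / 1000

print(expected_checkpoint_kb)   # 500
print(total_state_mb)           # 120.0
```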

We would like to understand why we observe these checkpoint peaks, and why they keep growing.

Why do some checkpoints have the expected size (around 500 kB), while others are about as large as the whole current managed state, even though the load is constant?

What exactly is measured by lastCheckpointSize metric when incremental checkpointing is used?

Any hints or explanations would be much appreciated.

Thanks in advance.

1 Answer

Flink's incremental checkpointing needs to (1) scale well to very large state and (2) allow restoring from checkpoints to be reasonably efficient, even after performing millions of checkpoints after running for weeks or months at a time. In particular, it is necessary to periodically coalesce/merge older checkpoints so that one doesn't end up trying to restore from an unbounded chain of checkpoints stretching back into the distant past. This is why you will see some checkpoints doing more work than others, even under constant load. Note also that this effect is more noticeable when testing with small amounts of state (120 MB is small compared to the 10+ terabytes of state that some Flink users report working with).

To understand how Flink's incremental checkpointing works in more detail, I suggest watching Stefan Richter's talk from Flink Forward.

Update:

For those more familiar with RocksDB, the issue is that RocksDB compaction will affect some checkpoints more than others, and with small state, may only occur rather infrequently.
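To make the compaction effect concrete, here is a toy simulation. It is only a sketch under strong simplifying assumptions, not how RocksDB or Flink are actually implemented: every checkpoint interval produces one new 500 kB file, a compaction merges all existing files into a single new file once a hypothetical `compaction_trigger` number of files exists, and an incremental checkpoint uploads only the files it has not uploaded before. The function name and parameters are made up for illustration. Because a merged file counts as new, it is uploaded in full even though the data inside it is old.

```python
def simulate_checkpoint_sizes(num_checkpoints, flush_kb=500, compaction_trigger=4):
    """Toy model: return the kB uploaded by each incremental checkpoint."""
    live_files = {}    # file id -> size in kB, files currently in the store
    uploaded = set()   # ids of files already shipped by an earlier checkpoint
    next_id = 0
    sizes = []
    for _ in range(num_checkpoints):
        # State written since the last checkpoint is flushed to a fresh file.
        live_files[next_id] = flush_kb
        next_id += 1
        # Simplified compaction: once enough files pile up, merge them all
        # into one new file (keys are unique, so no data is dropped).
        if len(live_files) >= compaction_trigger:
            merged_kb = sum(live_files.values())
            live_files = {next_id: merged_kb}
            next_id += 1
        # An incremental checkpoint uploads only files it has not seen before.
        new_ids = [f for f in live_files if f not in uploaded]
        sizes.append(sum(live_files[f] for f in new_ids))
        uploaded.update(new_ids)
    return sizes


sizes = simulate_checkpoint_sizes(12)
print(sizes)
# [500, 500, 500, 2000, 500, 500, 3500, 500, 500, 5000, 500, 500]
```

In this model most checkpoints upload 500 kB, while the checkpoints that follow a compaction upload the whole merged file, so the peaks grow linearly with the total state, which is the same shape as in the charts described in the question. Real RocksDB uses leveled compaction and does not always rewrite everything, so with larger state the peaks would be less pronounced relative to the total.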