I have a Flink job processing data at around 200k QPS. Without checkpoints, the job runs fine. But when I enabled checkpoints (with a 50-minute interval), it caused backpressure at the first task (which adds a key field to each entry), and the Kafka consumer lag grew constantly. In the lag chart for my two Kafka topics, the first half (checkpoints enabled) shows lag climbing very quickly; the second half (checkpoints disabled) shows very low lag, within milliseconds.
I am using AT_LEAST_ONCE checkpoint mode, which should be an asynchronous process. Could anyone suggest what might be going wrong?
My checkpointing settings:
env.enableCheckpointing(1800000, CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setCheckpointTimeout(600000); // 10 minutes
env.getCheckpointConfig().setFailOnCheckpointingErrors(
        jobConfiguration.getCheckpointConfig().getFailOnCheckpointingErrors());
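For context, here is a minimal, self-contained sketch of the same configuration with the millisecond values spelled out via `TimeUnit` instead of magic numbers. This assumes a standard `StreamExecutionEnvironment`; the `CheckpointSetup` class name is just for illustration, and `setFailOnCheckpointingErrors` is omitted because it reads from my own `jobConfiguration` object:

```java
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void configure(StreamExecutionEnvironment env) {
        // 30-minute interval (1800000 ms); AT_LEAST_ONCE means barriers
        // do not block on alignment, so snapshotting should be async
        env.enableCheckpointing(TimeUnit.MINUTES.toMillis(30),
                CheckpointingMode.AT_LEAST_ONCE);

        CheckpointConfig cfg = env.getCheckpointConfig();
        // Only one checkpoint in flight at a time
        cfg.setMaxConcurrentCheckpoints(1);
        // Abort any checkpoint that has not completed within 10 minutes
        cfg.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
        // Keep checkpoint data around if the job is cancelled
        cfg.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
}
```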
My job runs in 128 containers.
With the 10-minute checkpoint timeout, here are the stats:
I am going to try a 30-minute checkpoint interval and see how it goes.
I also tried tuning memory usage, but that did not seem to help.