2 votes

I have a Flink job processing data at around 200k QPS. Without checkpoints, the job runs fine. But when I tried to enable checkpoints (with a 50-minute interval), it causes backpressure at the first task, which adds a key field to each entry, and the Kafka lag grows constantly. The chart below shows the lag of my two Kafka topics: in the first half checkpoints were enabled and the lag rises very quickly; in the second half (very low lag, within milliseconds) checkpoints were disabled. [screenshot: Kafka consumer lag for the two topics]

I am using at-least-once checkpoint mode, which should be an asynchronous process. Could anyone suggest what might be going on? My checkpointing settings:

    env.enableCheckpointing(1800000, // interval in ms (30 minutes)
        CheckpointingMode.AT_LEAST_ONCE);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig()
        .enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.getCheckpointConfig()
        .setCheckpointTimeout(600000); // timeout in ms (10 minutes)
    env.getCheckpointConfig()
        .setFailOnCheckpointingErrors(
            jobConfiguration.getCheckpointConfig().getFailOnCheckpointingErrors());

My job runs with 128 containers.

With the 10-minute checkpoint time, these are the stats: [screenshot: checkpoint statistics]

I am now trying a 30-minute checkpoint to see whether that helps.

I was also trying to tune the memory usage, but it does not seem to help. My settings:

But in the task manager, it still looks like this: [screenshot: task manager metrics]


2 Answers

4 votes

TL;DR: it's sometimes hard to analyse such a problem. I have two lucky guesses/shots. First, if you are using the RocksDB state backend, you could switch to the FsStateBackend; it's usually faster, and RocksDB makes the most sense for large state sizes that do not fit into memory (or if you really need the incremental checkpointing feature). Second, fiddle with the parallelism, either increasing or decreasing it.
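For reference, a minimal sketch of that state backend switch (the checkpoint URI is just a placeholder for your own durable storage, and this uses the FsStateBackend constructor, not your actual job code):

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Heap-based state with filesystem checkpoints; the second argument enables
    // asynchronous snapshots. Replace the URI with your own checkpoint directory.
    env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints", true));

Keep in mind that with FsStateBackend the working state lives on the TaskManager heap, so this only makes sense if your state fits comfortably into memory, as mentioned above.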

I would suspect the same thing that @ArvidHeise wrote. Your checkpoint size is not huge, but it's also not trivial. It can add enough extra overhead to push the job from barely keeping up with the traffic to not keeping up at all, causing backpressure. Under backpressure, latency just keeps accumulating, so even a couple of percent of extra overhead can make the difference between an end-to-end latency of milliseconds and an unbounded, ever-growing value.

If you cannot simply add more resources, you will have to analyse what exactly is adding this extra overhead and which resource is the bottleneck.

  1. Is it CPU? Check CPU usage on the cluster. If it's ~100%, that's the thing you need to optimise for.
  2. Is it IO? Check IO usage on the cluster and compare it against the maximal throughput/number of requests per second that you can achieve.
  3. If both CPU & IO usage is low, you might want to try increasing the parallelism (see the sketch after this list), but...
  4. Keep in mind data skew. Backpressure could be caused by a single task, and in that case it's harder to analyse the problem, as the bottleneck will be a single thread (on either IO or CPU), not the whole machine.
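For point 3, a minimal sketch of the two ways to adjust parallelism, job-wide vs. per-operator (`source`, `AddKeyField`, and the numbers below are placeholders for your own code, not recommendations):

    // Option A: raise the default parallelism for the whole job.
    env.setParallelism(256);

    // Option B: override it only for the suspected bottleneck operator, e.g.
    // the map that adds the key field. `source` stands in for your existing
    // Kafka stream and AddKeyField for your own MapFunction implementation.
    source
        .map(new AddKeyField())
        .setParallelism(256)
        .name("add-key-field");

If the problem is data skew (point 4), though, raising the parallelism of the skewed operator will not help much, since the hot key still lands on a single subtask.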

After figuring out which resource is the bottleneck, the next question is why. It might be immediately obvious once you see it, or it might require digging in, like checking GC logs, attaching a profiler, etc.

Answering those questions could give you information about what to optimise in your job, allow you to tweak the configuration, or give us (Flink developers) an extra data point about what we could try to optimise on the Flink side.

2 votes

Any kind of checkpointing adds computation overhead. Most of the checkpointing happens asynchronously (as you have stated), but it still adds general I/O operations. These additional I/O requests may, for example, congest your access to external systems. Also, if you enable checkpointing, Flink needs to keep track of more information (new vs. already checkpointed data).

Have you tried adding more resources to your job? Could you share your whole checkpointing configuration?