1 vote

I have a setup in Flink 1.7.2 running on a Cloudera-managed cluster (resource allocation via YARN) that reads high-volume data from an external Kafka source and pipes it through a series of operators that aggregate, compute, aggregate again, and so on. I even use an iterative loop with filters and multiple operators inside, and finally a sink that writes the results to a RocksDB backend on my Hadoop cluster.

All of that works for a certain amount of time (currently around 2-3 hours), and then the checkpointing gets stuck. I use exactly-once checkpointing with a generous timeout of 30 min, a 10 min pause between checkpoints, and 1 concurrent checkpoint. As long as everything works, these checkpoints finish within 1 min. But after a couple of hours one checkpoint gets stuck, meaning that the Checkpoint UI tab tells me that one (or more) operators have not acknowledged all subtasks. By that time the normal processing has gotten stuck as well: the watermarks on my input source won't advance and no more output is produced, and nothing moves until the checkpoint timer runs out. The next checkpoint then starts immediately, writes maybe 10% of all tasks, and gets stuck again. There is no chance of recovery. If I cancel the job and restart it from the last successful checkpoint, the next checkpoint gets stuck in the same way.
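For reference, the checkpointing setup is configured roughly like the sketch below. The checkpoint interval, the backend path, and the class name are placeholders; the timeout, pause, and concurrency values are the ones described above:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointSetup {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Trigger checkpoints periodically (interval is a placeholder value).
            env.enableCheckpointing(10 * 60 * 1000L);

            CheckpointConfig conf = env.getCheckpointConfig();
            conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // also tried AT_LEAST_ONCE (see below)
            conf.setCheckpointTimeout(30 * 60 * 1000L);                // generous 30 min timeout
            conf.setMinPauseBetweenCheckpoints(10 * 60 * 1000L);       // 10 min pause between checkpoints
            conf.setMaxConcurrentCheckpoints(1);                       // 1 concurrent checkpoint

            // RocksDB state backend on the Hadoop cluster (path is a placeholder).
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));

            // ... Kafka source, aggregations, iterative loop with filters, sink ...
            env.execute("high-volume pipeline");
        }
    }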

I have already tried many different things, from changing the checkpoint frequency to the timeouts. I even changed from exactly-once to at-least-once, since the alignment buffering was sometimes getting very expensive, but the same problem emerged after the same amount of time. Resource allocation does not seem to play a role either: I currently use 4 task slots per task manager and change the number of task managers from time to time, but nothing changes. JVM heap size does not appear to be a problem either; I allocate multiple GB, but apparently only a couple hundred MB are actually used.

No error messages are put out by the job manager or the task managers; all the logs show is the attempt to write the checkpoint, the missing success message, and then the start of the next checkpoint.


2 Answers

2 votes

When you say that you use "an iterative loop with filters and multiple operators inside", are you using Flink's iteration construct with a streaming job?

Doing so is not recommended. As it says in the documentation:

Flink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. In order to force checkpointing on an iterative program the user needs to set a special flag when enabling checkpointing: env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE, force = true).

Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure.
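For completeness, a minimal sketch of how that flag can be set from the Java API, using the (deprecated) three-argument overload of enableCheckpointing; the interval and class name are placeholders:

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ForcedCheckpointingSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Force-enable checkpointing on a job that contains an iteration.
            // As the docs warn, records in flight on the loop edges (and their
            // state changes) can still be lost on failure.
            env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE, true); // interval is a placeholder

            // ... build the iterative pipeline and call env.execute(...) ...
        }
    }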

That said, what you've described sounds like a situation in which backpressure is preventing the checkpoint barriers from progressing. Many things can cause this, but this blog post might help you diagnose the problem, though I'm not sure how much of it applies to a job using iterations.

0 votes

Consider whether there is data skew in your pipeline; you may be able to increase the parallelism of your operators to balance the load better.
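A minimal sketch of what that could look like (the source, the HeavyComputation function, and the parallelism value are all placeholders); note that rebalance() only evens out records in front of non-keyed operators, and for a keyed aggregation a hot key still ends up on a single subtask:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SkewSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.socketTextStream("localhost", 9999)    // placeholder source
               .rebalance()                            // round-robin records to even out skewed partitions
               .map(new HeavyComputation())
               .setParallelism(16)                     // placeholder: raise parallelism of the hot operator
               .print();

            env.execute("skew mitigation sketch");
        }

        // Placeholder for whatever expensive processing the hot operator performs.
        private static class HeavyComputation implements MapFunction<String, String> {
            @Override
            public String map(String value) {
                return value;
            }
        }
    }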