We use an Apache Flink job cluster on Kubernetes consisting of one Job Manager and two Task Managers with two slots each. The cluster is deployed and configured using the Lightbend Cloudflow framework.

We also use the RocksDB state backend together with S3-compatible storage for persistence. We have no issues creating savepoints from the CLI. Our job uses a few keyed states (MapState) and tends to be rather large (we expect at least 150 GB per state). The restart strategy for the job is set to failure rate. We use Apache Kafka as a source and sink throughout our jobs.
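
For reference, the equivalent programmatic setup looks roughly like the following sketch in the Java DataStream API (the bucket path, intervals, and failure-rate thresholds are illustrative, not our exact values):

```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB state backend with incremental checkpoints stored on
        // S3-compatible storage (the bucket/path is a placeholder).
        env.setStateBackend(new RocksDBStateBackend("s3://some-bucket/flink-checkpoints", true));

        // Checkpoint every 60 seconds (interval is illustrative).
        env.enableCheckpointing(60_000);

        // Failure-rate restart strategy: at most 3 failures within a 5-minute
        // window, with a 10-second delay between restarts (values illustrative).
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, Time.minutes(5), Time.seconds(10)));

        // ... Kafka source -> keyed MapState operators -> Kafka sink ...

        env.execute("job-setup-sketch");
    }
}
```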

We are currently doing some tests (mostly PoCs), and a few questions linger:

We ran some synthetic tests and passed malformed events to the job, which led to exceptions being thrown during execution. Due to the failure rate strategy, the following steps happen: the corrupted message is read from Kafka via the source -> the operator tries to process the event and eventually throws an exception -> the job restarts and reads THE SAME record from Kafka as in the previous step -> the operator fails again -> the failure rate finally exceeds the configured threshold and the job eventually stops. What should I do next? If we try to restart the job, it seems it will be restored with the latest Kafka consumer state and will read the corrupted message once again, leading us back to the behavior described above. What are the right steps to deal with such issues? And does Flink provide any kind of so-called Dead Letter Queue?

The other question is about the checkpointing and restore mechanics. We currently can't figure out which exceptions raised during job execution are considered critical and lead to a job failure followed by automatic recovery from the latest checkpoint. As described in the previous case, an ordinary exception raised inside the job leads to continuous restarts that are finally followed by job termination. We are looking for cases to reproduce where something happens to our cluster (the Job Manager fails, a Task Manager fails, etc.) that leads to automatic recovery from the latest checkpoint. Any suggestions for such a scenario on a Kubernetes cluster are welcome.
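
Related to that: is something along the lines of the following sketch (using the standard CheckpointConfig API; the interval and paths are illustrative) the intended way to keep the latest checkpoint available for a manual restart after such a termination?

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // interval is illustrative

        // Keep the last completed checkpoint after the job is cancelled or
        // terminally fails, so it can be resubmitted manually, e.g.:
        //   flink run -s s3://<bucket>/<checkpoint-dir>/chk-<n> ...
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // ... rest of the pipeline ...
        env.execute("retained-checkpoints-sketch");
    }
}
```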

We have dug into the official Flink documentation but didn't find any related information, or possibly understood it the wrong way. Many thanks!

1 Answer

The approach that Flink's Kafka deserialization schema takes is that if the deserialize method returns null, the Flink Kafka consumer will silently skip the corrupted message, and if it throws an IOException, the pipeline is restarted, which can lead to a fail/restart loop as you have noted.
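
For example, a deserialization schema that swallows corrupted records could look roughly like this (a sketch; `Event` and `Event.parse` are hypothetical placeholders for your own record type and parsing logic):

```java
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Sketch of a "skip on failure" deserialization schema: returning null tells
// the Flink Kafka consumer to silently drop the record instead of failing the job.
public class SkippingEventDeserializer implements DeserializationSchema<Event> {

    @Override
    public Event deserialize(byte[] message) throws IOException {
        try {
            // Event.parse is a hypothetical parser for your own format.
            return Event.parse(new String(message, StandardCharsets.UTF_8));
        } catch (Exception e) {
            // Corrupted record: return null so the consumer skips it.
            // (Optionally log it or bump a metric here before dropping.)
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeInformation.of(Event.class);
    }
}
```

You would pass a schema like this into the Kafka consumer in place of the default one; anything you want to keep about skipped records (a counter metric, a log line) has to happen inside the catch block, since the record is simply dropped afterwards.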

This is described in the last paragraph of this section of the docs.

Past work and discussion on this topic can be found in https://issues.apache.org/jira/browse/FLINK-5583 and https://issues.apache.org/jira/browse/FLINK-3679, and in https://github.com/apache/flink/pull/3314.

A dead letter queue would be a nice improvement, but I'm not aware of any effort in that direction. (Right now, side outputs from process functions are the only way to implement a dead letter queue.)
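
If you go the side-output route, a minimal sketch (the `transform` logic and the topic name in the usage comment are placeholders) could look like this:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Sketch of a dead-letter-queue pattern via side outputs: records that fail
// processing are routed to a side output instead of failing the job, and the
// side output can then be written to a separate Kafka topic.
public class DlqExample {

    // OutputTag must be created as an anonymous subclass so the type is captured.
    static final OutputTag<String> DEAD_LETTERS = new OutputTag<String>("dead-letters") {};

    public static SingleOutputStreamOperator<String> withDeadLetters(DataStream<String> input) {
        return input.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                try {
                    out.collect(transform(value)); // hypothetical business logic
                } catch (Exception e) {
                    // Route the bad record to the side output instead of throwing.
                    ctx.output(DEAD_LETTERS, value);
                }
            }
        });
    }

    private static String transform(String value) {
        // Placeholder for real processing that may fail on malformed input.
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("malformed record");
        }
        return value.toUpperCase();
    }
}

// Usage (sketch): sink the side output to a dedicated Kafka topic, e.g.
//   SingleOutputStreamOperator<String> main = DlqExample.withDeadLetters(events);
//   main.getSideOutput(DlqExample.DEAD_LETTERS)
//       .addSink(new FlinkKafkaProducer<>("dead-letter-topic", new SimpleStringSchema(), kafkaProps));
```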