We run an Apache Flink job cluster on Kubernetes consisting of one Job Manager and two Task Managers with two slots each. The cluster is deployed and configured using the Lightbend Cloudflow framework.
We also use the RocksDB state backend together with S3-compatible storage for persistence, and we have no issues with checkpointing or with savepoint creation from the CLI. Our job consists of a few keyed states (MapState) and tends to be rather large (we expect at least 150 GB per state). The Restart Strategy for the job is set to Failure Rate. We use Apache Kafka as both source and sink throughout our jobs.
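For reference, the relevant part of our job setup looks roughly like the sketch below. This is a minimal approximation against a Flink 1.x-style API; the bucket path, the failure-rate numbers, and the `JobSetup` class name are placeholders rather than our real values:

```java
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB state backend with incremental checkpoints enabled,
        // persisting to an S3-compatible bucket (the path is a placeholder).
        env.setStateBackend(new RocksDBStateBackend("s3://our-bucket/checkpoints", true));

        // Failure Rate restart strategy: at most 3 failures within a
        // 5-minute window, with 10 seconds between restart attempts
        // (the numbers are placeholders, not our production values).
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3,
                Time.of(5, TimeUnit.MINUTES),
                Time.of(10, TimeUnit.SECONDS)));

        // ... Kafka source -> keyed operators holding MapState -> Kafka sink ...
    }
}
```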
We are currently running some tests (mostly PoCs), and a few questions linger:
In some synthetic tests we passed incorrect events to the job, which caused exceptions to be thrown during execution. Under the Failure Rate strategy the following steps happen: the corrupted message is read from Kafka by the source -> the operator tries to process the event and eventually throws an exception -> the job restarts and reads THE SAME record from Kafka as in the previous step -> the operator fails again -> the Failure Rate threshold is finally exceeded and the job stops. What should we do next? If we restart the job, it seems it will be restored with the latest Kafka consumer state and will read the corrupted message once again, leading us back to the behavior described above. What are the right steps to deal with such issues? And does Flink provide any kind of so-called Dead Letter Queue?
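For context, the only workaround we have come up with so far is a makeshift dead-letter queue built on Flink side outputs, sketched below. Everything here is illustrative: `PoisonPillFilter`, the `String`/`Long` payload types, and `Long.parseLong` stand in for our real deserialization logic. Is something along these lines the idiomatic approach, or is there a built-in mechanism we are missing?

```java
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Parses raw Kafka strings into Long payloads (a stand-in for our event type)
// and diverts unparseable records to a side output instead of failing the job.
public class PoisonPillFilter extends ProcessFunction<String, Long> {

    // Side output carrying every record we failed to parse; the anonymous
    // subclass preserves the OutputTag's type information.
    public static final OutputTag<String> DEAD_LETTERS =
            new OutputTag<String>("dead-letters") {};

    @Override
    public void processElement(String raw, Context ctx, Collector<Long> out) {
        try {
            out.collect(Long.parseLong(raw)); // stand-in for real deserialization
        } catch (NumberFormatException e) {
            // Divert the corrupted record instead of letting the exception
            // bubble up and trip the Failure Rate restart strategy.
            ctx.output(DEAD_LETTERS, raw);
        }
    }
}
```

Wiring it up, with the diverted records going to a separate Kafka topic via a second sink (`rawStream` and `deadLetterProducer` are defined elsewhere):

```java
SingleOutputStreamOperator<Long> parsed = rawStream.process(new PoisonPillFilter());
parsed.getSideOutput(PoisonPillFilter.DEAD_LETTERS).addSink(deadLetterProducer);
```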
The other question is about the checkpointing and restore mechanics. We currently can't figure out which exceptions raised during job execution are considered critical and lead to a failure of the job followed by automatic recovery from the latest checkpoint. As described in the previous case, an ordinary exception raised inside the job leads to continuous restarts that are finally followed by termination of the job. We are looking for ways to reproduce a scenario where something happens to the cluster itself (the Job Manager fails, a Task Manager fails, or similar) and triggers automatic recovery from the latest checkpoint. Any suggestions for such a scenario in a Kubernetes cluster are welcome (for instance, would deleting a Task Manager pod with `kubectl delete pod` be a representative test?).
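In case it matters for the recovery question, our checkpoint configuration is essentially the sketch below (the one-minute interval is a placeholder; we also retain externalized checkpoints so a stopped job can be resubmitted manually with `flink run -s <checkpoint-path>`):

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every minute with exactly-once semantics
        // (the interval is a placeholder, not our real setting).
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

        // Keep the latest externalized checkpoint when the job is cancelled,
        // so the job can be resubmitted from it manually.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
}
```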
We have dug through the official Flink documentation but didn't find any related information, or possibly understood it the wrong way. Many thanks!