
My Flink job reads from Kafka using FlinkKafkaConsumer010 and sinks into HDFS using a CustomBucketingSink. The pipeline is a series of transformations: kafka -> flatMap (2-3 transformations) -> keyBy -> tumblingWindow(5 mins) -> aggregation -> hdfsSink. The Kafka input averages about 3 million events/min and peaks at around 20 million events/min. The checkpointing interval and the minimum pause between two checkpoints are both 3 minutes, and I am using FsStateBackend.
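For reference, here is a simplified sketch of the job. The class name, broker/topic/group settings, key, stand-in flatMap, stand-in aggregation, and output path are all placeholders; the real flatMaps and the CustomBucketingSink are not shown.

    import java.util.Properties;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.util.Collector;

    public class KafkaToHdfsJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints")); // assumed checkpoint path
            env.enableCheckpointing(180_000);                                     // 3-minute checkpoint interval

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092"); // assumed broker address
            props.setProperty("group.id", "hdfs-writer");         // assumed consumer group

            env.addSource(new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), props))
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String line, Collector<String> out) {
                        out.collect(line.trim()); // stand-in for the real 2-3 flatMap transformations
                    }
                })
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String line) {
                        return line; // stand-in for the real key
                    }
                })
                .timeWindow(Time.minutes(5))
                .reduce((a, b) -> a + "\n" + b)                        // stand-in for the real aggregation
                .addSink(new BucketingSink<>("hdfs:///data/output"));  // stand-in for CustomBucketingSink

            env.execute("kafka -> window -> hdfs");
        }
    }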

Here are my assumptions:

Flink consumes some fixed number of events from Kafka (multiple offsets from multiple partitions at once), waits until they reach the sink, and then checkpoints. On success it commits the Kafka partition offsets it has read and maintains some state related to the HDFS file it was writing. While the downstream transformations run after Kafka hands the events over to the other operators, the Kafka consumer sits idle until it gets confirmation that the events it sent were processed successfully. So we can say that while the sink is writing data to HDFS, all the previous operators sit idle. In case of failure, Flink goes back to the previous checkpoint state, resumes from the last committed Kafka partition offsets, and resumes at the HDFS file offset it should start writing to.

Here are my doubts based on the above assumptions:

1) Is the above assumption correct?

2) Does it make sense for the tumbling window to keep state, since in case of failure we start from the last committed Kafka partition offset anyway?

3) If the tumbling window does keep state, when will this state be used by Flink?

4) Why do checkpoint and savepoint state sizes differ?

5) In case of any failure, Flink always starts from the source operator. Right?


1 Answer


Your assumptions are not correct.

(1) Checkpointing does not depend in any way on events or results reaching the sink(s).

(2) Flink does its own Kafka offset management. When restoring from a checkpoint, after a failure, the offsets in the checkpoint are used, not those that may have been committed back to Kafka.
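To make that concrete, here is a sketch of the consumer-side settings involved (topic, broker, and group names are made up). The offsets committed back to Kafka by the first setting are only informational, e.g. for lag monitoring; they are not what recovery uses.

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

    public class ConsumerOffsetSettings {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092"); // assumed broker address
            props.setProperty("group.id", "hdfs-writer");         // assumed consumer group

            FlinkKafkaConsumer010<String> consumer =
                    new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), props);

            // Offsets committed back to Kafka are only for visibility (e.g. lag monitoring);
            // on recovery Flink restores the offsets stored in the checkpoint, not these.
            consumer.setCommitOffsetsOnCheckpoints(true);

            // Only consulted when the job starts with no checkpoint/savepoint to restore from.
            consumer.setStartFromGroupOffsets();
        }
    }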

(3) No operators are ever idle in the way you've described. The pipeline is not stalled by checkpointing.

The best way to understand how checkpointing works is to go through the Flink operations playground, especially the section on Observing Failure and Recovery. This will give you a much clearer understanding of this topic, because you'll be able to observe exactly what's happening.

I can also recommend reading https://ci.apache.org/projects/flink/flink-docs-master/training/fault_tolerance.html, and following the links contained there.

But to walk through how checkpointing works in your application, here are the basic steps:

(1) When the checkpoint coordinator (part of the job manager) decides it's time to initiate another checkpoint, it instructs each of the task managers to start checkpoint n.
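As a sketch, the coordinator-side settings that control this look roughly like the following; the numbers are chosen to match the 3-minute interval and minimum pause you described, and the timeout is an arbitrary example.

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointSettings {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Ask the checkpoint coordinator to start a new checkpoint every 3 minutes,
            // with barrier alignment for exactly-once state consistency.
            env.enableCheckpointing(180_000, CheckpointingMode.EXACTLY_ONCE);

            CheckpointConfig cfg = env.getCheckpointConfig();
            cfg.setMinPauseBetweenCheckpoints(180_000); // at least 3 minutes between checkpoints
            cfg.setCheckpointTimeout(600_000);          // give up on a checkpoint after 10 minutes
            cfg.setMaxConcurrentCheckpoints(1);         // only one checkpoint in flight at a time
        }
    }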

(2) All of the source instances checkpoint their own state, and insert checkpoint barrier n into their outgoing streams. In your case, the sources are Kafka consumers, and they checkpoint the current offset for each partition.

(3) Whenever the checkpoint barrier reaches the head of the input queue in a stateful operator, that operator checkpoints its state and forwards the barrier. This part has some complexity to it -- but basically, the state is held in a multi-version, concurrency-controlled hash map. The operator creates a new version n+1 of the state that can be modified by the events behind the checkpoint barrier, and creates a new thread to asynchronously snapshot all the state in version n.
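To show where user-facing state fits into this step, here is a hypothetical operator (not one of your actual flatMaps) that declares operator state through the CheckpointedFunction interface. snapshotState is the hook that runs when the barrier reaches the operator; the copy-on-write versioning and the asynchronous upload happen inside Flink, not in user code.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.util.Collector;

    // Hypothetical flatMap-style operator that buffers a few records in operator state.
    public class BufferingFlatMap extends RichFlatMapFunction<String, String>
            implements CheckpointedFunction {

        private transient ListState<String> checkpointedBuffer; // handle to Flink-managed state
        private final List<String> buffer = new ArrayList<>();  // working copy used at runtime

        @Override
        public void flatMap(String value, Collector<String> out) {
            buffer.add(value);
            if (buffer.size() >= 100) {       // arbitrary flush threshold for this sketch
                buffer.forEach(out::collect);
                buffer.clear();
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // Invoked when the barrier for checkpoint n reaches this operator:
            // copy the current buffer into the state that Flink snapshots asynchronously.
            checkpointedBuffer.clear();
            for (String value : buffer) {
                checkpointedBuffer.add(value);
            }
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            checkpointedBuffer = context.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("buffer", String.class));
            if (context.isRestored()) {
                // During recovery, repopulate the working buffer from the checkpointed state.
                for (String value : checkpointedBuffer.get()) {
                    buffer.add(value);
                }
            }
        }
    }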

In your case, the window and sink are stateful. The window's state includes the current window contents, the state of the trigger, and other state you're using for window processing, if any.
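For example, if your aggregation is expressed as an AggregateFunction like the hypothetical one below, the per-key, per-window state that gets checkpointed is just the accumulator (plus the trigger's registered timer), not the full set of buffered events.

    import org.apache.flink.api.common.functions.AggregateFunction;

    // Hypothetical counting aggregate used with .aggregate(new CountPerKey()).
    public class CountPerKey implements AggregateFunction<String, Long, Long> {

        @Override
        public Long createAccumulator() {
            return 0L; // fresh accumulator when the first event for a window arrives
        }

        @Override
        public Long add(String value, Long accumulator) {
            return accumulator + 1; // fold each event into the running count
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator; // emitted when the window fires
        }

        @Override
        public Long merge(Long a, Long b) {
            return a + b; // used when windows are merged (e.g. session windows)
        }
    }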

(4) Sinks use the arrival of the barrier to flush any queued output, and commit pending transactions. Again, there's some complexity here, as transactional sinks use a two-phase commit protocol.
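Here is a skeleton of that protocol, assuming a sink built on TwoPhaseCommitSinkFunction (your CustomBucketingSink presumably uses the BucketingSink-style in-progress/pending file mechanism instead, but its checkpoint-driven lifecycle is analogous). All of the actual I/O is omitted; the transaction type and class names are made up.

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeutils.base.VoidSerializer;
    import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
    import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

    // Hypothetical skeleton showing where the two-phase-commit hooks fire
    // relative to checkpoint barriers.
    public class TransactionalSinkSketch
            extends TwoPhaseCommitSinkFunction<String, TransactionalSinkSketch.Txn, Void> {

        // Whatever identifies the in-flight transaction, e.g. a temp file path.
        public static class Txn {
            public String tempPath;
        }

        public TransactionalSinkSketch() {
            super(new KryoSerializer<>(Txn.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
        }

        @Override
        protected Txn beginTransaction() {
            // Open a fresh transaction, e.g. create a new temporary file.
            return new Txn();
        }

        @Override
        protected void invoke(Txn transaction, String value, Context context) {
            // Write the record into the open transaction; nothing is visible to readers yet.
        }

        @Override
        protected void preCommit(Txn transaction) {
            // Phase 1: runs when the checkpoint barrier reaches the sink.
            // Flush and close the output so it can be committed later.
        }

        @Override
        protected void commit(Txn transaction) {
            // Phase 2: runs once the checkpoint coordinator reports the checkpoint complete.
            // Atomically publish the data, e.g. rename the temp file into place.
        }

        @Override
        protected void abort(Txn transaction) {
            // Runs on failure before commit: discard the temporary output.
        }
    }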

In your application, if the checkpoint interval is much smaller than the window duration, then the sink will complete many checkpoints before ever receiving any output from the window.

(5) When the checkpoint coordinator has heard back from every task that the checkpoint is complete, it finalizes the checkpoint metadata.

During recovery, the state of every operator is reset to the state in the most recent checkpoint. This means that the sources are rewound to the offsets in the checkpoint, and processing resumes with the state in the window and sink corresponding to what it should be after having consumed the events up to those offsets.
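The restart itself is governed by the job's restart strategy; the state restoration is automatic. A hypothetical configuration:

    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RecoverySettings {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // On failure the job is restarted per this strategy, every task reloads its
            // state from the latest completed checkpoint, and the Kafka sources are
            // rewound to the offsets stored in that checkpoint.
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                    3,                                // give up after 3 failed attempts
                    Time.of(30, TimeUnit.SECONDS)));  // wait 30 s between attempts
        }
    }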

Note: To keep this reasonably simple, I've glossed over a bunch of details. Also, FLIP-76 will introduce a new approach to checkpointing.