1
votes

I'm reading into the details of Flink's checkpointing mechanism right now and by now, I think I have a really good overview about how everything is tied together but one last issue strikes me here. It's about how checkpoints and commits interact with each other in the ExactlyOnce context, because I have the feeling that there's still potential for data loss/duplicate records. Mainly I was thinking about potential failures of the commit message or its callback, when I stumbled upon this paragraph in the Flink Blog:

After a successful pre-commit, the commit must be guaranteed to eventually succeed – both our operators and our external system need to make this guarantee. If a commit fails (for example, due to an intermittent network issue), the entire Flink application fails, restarts according to the user’s restart strategy, and there is another commit attempt. This process is critical because if the commit does not eventually succeed, data loss occurs.

Up until this point, I still had the impression that checkpoints would have to be acknowledged by the sink commit first, before they would be viewed as "valid". But apparently, once all operators are ready to actually commit, the checkpoint starts to exist and from that point on, the sink has to guarantee the commit can be done to ensure no data being lost. What exactly happens if my commit can never be done, e.g. if my Kafka sink is down for a longer period of time? Does this mean if the defined retries run out eventually, the checkpointed state will just be treated as the correct state or will Flink only be able to resume the job once this specific commit was able to be done and thus be stuck until broker is available again?

And what if the callback of the commit is lost somehow, will this be resolved in the next retry attempt or since the transaction is "done" now, the producer will not be able to commit and we enter this loop of repeated retries? (more of a Kafka question probably)

1

1 Answers

2
votes

For committing the side effects (so things like external state, vide Kafka transactions), Flink is using two phase commit protocol.

Let's say we are performing checkpoint 42. First pre-commit requests are issued. If all participants (parallel subtasks/operators) successfully acknowledged the pre-commit, JobManager/CheckpointCoordinator will start sending out commit requests.

The thing is, if failure happens at this point of time, there is no way going back. If either some commit fails or there is some other unrelated failure, job will be restarted from the checkpoint 42 and Flink will re-attempt to commit the pending/pre-committed transactions. If failure happens again, rinse and repeat according to your selected restart strategy. If you want to avoid data loss, commit attempts must eventually succeed. There is simply no other way. We can not revert those transactions, as once some commit request were issued, some transactions might have already been committed, so we can not rollback only portion of them (otherwise we would have data duplication problem).