Flink does not purely rely on Kafka’s consumer group offset tracking, but tracks and checkpoints these offsets internally as well.
Flink consumes data from Kafka topics and periodically checkpoints the consumed offsets, together with the rest of the job state, using its distributed checkpointing mechanism. In case of failure, Flink restores the job state from the latest completed checkpoint and resumes reading from the Kafka offsets stored in that checkpoint.
Checkpointing must be enabled in Flink in order to use the fault-tolerant Kafka consumer.
Here is how you can do it.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // take a checkpoint every 5000 ms
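
To make the restore behaviour described above more concrete, here is a small toy model in plain Java. It is only a sketch and not Flink's actual implementation: the class `OffsetCheckpointSketch`, the nested `ToyConsumer`, and its methods are all hypothetical names, the "partition" is an in-memory list, and a checkpoint is taken every N records rather than on a timer. It shows the idea that the read offset and the operator state are snapshotted together, so that after a failure both roll back to the same point.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of offset checkpointing. Hypothetical names; this is not Flink code. */
public class OffsetCheckpointSketch {

    /** Reads from an in-memory "partition" and snapshots offset plus state together. */
    public static final class ToyConsumer {
        private final List<String> partition;     // stands in for one Kafka partition
        private final int checkpointInterval;     // snapshot after this many records (assumption: count-based, not time-based)
        private int offset = 0;                   // index of the next record to read
        private final List<String> processed = new ArrayList<>(); // stands in for operator state

        private int checkpointedOffset = 0;       // offset stored in the last checkpoint
        private List<String> checkpointedState = new ArrayList<>(); // state stored in the last checkpoint

        public ToyConsumer(List<String> partition, int checkpointInterval) {
            this.partition = partition;
            this.checkpointInterval = checkpointInterval;
        }

        /** Processes up to maxRecords records, checkpointing every checkpointInterval records. */
        public void poll(int maxRecords) {
            for (int i = 0; i < maxRecords && offset < partition.size(); i++) {
                processed.add(partition.get(offset));
                offset++;
                if (offset % checkpointInterval == 0) {
                    checkpoint();
                }
            }
        }

        /** Stores the current offset and a copy of the state as one consistent snapshot. */
        public void checkpoint() {
            checkpointedOffset = offset;
            checkpointedState = new ArrayList<>(processed);
        }

        /** Simulates a failure: offset and state both roll back to the last checkpoint. */
        public void failAndRestore() {
            offset = checkpointedOffset;
            processed.clear();
            processed.addAll(checkpointedState);
        }

        public int offset() {
            return offset;
        }

        public List<String> processed() {
            return new ArrayList<>(processed);
        }
    }

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer(List.of("a", "b", "c", "d", "e"), 2);
        consumer.poll(3);          // reads a, b, c; a checkpoint was taken at offset 2
        consumer.failAndRestore(); // offset goes back to 2, state goes back to [a, b]
        consumer.poll(10);         // re-reads c, then reads d and e
        System.out.println(consumer.processed()); // prints [a, b, c, d, e]
    }
}
```

Record "c" is read twice from the source, but because the state was rolled back to the same checkpoint as the offset, it appears only once in the final state. This is why the offsets have to be part of the checkpoint rather than tracked only through the Kafka consumer group.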