I have a Flink 1.11 job that consumes messages from a Kafka topic, keys them, filters them (keyBy followed by a custom ProcessFunction), and saves them into the db via the JDBC sink (as described here: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/jdbc.html).
The Kafka consumer is initialized with these options:
properties.setProperty("auto.offset.reset", "earliest")
kafkaConsumer = new FlinkKafkaConsumer(topic, deserializer, properties)
kafkaConsumer.setStartFromGroupOffsets()
kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
Checkpoints are enabled on the cluster.
What I want to achieve is a guarantee that all filtered data is saved into the db, even if the db is down for, let's say, 6 hours, or there are programming errors while saving to the db and the job needs to be updated, redeployed and restarted.
For this to happen, any checkpointing of the Kafka offsets should mean that either
- Data that was read from Kafka is in Flink operator state, waiting to be filtered / passed into the sink, and will be checkpointed as part of Flink operator checkpointing, OR
- Data that was read from Kafka has already been committed into the db.
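To make the invariant concrete, here is a toy model in plain Java. Every name in it (CheckpointModel, FakeDb, poll, checkpoint) is hypothetical and none of it is Flink or Kafka API. It is only a sketch of the behavior I am after, assuming a single reader and an all-or-nothing db write: the committed offset moves forward only when the flush made during the same checkpoint succeeds.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink API): offsets are committed only after a successful sink flush. */
class CheckpointModel {

    /** Hypothetical stand-in for the database; set up = false to simulate an outage. */
    static class FakeDb {
        boolean up = true;
        final List<String> rows = new ArrayList<>();

        void writeAll(List<String> batch) {
            if (!up) {
                throw new IllegalStateException("db is down");
            }
            rows.addAll(batch);
        }
    }

    final List<String> topic;                       // stand-in for the Kafka topic
    final FakeDb db;
    final List<String> buffer = new ArrayList<>();  // records read but not yet flushed
    int readPosition = 0;                           // next record to read
    int committedOffset = 0;                        // position a restart would resume from

    CheckpointModel(List<String> topic, FakeDb db) {
        this.topic = topic;
        this.db = db;
    }

    /** Read up to n records from the topic into the in-flight buffer. */
    void poll(int n) {
        int end = Math.min(readPosition + n, topic.size());
        buffer.addAll(topic.subList(readPosition, end));
        readPosition = end;
    }

    /** Flush the buffer; commit the offset only if the flush succeeded. */
    boolean checkpoint() {
        try {
            db.writeAll(buffer);
        } catch (IllegalStateException e) {
            // Flush failed: drop in-flight records and rewind to the last
            // committed offset, which is what a restart would do.
            buffer.clear();
            readPosition = committedOffset;
            return false;
        }
        buffer.clear();
        committedOffset = readPosition;
        return true;
    }
}
```

In this model a failed flush never advances committedOffset, so the records that were in flight are read again once the db is back.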
While looking at the implementation of the JdbcSink, I see that it does not really keep any internal state that will be checkpointed/restored - rather, its checkpointing is a write out to the database. Now, if this write fails during checkpointing, and Kafka offsets do get saved, I'll be in a situation where I've "lost" data: subsequent reads from Kafka will resume from the committed offsets, and whatever data was in flight when the db write failed will neither be read from Kafka again nor be in the db.
So is there a way to stop advancing the Kafka offsets whenever the full pipeline (Kafka -> Flink -> DB) fails to execute? Or is the solution here (in a pre-1.13 world) to create my own implementation of GenericJdbcSinkFunction that will maintain some ValueState until the db write succeeds?
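The second option could look roughly like the following toy model, again in plain Java with hypothetical names (BufferingSinkModel, invoke, snapshotState, restoreState are illustrative and do not implement any Flink interface). It is a sketch under the assumption that the db write either fully succeeds or throws: records stay in the sink's snapshot until a write goes through, so a checkpoint taken during an outage still contains them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model (not Flink API): a sink that keeps records in its snapshot until the write succeeds. */
class BufferingSinkModel {

    private final Consumer<List<String>> writer;  // hypothetical db write; throws on failure
    private final List<String> pending = new ArrayList<>();

    BufferingSinkModel(Consumer<List<String>> writer) {
        this.writer = writer;
    }

    /** Called once per record; only buffers, never writes. */
    void invoke(String record) {
        pending.add(record);
    }

    /** Try to flush, then return whatever must still be persisted with the checkpoint. */
    List<String> snapshotState() {
        try {
            writer.accept(new ArrayList<>(pending));
            pending.clear();
        } catch (RuntimeException e) {
            // Write failed: keep the records so they become part of the snapshot.
        }
        return new ArrayList<>(pending);
    }

    /** Called after a restart with the snapshot of the last completed checkpoint. */
    void restoreState(List<String> snapshot) {
        pending.clear();
        pending.addAll(snapshot);
    }
}
```

Two caveats with this approach: during a long outage the buffered state keeps growing, and a real JDBC batch may be partially applied before it fails, so the insert statement would probably need to be idempotent (for example an upsert) to tolerate replays.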
setCommitOffsetsOnCheckpoints implies so, but you explicitly state in your question that you are not using checkpointing. - Arvid Heise