2
votes

I have a Flink 1.11 job that consumes messages from a Kafka topic, keys and filters them (a keyBy followed by a custom ProcessFunction), and writes them to the db via the JDBC sink (as described here: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/jdbc.html)

The Kafka consumer is initialized with these options:

properties.setProperty("auto.offset.reset", "earliest") // fall back to earliest if no committed offsets exist
kafkaConsumer = new FlinkKafkaConsumer(topic, deserializer, properties)
kafkaConsumer.setStartFromGroupOffsets()          // resume from the consumer group's committed offsets
kafkaConsumer.setCommitOffsetsOnCheckpoints(true) // commit offsets back to Kafka only on successful checkpoints

Checkpoints are enabled on the cluster.
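For completeness, the environment-level checkpointing setup assumed here would look something like this (the interval and mode are illustrative, not taken from my actual job):

```java
// Illustrative checkpoint configuration; the 60s interval is an assumption
env.enableCheckpointing(60_000L);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
```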

What I want to achieve is a guarantee for saving all filtered data into the db, even if the db is down for, let's say, 6 hours, or there are programming errors while saving to the db and the job needs to be updated, redeployed and restarted.

For this to happen, any checkpointing of the Kafka offsets should mean that either

  1. Data that was read from Kafka is in Flink operator state, waiting to be filtered / passed into the sink, and will be checkpointed as part of Flink operator checkpointing, OR
  2. Data that was read from Kafka has already been committed into the db.

While looking at the implementation of the JdbcSink, I see that it does not really keep any internal state that will be checkpointed/restored; rather, its checkpointing is a flush to the database. Now, if this flush fails during checkpointing while the Kafka offsets still get committed, I'll be in a situation where I've "lost" data: subsequent reads from Kafka will resume from the committed offsets, and whatever data was in flight when the db write failed is neither re-read from Kafka nor in the db.

So is there a way to stop advancing the Kafka offsets whenever the full pipeline (Kafka -> Flink -> DB) fails to execute, or is the solution here (in the pre-1.13 world) to create my own implementation of GenericJdbcSinkFunction that maintains some ValueState until the db write succeeds?
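To make the idea concrete, here is a minimal, Flink-free model of what such a buffering sink would do. In a real job the buffer would live in ListState inside a SinkFunction that also implements CheckpointedFunction; the class and method names below are illustrative, and the Consumer stands in for the JDBC batch write:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: buffer rows in operator state and only drop them
// once the db write succeeds. If the write throws during snapshotState,
// the checkpoint fails and the Kafka offsets are not committed, so the
// buffered records are replayed after restore.
class BufferingJdbcSink<T> {
    private final List<T> buffer = new ArrayList<>(); // stands in for ListState<T>
    private final Consumer<List<T>> dbWriter;         // stands in for the JDBC batch write

    BufferingJdbcSink(Consumer<List<T>> dbWriter) {
        this.dbWriter = dbWriter;
    }

    // invoke(): records are only buffered, never written eagerly
    void invoke(T value) {
        buffer.add(value);
    }

    // snapshotState(): attempt the db write as part of the checkpoint
    void snapshotState() {
        dbWriter.accept(new ArrayList<>(buffer)); // may throw -> checkpoint fails
        buffer.clear();                           // clear only after a successful write
    }

    int pending() {
        return buffer.size();
    }
}
```

The key property is that a db outage turns into failed checkpoints rather than lost records: offsets only advance when `snapshotState` returns without throwing.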

Not sure how that will help with the save into the database? Regardless of whether I group by or use windows, at the end I need to save the resulting window into the db, and that's where my issue is. - kozyr
@kozyr Flink 1.13 brought exactly-once support for the JDBC connector (currently not supported for MySQL). This means that if you're using Kafka with exactly-once support and JDBC, the offset committing during checkpoint should be aborted in case one of the operators fails. More on that here - Yuval Itzchakov
@YuvalItzchakov Unfortunately I'm on 1.11 since I'm using Kinesis Data Analytics for running Flink, and the latest version there is 1.11. - kozyr
I don't quite get your question: are you using checkpointing or not? setCommitOffsetsOnCheckpoints implies so, but you explicitly state in your question that you are not using checkpointing. - Arvid Heise

1 Answer

1
votes

There are 3 options that I can see:

  1. Try out the JDBC 1.13 connector with your Flink version. There is a good chance it might just work.
  2. If that doesn't work immediately, check if you can backport it to 1.11. There shouldn't be too many changes.
  3. Write your own 2-phase-commit sink, either by extending TwoPhaseCommitSinkFunction or by implementing your own SinkFunction with CheckpointedFunction and CheckpointListener. Basically, you create a new transaction after a successful checkpoint and commit it in notifyCheckpointComplete.
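Option 3 can be sketched as follows. This is a Flink-free model of the 2-phase-commit lifecycle that TwoPhaseCommitSinkFunction drives; the class is hypothetical, and the `committed` list stands in for the database:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the 2PC lifecycle: records go into an open
// transaction, snapshotState pre-commits it under the checkpoint id,
// and notifyCheckpointComplete finally commits it once the checkpoint
// (including the Kafka offsets) is durable.
class TwoPhaseJdbcSink<T> {
    private final Map<Long, List<T>> pendingByCheckpoint = new HashMap<>();
    private List<T> currentTxn = new ArrayList<>();
    final List<T> committed = new ArrayList<>(); // stands in for the database

    // invoke(): write into the currently open transaction
    void invoke(T value) {
        currentTxn.add(value);
    }

    // snapshotState(): pre-commit - park the batch under this checkpoint id
    // and open a fresh transaction for the next epoch
    void snapshotState(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, currentTxn);
        currentTxn = new ArrayList<>();
    }

    // notifyCheckpointComplete(): the checkpoint is durable, so it is now
    // safe to commit the parked transaction
    void notifyCheckpointComplete(long checkpointId) {
        List<T> txn = pendingByCheckpoint.remove(checkpointId);
        if (txn != null) {
            committed.addAll(txn);
        }
    }
}
```

The point of the split is that nothing reaches the database's committed state until after the checkpoint that covers those records has completed, so a failure between pre-commit and commit leaves a transaction that can be aborted or re-committed on restore, never silently dropped records.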