How do I make sure I always consume from the beginning of a Kafka topic with Flink?
With the Kafka 0.9.x consumer that is part of Flink 1.0.2, it appears that Flink, not Kafka, now controls the offsets:
Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer has consumed a topic.
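Here is a rough mental model of the precedence this implies for the 0.9 consumer in Flink 1.0.x. It is a simplified sketch, not Flink's or Kafka's actual code. The starting offset comes from a restored Flink checkpoint if there is one. Otherwise it is the offset committed to Kafka for the `group.id`. Only when neither exists does the `auto.offset.reset` policy apply. The names `StartOffsetModel` and `startingOffset` are hypothetical. The sketch also ignores the case where a committed offset is out of range, in which real Kafka falls back to the reset policy as well.

```scala
// Hypothetical, simplified model of how a partition's start offset is chosen
// for a Kafka 0.9 consumer wrapped by FlinkKafkaConsumer09 (Flink 1.0.x).
// NOT Flink's implementation: it only illustrates the precedence order and
// ignores out-of-range committed offsets.
object StartOffsetModel {

  sealed trait ResetPolicy
  case object Earliest extends ResetPolicy
  case object Latest extends ResetPolicy

  /** Resolves the offset to start reading from.
    *
    * @param restoredFromCheckpoint offset restored from a Flink checkpoint, if any
    * @param committedForGroup      offset committed to Kafka for this group.id, if any
    * @param reset                  the auto.offset.reset policy
    * @param earliestAvailable      first offset still present in the partition
    * @param latestAvailable        offset one past the last record in the partition
    */
  def startingOffset(
      restoredFromCheckpoint: Option[Long],
      committedForGroup: Option[Long],
      reset: ResetPolicy,
      earliestAvailable: Long,
      latestAvailable: Long
  ): Long =
    restoredFromCheckpoint
      .orElse(committedForGroup) // a stored offset always wins over the reset policy
      .getOrElse(reset match {   // auto.offset.reset is consulted only when nothing is stored
        case Earliest => earliestAvailable
        case Latest   => latestAvailable
      })
}
```

Under this model, `startingOffset(None, Some(42L), Earliest, 0L, 100L)` returns 42, not 0. A group that has already committed offsets therefore resumes where it left off even with `earliest` configured.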
This is how far I got, but my Flink program always starts where it left off and doesn't return to the beginning of the topic, as the configuration instructs it to:
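import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09

val env = StreamExecutionEnvironment.getExecutionEnvironment

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")
props.setProperty("group.id", "myflinkservice")
// Intended to make the consumer start from the beginning of the topic
props.setProperty("auto.offset.reset", "earliest")

// IncomingDataRecord and IncomingDataSchema are the application's own types
val incomingData = env.addSource(
  new FlinkKafkaConsumer09[IncomingDataRecord](
    "my.topic.name",
    new IncomingDataSchema,
    props
  )
)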
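One workaround for Flink 1.0.x is to give each run a `group.id` that has never committed offsets. Kafka then has nothing stored for the group, so `auto.offset.reset` = `earliest` decides where reading starts. The sketch below only builds the `Properties`. The object name `FreshGroupProps` and the UUID-suffix scheme are hypothetical choices for illustration.

```scala
import java.util.{Properties, UUID}

// Sketch only: consumer properties with a group.id that is unique per run
// (hypothetical "<base>-<uuid>" scheme). No offsets are ever committed for a
// brand-new group, so "auto.offset.reset" = "earliest" picks the start position.
object FreshGroupProps {
  def consumerProps(baseGroupId: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    // New random suffix on every call, so the group has no committed offsets
    props.setProperty("group.id", s"$baseGroupId-${UUID.randomUUID()}")
    props.setProperty("auto.offset.reset", "earliest")
    props
  }
}
```

This approach has a trade-off. Every run registers a throwaway consumer group, so the committed offsets that monitoring tools rely on are scattered across groups. It also does not help when the job is restored from a checkpoint, because restored offsets still take precedence.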