How do I make sure I always consume from the beginning of a Kafka topic with Flink?

With the Kafka 0.9.x consumer that is part of Flink 1.0.2, it appears that Flink, not Kafka, now controls the offset:

Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets committed to Kafka / ZooKeeper are only to bring the outside view of progress in sync with Flink's view of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer has consumed a topic.

This is as far as I got, but my Flink program always resumes where it left off instead of going back to the beginning as the configuration instructs:

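import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09

val env = StreamExecutionEnvironment.getExecutionEnvironment

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")
props.setProperty("group.id", "myflinkservice")
props.setProperty("auto.offset.reset", "earliest")

// IncomingDataRecord and IncomingDataSchema are application-defined types
val incomingData = env.addSource(
  new FlinkKafkaConsumer09[IncomingDataRecord](
    "my.topic.name",
    new IncomingDataSchema,
    props
  )
)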

2 Answers

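Use the following (the start-position methods were introduced in Flink 1.3, so this requires upgrading from 1.0.2):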

consumer.setStartFromEarliest();
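A small, self-contained model may help show why auto.offset.reset alone was not enough and what setStartFromEarliest() changes. It is a simplification that assumes the Flink 1.3+ behaviour described in the Kafka connector documentation: offsets restored from a checkpoint or savepoint always win; otherwise setStartFromEarliest() ignores committed group offsets, while the default (setStartFromGroupOffsets()) uses them and falls back to auto.offset.reset only when none exist. FlinkStartModel, StartMode, startOffset and all parameters are hypothetical names, not Flink API, and the offsets are made-up data.

```scala
// Hypothetical model, NOT Flink API: approximates how the Flink Kafka consumer
// (1.3+) chooses the start offset of a single partition.
object FlinkStartModel {
  sealed trait StartMode
  case object GroupOffsets extends StartMode // default, i.e. setStartFromGroupOffsets()
  case object Earliest extends StartMode     // setStartFromEarliest()
  case object Latest extends StartMode       // setStartFromLatest()

  /** restored:        offset from a checkpoint/savepoint, if the job was restored
    * committed:       offset committed for the group.id, if any
    * resetToEarliest: true when auto.offset.reset is "earliest", false for "latest"
    * logStart/logEnd: first retained offset and next offset to be written
    */
  def startOffset(restored: Option[Long], mode: StartMode, committed: Option[Long],
                  resetToEarliest: Boolean, logStart: Long, logEnd: Long): Long =
    restored.getOrElse {
      // Restored checkpoint state always wins; the start mode only matters otherwise.
      mode match {
        case Earliest => logStart
        case Latest   => logEnd
        // Group offsets: use the committed offset, else fall back to auto.offset.reset.
        case GroupOffsets => committed.getOrElse(if (resetToEarliest) logStart else logEnd)
      }
    }

  def main(args: Array[String]): Unit = {
    val (logStart, logEnd) = (0L, 1000L)
    val committed = Some(500L) // "myflinkservice" has already committed offset 500
    val resetEarliest = true   // auto.offset.reset = "earliest"

    // The question's situation: the committed offset wins over auto.offset.reset.
    println(startOffset(None, GroupOffsets, committed, resetEarliest, logStart, logEnd)) // 500
    // setStartFromEarliest() ignores the committed offset.
    println(startOffset(None, Earliest, committed, resetEarliest, logStart, logEnd)) // 0
    // A restore from a checkpoint/savepoint still resumes from the saved state.
    println(startOffset(Some(700L), Earliest, committed, resetEarliest, logStart, logEnd)) // 700
  }
}
```

In this model the job goes back to offset 0 without changing the group only in Earliest mode with no restored state, which matches the documented caveat that start-position settings do not apply when a job is restored from a checkpoint or savepoint.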

I think you can get around this by specifying a random group.id:

import java.util.{Properties, UUID}

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")
props.setProperty("group.id", s"myflinkservice_${UUID.randomUUID}")
// The 0.9 consumer expects "earliest"; "smallest" is the old 0.8 consumer's name for the same policy
props.setProperty("auto.offset.reset", "earliest")

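auto.offset.reset only takes effect when no committed offset exists for the consumer group (stored in ZooKeeper by the 0.8 consumer, in Kafka itself by the 0.9 consumer), so a fresh group.id makes the consumer start from the beginning.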
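The rule above can be illustrated with a small, self-contained model. It is a simplification that assumes the documented Kafka behaviour: auto.offset.reset is consulted only when the group has no committed offset for a partition, or its committed offset is no longer on the server. OffsetResetModel, committedByGroup and startOffset are hypothetical names, and the committed offset of 500 is made-up data.

```scala
import java.util.UUID

// Hypothetical model, NOT the Kafka client API: when does auto.offset.reset apply?
object OffsetResetModel {
  // Illustrative data: committed offsets for one partition, keyed by group.id.
  val committedByGroup: Map[String, Long] = Map("myflinkservice" -> 500L)

  def startOffset(groupId: String, reset: String, logStart: Long, logEnd: Long): Long =
    committedByGroup.get(groupId) match {
      // A committed offset that is still in range is used; auto.offset.reset is ignored.
      case Some(off) if off >= logStart && off <= logEnd => off
      // No committed offset for this group (or it is out of range): apply the reset policy.
      case _ =>
        reset match {
          case "earliest" => logStart
          case "latest"   => logEnd
          case other      => throw new IllegalArgumentException(s"no start position for reset policy: $other")
        }
    }

  def main(args: Array[String]): Unit = {
    // Fixed group.id: resumes from the committed offset despite "earliest".
    println(startOffset("myflinkservice", "earliest", 0L, 1000L)) // 500
    // Random group.id, as in this answer: nothing committed yet, so "earliest" applies.
    val freshGroup = s"myflinkservice_${UUID.randomUUID()}"
    println(startOffset(freshGroup, "earliest", 0L, 1000L)) // 0
  }
}
```

One trade-off of the random group.id approach: the monitoring that the quoted documentation says relies on committed offsets sees a different consumer group on every run.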