I have written a very simple Flink streaming job which takes data from Kafka using a FlinkKafkaConsumer082
.
protected DataStream<String> getKafkaStream(StreamExecutionEnvironment env, String topic) {
Properties result = new Properties();
result.put("bootstrap.servers", getBrokerUrl());
result.put("zookeeper.connect", getZookeeperUrl());
result.put("group.id", getGroup());
return env.addSource(
new FlinkKafkaConsumer082<>(
topic,
new SimpleStringSchema(), result);
}
This works very well and whenever I put something into the topic on Kafka, it is received by my Flink job and processed. Now I tried to see what happens if my Flink Job isn't online for some reason. So I shut down the flink job and kept sending messages to Kafka. Then I started my Flink job again and was expecting that it would process the messages that were sent meanwhile.
However, I got this message:
No prior offsets found for some partitions in topic collector.Customer. Fetched the following start offsets [FetchPartition {partition=0, offset=25}]
So it basically ignored all messages that came since the last shutdown of the Flink job and just started to read at the end of the queue. From the documentation of FlinkKafkaConsumer082
I gathered, that it automatically takes care of synchronizing the processed offsets with the Kafka broker. However that doesn't seem to be the case.
I am using a single-node Kafka installation (the one that comes with the Kafka distribution) with a single-node Zookeper installation (also the one that is bundled with the Kafka distribution).
I suspect it is some kind of misconfiguration or something the like but I really don't know where to start looking. Has anyone else had this issue and maybe solved it?