2 votes

I have written a very simple Flink streaming job which takes data from Kafka using a FlinkKafkaConsumer082.

protected DataStream<String> getKafkaStream(StreamExecutionEnvironment env, String topic) {
    Properties result = new Properties();
    result.put("bootstrap.servers", getBrokerUrl());
    result.put("zookeeper.connect", getZookeeperUrl());
    result.put("group.id", getGroup());

    return env.addSource(
            new FlinkKafkaConsumer082<>(topic, new SimpleStringSchema(), result));
}

This works very well, and whenever I put something into the topic on Kafka, it is received by my Flink job and processed. Now I tried to see what happens if my Flink job isn't online for some reason. So I shut down the Flink job and kept sending messages to Kafka. Then I started the Flink job again, expecting it to process the messages that had been sent in the meantime.

However, I got this message:

No prior offsets found for some partitions in topic collector.Customer. Fetched the following start offsets [FetchPartition {partition=0, offset=25}]

So it basically ignored all messages that arrived since the last shutdown of the Flink job and just started reading at the end of the queue. From the documentation of FlinkKafkaConsumer082 I gathered that it automatically takes care of synchronizing the processed offsets with the Kafka broker. However, that doesn't seem to be the case.

I am using a single-node Kafka installation (the one that comes with the Kafka distribution) with a single-node Zookeeper installation (also the one that is bundled with the Kafka distribution).

I suspect it is some kind of misconfiguration or something along those lines, but I really don't know where to start looking. Has anyone else had this issue and maybe solved it?


2 Answers

4 votes

I found the reason. You need to explicitly enable checkpointing in the StreamExecutionEnvironment to make the Kafka connector write the processed offsets to Zookeeper. If you don't enable it, the Kafka connector will not write the last read offset, and it will therefore not be able to resume from there when the collecting job is restarted. So be sure to write:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(); // <-- this is the important part

Anatoly's suggestion for changing the initial offset is probably still a good idea, in case checkpointing fails for some reason.
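For completeness, enableCheckpointing() also has an overload that takes the checkpoint interval in milliseconds. A minimal sketch of the surrounding job setup, assuming the getKafkaStream() method from the question is available in the same class (the 5-second interval and the job name are just example values):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 5 seconds; the Kafka connector commits its offsets to Zookeeper
// whenever a checkpoint completes.
env.enableCheckpointing(5000);

// getKafkaStream() is the method from the question; "collector.Customer" is the topic
// that appears in the log message above.
DataStream<String> stream = getKafkaStream(env, "collector.Customer");
stream.print();

env.execute("Kafka collector job");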

2 votes

https://kafka.apache.org/08/configuration.html

Set auto.offset.reset to smallest (by default it is largest).

auto.offset.reset:

What to do when there is no initial offset in Zookeeper or if an offset is out of range:

smallest : automatically reset the offset to the smallest offset

largest : automatically reset the offset to the largest offset

anything else: throw exception to the consumer.

If this is set to largest, the consumer may lose some messages when the number of partitions, for the topics it subscribes to, changes on the broker. To prevent data loss during partition addition, set auto.offset.reset to smallest.

Also, make sure getGroup() returns the same group.id after a restart.
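Applied to the getKafkaStream() method from the question, this is just one extra entry in the consumer Properties; here is a sketch of that method with only the auto.offset.reset line added:

protected DataStream<String> getKafkaStream(StreamExecutionEnvironment env, String topic) {
    Properties result = new Properties();
    result.put("bootstrap.servers", getBrokerUrl());
    result.put("zookeeper.connect", getZookeeperUrl());
    result.put("group.id", getGroup());          // must return the same group id after a restart
    result.put("auto.offset.reset", "smallest"); // fall back to the earliest offset when none is stored
    return env.addSource(
            new FlinkKafkaConsumer082<>(topic, new SimpleStringSchema(), result));
}

Note that, per the documentation quoted above, smallest only applies when there is no offset stored in Zookeeper for the group (or the stored offset is out of range); once offsets are being committed, the consumer resumes from those.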