
I am trying to read a Kafka topic from the last offset, but I can't get it to work correctly.

In the properties file I set these:

group.id=my_group
client.id=my_client
enable.auto.commit=true
auto.offset.reset=earliest
isolation.level=read_committed

Then the code:

consumer = new KafkaConsumer<>(props); // read properties
consumer.subscribe(Collections.singletonList(topic));
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1L));
consumerRecords.forEach(consumerRecord -> System.out.println(consumerRecord.offset()));

The offsets are printed starting from 0, even though the topic already contains some items.

In the log file I can see this (shortened):

[Consumer clientId=my_client, groupId=my_group] Finished assignment for group at generation 1: {my_client-f1678be7-ce6b-48e8-acf2-741ab28f7266=Assignment(partitions=[mytopic-0])}

[Consumer clientId=my_client, groupId=my_group] Successfully joined group with generation 1

[Consumer clientId=my_client, groupId=my_group] Notifying assignor about the new Assignment(partitions=[mytopic-0])

[Consumer clientId=my_client, groupId=my_group] Adding newly assigned partitions: mytopic-0

[Consumer clientId=my_client, groupId=my_group] Found no committed offset for partition mytopic-0

[Consumer clientId=my_client, groupId=my_group] Resetting offset for partition mytopic-0 to offset 0.

Any idea what I am doing wrong? I tried some magic with seekToBeginning() + poll() + commitSync() + seekToEnd() and it somehow worked, but I think this should work by default.

I think I can see the problem now: could it be that the auto.offset.reset flag is set to earliest? I followed stackoverflow.com/a/53562232/3467894, which mentions that it "will start the consumer at the last committed offset". - Michal Špondr
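
To make the hypothesis in this comment concrete, here is a minimal sketch of how the starting position seems to be chosen, judging by the log lines above ("Found no committed offset", then "Resetting offset ... to offset 0"). It is a plain-Java simulation, not Kafka client code: the helper `resolveStartOffset` and the sample offsets 0, 42 and 17 are hypothetical and only illustrate the decision.

```java
import java.util.OptionalLong;

public class OffsetResetSketch {

    // Hypothetical helper (not part of the Kafka client API): picks the offset
    // a consumer would start from for one partition.
    static long resolveStartOffset(OptionalLong committed, String autoOffsetReset,
                                   long beginningOffset, long endOffset) {
        // A committed offset for the group always wins; auto.offset.reset is ignored.
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        // No committed offset: fall back to the reset policy.
        switch (autoOffsetReset) {
            case "earliest":
                return beginningOffset;
            case "latest":
                return endOffset;
            default:
                // With "none" the real client also fails instead of choosing a position.
                throw new IllegalStateException(
                        "No committed offset and auto.offset.reset=" + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        long beginning = 0L; // assumed first offset in mytopic-0
        long end = 42L;      // assumed end offset (topic already holds 42 records)

        // The situation from the question: new group, earliest -> starts at 0.
        System.out.println(resolveStartOffset(OptionalLong.empty(), "earliest", beginning, end));
        // Same group state with latest -> starts at the end of the partition.
        System.out.println(resolveStartOffset(OptionalLong.empty(), "latest", beginning, end));
        // Once an offset has been committed, the policy no longer matters.
        System.out.println(resolveStartOffset(OptionalLong.of(17L), "earliest", beginning, end));
    }
}
```

Under this reading, my_group has no committed offset on its first run, so earliest sends the consumer to offset 0. Setting auto.offset.reset=latest would instead start at the end of the partition, and later runs of the same group would resume from whatever offset was committed.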

1 Answer