I have a Kafka (1.0.0) topic with a single partition. The consumer is packaged inside an EAR, and when it is deployed to WildFly 10, polling for the last message always returns 0 records, although the topic is not empty.

final TopicPartition tp = new TopicPartition(topic, 0);

final Long beginningOffset = consumer.beginningOffsets(Collections.singleton(tp)).get(tp);
final Long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);

consumer.assign(Collections.singleton(tp));
consumer.seek(tp, endOffset - 1); 

When I poll, I get 0 records, although the log states:

Consumer is now at position 377408 while Topic begin is 0 and end is 377409

When I change the offset to endOffset - 2, like this:

consumer.seek(tp, endOffset - 2);

I DO get one message:

Consumer is now at position 377407 while Topic begin is 0 and end is 377409

But of course this is not the record I want. Where is message 377408?

I tried many ways of seeking to the end, but none of them worked.

Here is my Consumer config:

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Configuration.KAFKA_SERVERS.getAsString());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

Note: I tried both read_uncommitted and read_committed; both give the same result.

1 Answer

As mentioned in the Javadoc, this is because endOffsets() returns:

the offset of the last successfully replicated message plus one

This is effectively the offset the next message will get.

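By that definition, endOffset - 1 is the offset of the last entry in the log, but not every entry is a record that poll() returns. If the topic is written by a transactional producer, each transaction ends with a commit or abort marker that occupies an offset and is never delivered to consumers, whatever the isolation level. That would explain why seeking to endOffset - 1 returns nothing while seeking to endOffset - 2 returns only the last message.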

I agree this may not be the most intuitive behaviour but this is how it currently works!
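
To make the offset arithmetic concrete, here is a minimal sketch of a single-partition log in plain Java. It is a toy model, not the Kafka client API: the class OffsetModel and its methods append, appendMarker, endOffset and pollFrom are hypothetical names made up for this illustration. It assumes the last slot in the log is a control record (such as the marker a transactional producer writes), which takes an offset but is never handed to the application.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a single-partition log. Hypothetical, NOT the Kafka client API. */
final class OffsetModel {
    // Each slot holds a record value; null stands for a control record
    // (an assumed encoding of a transaction marker, for illustration only).
    private final List<String> slots = new ArrayList<>();

    /** Appends a data record and returns the offset it was assigned. */
    long append(String value) {
        slots.add(value);
        return slots.size() - 1;
    }

    /** Appends a control record; it consumes an offset like any other entry. */
    long appendMarker() {
        slots.add(null);
        return slots.size() - 1;
    }

    /** Offset the next appended entry will get, i.e. last offset + 1. */
    long endOffset() {
        return slots.size();
    }

    /** Returns the data records at or after position, skipping control records. */
    List<String> pollFrom(long position) {
        List<String> out = new ArrayList<>();
        for (long i = Math.max(position, 0); i < slots.size(); i++) {
            String value = slots.get((int) i);
            if (value != null) {
                out.add(value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        OffsetModel log = new OffsetModel();
        log.append("a");      // offset 0
        log.append("b");      // offset 1, the last data record
        log.appendMarker();   // offset 2, never delivered to the application

        long end = log.endOffset();
        System.out.println(end);                    // 3
        System.out.println(log.pollFrom(end - 1));  // []
        System.out.println(log.pollFrom(end - 2));  // [b]
    }
}
```

In this model, a log without the marker would return the last record from endOffset - 1, which is the usual case. With a real consumer, one possible workaround under the same assumption is to seek a few offsets before the end and keep the last record that poll() returns, rather than relying on endOffset - 1 always being a data record.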