I am using spring Kafka to consume message produced by LinkedIn large message supported Kafka client
Given that this Kafka client always overrides AUTO_OFFSET_RESET_CONFIG
to none as shown in its constructor.
private LiKafkaConsumerImpl(LiKafkaConsumerConfig configs,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
Deserializer<LargeMessageSegment> largeMessageSegmentDeserializer,
Auditor<K, V> consumerAuditor) {
_kafkaConsumer = new KafkaConsumer<>(configs.configForVanillaConsumer(),
byteArrayDeserializer,
byteArrayDeserializer);
}
Map<String, Object> configForVanillaConsumer() {
Map<String, Object> newConfigs = new HashMap<>();
newConfigs.putAll(this.originals());
newConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
newConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
return newConfigs;
}
So Once I start using batch commit and setting the ENABLE_AUTO_COMMIT_CONFIG
to false, it throws the following error:
[org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener com.linkedin.kafka.clients.consumer.LiKafkaConsumerRebalanceListener for group document-event-consumer failed on partition assignment org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: DocumentEvents-2 at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:369) at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:247) at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1602) at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1265) at com.linkedin.kafka.clients.consumer.LiKafkaConsumerImpl.position(LiKafkaConsumerImpl.java:403) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$1.onPartitionsAssigned(KafkaMessageListenerContainer.java:447) at com.linkedin.kafka.clients.consumer.LiKafkaConsumerRebalanceListener.onPartitionsAssigned(LiKafkaConsumerRebalanceListener.java:62) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) at com.linkedin.kafka.clients.consumer.LiKafkaConsumerImpl.poll(LiKafkaConsumerImpl.java:231) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:558) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.lang.Thread.run(Thread.java:745)
This issue happens because it is the first time for this consumer group to consume messages from this topic, so it tries to use the offset reset policy.
Although I set it to "earliest", it got overridden to "none" by the underlying LinkedIn kafka client
I tried also to override ConsumerRebalanceListener to manually seek to the beginning in this case, but actually it does not come to this point.
How can I solve this issue?