We have one Kafka topic with one partition.
I'm seeing rather strange behavior from a Spring Boot Kafka consumer: it always consumes from the start of the topic on restart. I have configured the Spring Kafka listener as follows.
Kafka listener:
@KafkaListener(topics = "${application.kafkaInputTopic}", groupId = "${spring.kafka.consumer.group-id}")
public void listen(String message) {
    log.debug("SG message received. Parsing...");
    TransmissionMessage transmissionMessage;
    SGTransmission transmission = parseMessage(message);
    // Process transmission...
}
Consumer config and Spring listener container factory beans:
@Resource
public Environment env;

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // I know this isn't right; it should run in 1 thread, as there is only
    // one partition in the topic
    factory.setConcurrency(10);
    factory.getContainerProperties().setPollTimeout(3000);
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
    factory.getContainerProperties().setSyncCommits(true);
    return factory;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> propsMap = new HashMap<>();
    propsMap.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, env.getProperty(Constants.SPRING_KAFKA_SECURITY_PROTOCOL));
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty(Constants.SPRING_KAFKA_BOOTSTRAP_SERVERS));
    propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty(Constants.SPRING_KAFKA_GROUP_ID));
    return propsMap;
}
Spring application YAML:
kafka:
  bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
  properties:
  consumer:
    # If this consumer does not have an offset yet, start at latest offset.
    # Be careful with `earliest`, this will use the first (available) offset in the topic, which is most likely not what you want.
    auto-offset-reset: latest
    group-id: ${KAFKA_GROUP_ID}
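To make the rule in the YAML comment concrete, here is a minimal sketch of how the starting position is chosen, as far as I understand it: the reset policy only matters when the group has no committed offset for the partition. This is a toy model, not the Kafka client API; `StartOffsetModel`, `ResetPolicy`, and `startPosition` are hypothetical names, and it ignores the case where a committed offset is out of range.

```java
import java.util.OptionalLong;

/** Toy model of where a consumer group starts reading one partition. Not the Kafka client API. */
final class StartOffsetModel {

    /** Hypothetical stand-in for the two auto-offset-reset values discussed above. */
    enum ResetPolicy { EARLIEST, LATEST }

    /**
     * @param committed offset committed for the group, empty if nothing was ever committed
     * @param logStart  first available offset in the partition
     * @param logEnd    offset the next produced record will receive
     * @param policy    fallback, consulted only when there is no committed offset
     */
    static long startPosition(OptionalLong committed, long logStart, long logEnd, ResetPolicy policy) {
        if (committed.isPresent()) {
            // A committed offset always wins; the reset policy is not consulted.
            return committed.getAsLong();
        }
        return policy == ResetPolicy.EARLIEST ? logStart : logEnd;
    }

    public static void main(String[] args) {
        // Partition holds offsets 0..99, so the next record gets offset 100.
        System.out.println(startPosition(OptionalLong.of(42), 0, 100, ResetPolicy.LATEST));
        System.out.println(startPosition(OptionalLong.empty(), 0, 100, ResetPolicy.LATEST));
        System.out.println(startPosition(OptionalLong.empty(), 0, 100, ResetPolicy.EARLIEST));
    }
}
```

Under this model, a restart goes back to offset 0 only if no offset was committed and the effective policy is `earliest`, so both the committed offsets for the group and the reset value actually reaching the consumer seem worth checking.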
Every time the consumer crashes and restarts, all the messages are read from the start, which shouldn't be the case given this setting in application.yaml:
auto-offset-reset: latest
Could there be some other config on the broker side or the consumer side that I have overlooked and that is causing the consumer to read from the start every time it restarts?