
We have one Kafka topic with one partition.

I am seeing rather strange behavior from a Spring Boot Kafka consumer: it always consumes from the start of the topic on restart. I have configured the Spring Kafka listener as follows.

kafka listener:

@KafkaListener(topics = "${application.kafkaInputTopic}", groupId = "${spring.kafka.consumer.group-id}")
public void listen(String message) {
    log.debug("SG message received. Parsing...");
    TransmissionMessage transmissionMessage;
    SGTransmission transmission = parseMessage(message);
    // Process transmission...
}

Consumer config and the autowired Spring consumer container beans:

@Autowired
public Environment env;

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    // I know this isn't right; it should run in 1 thread, as there is only
    // one partition in the topic

    return factory;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> propsMap = new HashMap<>();
    propsMap.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, env.getProperty(Constants.SPRING_KAFKA_SECURITY_PROTOCOL));
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty(Constants.SPRING_KAFKA_BOOTSTRAP_SERVERS));
    propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty(Constants.SPRING_KAFKA_GROUP_ID));
    return propsMap;
}

Spring application.yaml:

spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
    consumer:
      # If this consumer does not have an offset yet, start at latest offset.
      # Be careful with `earliest`, this will use the first (available) offset in the topic, which is most likely not what you want.
      auto-offset-reset: latest
      group-id: ${KAFKA_GROUP_ID}

Every time the consumer crashes and restarts, all the messages are read from the start, which shouldn't be the case given this setting in application.yaml:

auto-offset-reset: latest

Could there be some other config on the broker side or consumer side that I have overlooked and that causes the consumer to read from the start every time it restarts?

You must have committed the initial offset somehow, perhaps before you finalized this configuration.
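
To illustrate why `auto-offset-reset: latest` does not help once an offset has been committed, here is a toy model in plain Java. It is not Kafka code: the class `StartPosition`, its `resolve` method, and the sample offsets are made up for illustration, and it ignores the out-of-range case that also triggers a reset.

```java
import java.util.OptionalLong;

/** Toy model (not Kafka code) of how a consumer group picks its starting position. */
final class StartPosition {
    enum Reset { EARLIEST, LATEST }

    /**
     * A committed offset always wins; the reset policy is only consulted
     * when the group has no committed offset for the partition.
     */
    static long resolve(OptionalLong committed, Reset reset, long logStart, long logEnd) {
        if (committed.isPresent()) {
            return committed.getAsLong(); // reset policy is ignored here
        }
        return reset == Reset.EARLIEST ? logStart : logEnd;
    }

    public static void main(String[] args) {
        // Offset 0 was committed once and never advanced: every restart begins at 0.
        System.out.println(resolve(OptionalLong.of(0), Reset.LATEST, 0, 500));
        // No committed offset: "latest" applies and the consumer starts at the log end.
        System.out.println(resolve(OptionalLong.empty(), Reset.LATEST, 0, 500));
    }
}
```

In this model, a group whose committed offset is stuck at the beginning behaves exactly as described in the question, whatever the reset policy says.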


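If the container is configured with a manual AckMode (MANUAL or MANUAL_IMMEDIATE), you are responsible for committing the offsets yourself by acknowledging the records; if the listener never acknowledges, the committed offset never advances.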

Use AckMode.BATCH (the default) or AckMode.RECORD.
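
A minimal sketch of setting the AckMode on the container factory. It assumes Spring Kafka 2.3 or later, where the enum lives at `ContainerProperties.AckMode` (older versions expose it as `AbstractMessageListenerContainer.AckMode`); the class name `ListenerContainerConfig` is hypothetical, and the `ConsumerFactory` is assumed to be the bean from the question.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

// Hypothetical configuration class; adapt names to your application.
@Configuration
class ListenerContainerConfig {

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // One consumer thread is enough for a single-partition topic.
        factory.setConcurrency(1);
        // Let the container commit offsets: RECORD commits after each record,
        // BATCH (the default) after each batch returned by poll().
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
        return factory;
    }
}
```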

Or, delete the currently committed offset(s) with the kafka-consumer-groups CLI tool (you can use the same tool to list the current offsets too).

Or use a UUID for the group to get a new group each time.
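
A sketch of the UUID approach in plain Java. The base name `sg-consumer` and the class `UniqueGroupId` are hypothetical; `"group.id"` is the property key behind `ConsumerConfig.GROUP_ID_CONFIG`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

final class UniqueGroupId {
    /** Returns baseName plus a random UUID suffix, so every call names a new group. */
    static String next(String baseName) {
        return baseName + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        // A fresh group has no committed offsets, so auto.offset.reset applies on every start.
        props.put("group.id", next("sg-consumer")); // "sg-consumer" is a made-up base name
        System.out.println(props.get("group.id"));
    }
}
```

Because the new group has no committed offsets, `auto-offset-reset: latest` takes effect on each start. The trade-off is that messages published while the consumer was down are skipped.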


You can also have your listener class implement ConsumerSeekAware and call callback.seekToEnd(partitions) in onPartitionsAssigned().
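
A minimal sketch of the ConsumerSeekAware approach, reusing the topic and group placeholders from the question. The class name `SeekToEndListener` is hypothetical. It implements all three callback methods so that it compiles on versions where they are not default methods, and it seeks partition by partition; newer Spring Kafka versions also offer a `seekToEnd` overload that takes a collection of partitions.

```java
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.stereotype.Component;

// Hypothetical listener class; processing logic is omitted.
@Component
class SeekToEndListener implements ConsumerSeekAware {

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        // Not needed for this sketch; keep the callback if you want to seek later.
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // Ignore any committed offset and jump to the end of each assigned partition.
        assignments.keySet().forEach(tp -> callback.seekToEnd(tp.topic(), tp.partition()));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // No action when the container is idle.
    }

    @KafkaListener(topics = "${application.kafkaInputTopic}", groupId = "${spring.kafka.consumer.group-id}")
    public void listen(String message) {
        // Process the message here.
    }
}
```

Note that this seeks to the end on every partition assignment, including rebalances, so records that arrived while the consumer was not assigned are skipped.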