
We have 1 Kafka topic and 1 partition.

Seeing rather strange behavior from the Spring Boot Kafka consumer: it always consumes from the start of the topic on restart. I have configured the Spring Kafka listener as follows.

Kafka listener:

@KafkaListener(topics = "${application.kafkaInputTopic}", groupId = "${spring.kafka.consumer.group-id}")
public void listen(String message) {
    log.debug("SG message received. Parsing...");
    TransmissionMessage transmissionMessage;
    SGTransmission transmission = parseMessage(message);
    //Process the transmission...
}

Consumer config and Spring consumer container beans:

@Resource
public Environment env;

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // I know this isn't right; it should run in 1 thread as there is only
    // 1 partition in the topic
    factory.setConcurrency(10);
    factory.getContainerProperties().setPollTimeout(3000);

    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);

    factory.getContainerProperties().setSyncCommits(true);
    return factory;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> propsMap = new HashMap<>();
    propsMap.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, env.getProperty(Constants.SPRING_KAFKA_SECURITY_PROTOCOL));
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty(Constants.SPRING_KAFKA_BOOTSTRAP_SERVERS));
    propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty(Constants.SPRING_KAFKA_GROUP_ID));
    return propsMap;
}

Spring application YAML:

kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
    consumer:
        # If this consumer does not have an offset yet, start at the latest offset.
        # Be careful with `earliest`: it will use the first (available) offset in the topic, which is most likely not what you want.
        auto-offset-reset: latest
        group-id: ${KAFKA_GROUP_ID}

Every time the consumer crashes and restarts, all the messages are read from the start, which shouldn't be the case, as you can see in the application.yaml:

auto-offset-reset: latest

Could there be some other config on the broker side or consumer side that I may have overlooked, which is causing the consumer to read from the start every time it restarts?

Comment: Can you show the logs that show which properties are loaded? - Deadpool

1 Answer


You must have committed the initial offset somehow, perhaps before you finalized this configuration.

factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);

That means you are responsible for committing the offsets.
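With manual ack mode, the listener has to take an `Acknowledgment` parameter and acknowledge each record itself; otherwise no offset is ever committed. A sketch of the question's listener adapted this way (the method body and `parseMessage` come from the question; this is an illustration, not a verified implementation):

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;

// Sketch: with AckMode.MANUAL, Spring injects an Acknowledgment that the
// listener must call to commit the offset of the record it just processed.
@KafkaListener(topics = "${application.kafkaInputTopic}", groupId = "${spring.kafka.consumer.group-id}")
public void listen(String message, Acknowledgment ack) {
    SGTransmission transmission = parseMessage(message);
    // ...process the transmission...
    ack.acknowledge(); // commits the offset; without this, nothing is committed
}
```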

Use AckMode.BATCH (the default) or AckMode.RECORD.
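With the container factory from the question, that would be a one-line change (a sketch, reusing the same `AbstractMessageListenerContainer` enum the question uses):

```java
// Let the container commit offsets after each batch of records from poll().
// Since BATCH is the default, simply removing the MANUAL line also works.
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
```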

Or, delete the currently committed offset(s) with the kafka-consumer-groups CLI tool (you can use the same tool to list the current offsets too).

Or use a UUID for the group to get a new group each time.
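For example, a fresh group id could be derived on each startup (a minimal sketch; the `GroupIdFactory` helper and the base name are hypothetical):

```java
import java.util.UUID;

public class GroupIdFactory {
    // Build a unique consumer group id per application start, so the group
    // has no committed offsets and auto-offset-reset decides where to begin.
    static String freshGroupId(String base) {
        return base + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        System.out.println(freshGroupId("sg-consumer"));
    }
}
```

Note that a random group also means the consumer never resumes where it left off, so this only fits workloads where starting from `latest` on every restart is acceptable.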

EDIT

You can also have your listener class implement ConsumerSeekAware and call callback.seekToEnd(partitions) in onPartitionsAssigned().
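A sketch of that approach (assuming a spring-kafka version where `ConsumerSeekAware` has default methods and `ConsumerSeekCallback.seekToEnd(Collection)` is available, roughly 2.3+; on older versions, seek each partition individually):

```java
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.ConsumerSeekAware;

public class SGListener implements ConsumerSeekAware {

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
                                     ConsumerSeekCallback callback) {
        // Ignore any committed offsets and always start at the end of each partition.
        callback.seekToEnd(assignments.keySet());
    }
}
```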