3
votes

I am trying to write a Kafka consumer application in Java on the Spring Boot platform. I had earlier written the code in plain Java, but I am now converting it to spring-kafka, since it offers some advantages over plain Java. I have a few questions that I am trying to understand.

  • It seems that I don't have to call poll() in a loop explicitly in spring-kafka; it is handled automatically by @KafkaListener?

  • I have set enable.auto.commit=false. Since I have to do some processing before committing offsets, how can I perform commitAsync() in spring-kafka?

    ConsumerConfig.java :

    @EnableKafka
    @Configuration
    public class KafkaConsumerConfig {
    
        @Value("${app.kafka_brokers}")
        private String KAFKA_BROKERS;
    
        @Value("${app.topic}")
        private String KAFKA_TOPIC;
    
        @Value("${app.group_id_config}")
        private String GROUP_ID_CONFIG;
    
        @Value("${app.schema_registry_url}")
        private String SCHEMA_REGISTRY_URL;
    
        @Value("${app.offset_reset}")
        private String OFFSET_RESET;
    
        @Value("${app.max_poll_records}")
        private String MAX_POLL_RECORDS;
    
        @Value("${app.security.protocol}")
        private String SSL_PROTOCOL;
    
        @Value("${app.ssl.truststore.password}")
        private String SSL_TRUSTSTORE_SECURE;
    
        @Value("${app.ssl.keystore.password}")
        private String SSL_KEYSTORE_SECURE;
    
        @Value("${app.ssl.key.password}")
        private String SSL_KEY_SECURE;
    
        @Value("${app.ssl.truststore.location}")
        private String SSL_TRUSTSTORE_LOCATION_FILE_NAME;
    
        @Value("${app.ssl.keystore.location}")
        private String SSL_KEYSTORE_LOCATION_FILE_NAME;
    
        @Bean
        public ConsumerFactory<String, String> consumerFactory(){
    
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_BROKERS);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
            props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
            props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SSL_PROTOCOL);
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,SSL_TRUSTSTORE_LOCATION_FILE_NAME);
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, SSL_TRUSTSTORE_SECURE);
            props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,SSL_KEYSTORE_LOCATION_FILE_NAME);
            props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, SSL_KEYSTORE_SECURE);
            props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, SSL_KEY_SECURE);
    
            return new DefaultKafkaConsumerFactory<>(props);
    
        }
    
        @Bean
        ConcurrentKafkaListenerContainerFactory<String, String> 
        kafkaListenerContainerFactory() {
    
          ConcurrentKafkaListenerContainerFactory<String, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
          factory.setConsumerFactory(consumerFactory());
          factory.setConcurrency(3);
          return factory;
      }
    
    }
    

KafkaConsumer.java :

@Component
public class KafkaConsumer {
    
    @KafkaListener(topics = "topic", groupId = "group")
    public void run(ConsumerRecord<String, GenericRecord> record) {
        
        System.out.println(record);
        
        // <-- how to commitAsync() here? -->
    }

}

2 Answers

2
votes

First of all, I suggest you use the properties and auto-configuration provided by Spring Kafka instead of creating your own, as that follows the DRY principle: Don't Repeat Yourself.

spring:
  kafka:
    bootstrap-servers: ${app.kafka_brokers}
    consumer:
      auto-offset-reset: ${app.offset_reset}
      enable-auto-commit: false   # <---- disable auto committing
    ssl:
      protocol: ${app.security.protocol}
      key-store-location: ${app.ssl.keystore.location}
      key-store-password:  ${app.ssl.keystore.password}
      trust-store-location: ${app.ssl.truststore.location}
      trust-store-password: ${app.ssl.truststore.password}
  # And other properties
    listener:
      ack-mode: manual # This is what you need

The AckMode docs: https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ContainerProperties.AckMode.html

Essentially, manual is an asynchronous acknowledgment, while manual_immediate is synchronous.
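If you would rather keep the question's Java configuration instead of application.yml, the same setting can be applied programmatically on the container factory. A sketch, reusing the kafkaListenerContainerFactory bean from the question's KafkaConsumerConfig class:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(3);
    // Offsets are committed only when the listener calls Acknowledgment.acknowledge()
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}
```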

Then, inside your @KafkaListener component, you can inject an org.springframework.kafka.support.Acknowledgment parameter and use it to acknowledge your message.

@Component
public class KafkaConsumer {
    
    @KafkaListener(topics = "topic", groupId = "group")
    public void run(ConsumerRecord<String, GenericRecord> record, Acknowledgment acknowledgment) {
        
        System.out.println(record);
        
        acknowledgment.acknowledge();
    }

}

Here's the documentation for what can be injected into a @KafkaListener method: https://docs.spring.io/spring-kafka/reference/html/#message-listeners
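As a sketch of those injection options: the container can pass the Acknowledgment alongside other supported parameters, such as the underlying Consumer. The process(...) call below is a hypothetical placeholder for your own business logic, not part of the Spring Kafka API:

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    @KafkaListener(topics = "topic", groupId = "group")
    public void run(ConsumerRecord<String, GenericRecord> record,
                    Acknowledgment acknowledgment,
                    Consumer<String, GenericRecord> consumer) {
        try {
            process(record);               // hypothetical business logic
            acknowledgment.acknowledge();  // commit only after processing succeeds
        } catch (Exception e) {
            // With MANUAL ack mode, skipping acknowledge() leaves the offset
            // uncommitted, so the record can be redelivered after a restart
            // or rebalance.
        }
    }
}
```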

1
votes

The listener container will commit the offset when the listener exits normally, depending on the container's AckMode property: with AckMode.BATCH (the default), the offsets for all of the records returned by the poll are committed after they have all been processed; with AckMode.RECORD, each record's offset is committed as soon as the listener exits for that record.

Sync vs. async commits are controlled by the syncCommits container property.
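For example, in the question's kafkaListenerContainerFactory bean, the commit style could be switched like this (a sketch; syncCommits defaults to true):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(3);
    // false -> the container uses consumer.commitAsync() instead of commitSync()
    factory.getContainerProperties().setSyncCommits(false);
    return factory;
}
```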