I am writing a Kafka consumer application in Java on Spring Boot. I previously wrote the consumer in plain Java, but I am now converting it to spring-kafka because it offers some advantages over the plain client. I have a few questions that I am trying to understand.
It seems that I don't have to write an explicit poll() loop in spring-kafka, and that @KafkaListener handles it automatically. Is that correct?
I have set enable.auto.commit='false' because I have to do some processing before committing offsets. How can I perform commitAsync() in spring-kafka?
KafkaConsumerConfig.java:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${app.kafka_brokers}")
    private String KAFKA_BROKERS;

    @Value("${app.topic}")
    private String KAFKA_TOPIC;

    @Value("${app.group_id_config}")
    private String GROUP_ID_CONFIG;

    @Value("${app.schema_registry_url}")
    private String SCHEMA_REGISTRY_URL;

    @Value("${app.offset_reset}")
    private String OFFSET_RESET;

    @Value("${app.max_poll_records}")
    private String MAX_POLL_RECORDS;

    @Value("${app.security.protocol}")
    private String SSL_PROTOCOL;

    @Value("${app.ssl.truststore.password}")
    private String SSL_TRUSTSTORE_SECURE;

    @Value("${app.ssl.keystore.password}")
    private String SSL_KEYSTORE_SECURE;

    @Value("${app.ssl.key.password}")
    private String SSL_KEY_SECURE;

    @Value("${app.ssl.truststore.location}")
    private String SSL_TRUSTSTORE_LOCATION_FILE_NAME;

    @Value("${app.ssl.keystore.location}")
    private String SSL_KEYSTORE_LOCATION_FILE_NAME;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SSL_PROTOCOL);
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, SSL_TRUSTSTORE_LOCATION_FILE_NAME);
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, SSL_TRUSTSTORE_SECURE);
        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, SSL_KEYSTORE_LOCATION_FILE_NAME);
        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, SSL_KEYSTORE_SECURE);
        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, SSL_KEY_SECURE);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        return factory;
    }
}
KafkaConsumer.java:
@Component
public class KafkaConsumer {

    @KafkaListener(topics = "topic", groupId = "group")
    public void run(ConsumerRecord<String, GenericRecord> record) {
        System.out.println(record);
        // How do I commit the offset asynchronously (commitAsync()) here?
    }
}
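
For reference, here is a minimal sketch of what the placeholder in the listener above might turn into, assuming spring-kafka 2.3 or later. The class names ManualAckConfig and ManualAckListener are hypothetical, and the sketch assumes the consumerFactory bean is declared as ConsumerFactory<String, GenericRecord> so that the generic types line up. The idea is to switch the container to a manual ack mode, turn off synchronous commits so that the container commits asynchronously, and acknowledge each record once processing is done. It has not been run against a broker.

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

// Hypothetical class; this bean would take the place of the
// kafkaListenerContainerFactory() bean in the configuration above.
@EnableKafka
@Configuration
class ManualAckConfig {

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactory(
            // Assumption: the consumerFactory bean is typed <String, GenericRecord>.
            ConsumerFactory<String, GenericRecord> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3);
        // Let the listener decide when an offset is committed.
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // false: the container commits with commitAsync() rather than commitSync().
        factory.getContainerProperties().setSyncCommits(false);
        return factory;
    }
}

// Hypothetical listener; same topic and group placeholders as in the question.
@Component
class ManualAckListener {

    @KafkaListener(topics = "topic", groupId = "group")
    public void run(ConsumerRecord<String, GenericRecord> record, Acknowledgment ack) {
        System.out.println(record); // processing goes here
        // Ask the container to commit this record's offset now that processing is done.
        ack.acknowledge();
    }
}
```

With this arrangement the listener never calls commitAsync() itself; the container owns the consumer and performs the commit on the consumer thread when acknowledge() is called. If committing after every record is too frequent, AckMode.MANUAL queues the acknowledgments and commits them once all records from the poll have been processed.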