I am trying to write a Kafka consumer application using spring-kafka. I can think of two scenarios in which an error can occur:
- While processing records, an exception can occur in the service layer (while updating records in a table through an API)
- Deserialization error
I have already explored an option for scenario 1: I can throw an exception in my code and handle it using SeekToCurrentErrorHandler.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Do not commit the offset of a record whose processing failed
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setAckMode(AckMode.RECORD);
        // Retry a failed record twice, one second apart, before giving up on it
        factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(1000L, 2L)));
        return factory;
    }
For scenario 2, I have found ErrorHandlingDeserializer, but I am not sure how to use it together with SeekToCurrentErrorHandler. Is there a way to handle both scenarios using SeekToCurrentErrorHandler?
My consumer configuration is as follows:
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SSL_PROTOCOL);
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, SSL_TRUSTSTORE_LOCATION_FILE_NAME);
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, SSL_TRUSTSTORE_SECURE);
        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, SSL_KEYSTORE_LOCATION_FILE_NAME);
        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, SSL_KEYSTORE_SECURE);
        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, SSL_KEY_SECURE);
        return new DefaultKafkaConsumerFactory<>(props);
    }
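For reference, here is a minimal sketch of how I understand ErrorHandlingDeserializer would be wired into the consumer properties. It assumes spring-kafka 2.5 or later, where the class has this name (in 2.2 to 2.4 it was called ErrorHandlingDeserializer2). The class and method names `DeserializerPropsSketch` and `deserializerProps` are hypothetical, and the remaining properties (brokers, SSL, schema registry) would stay as they are. As far as I can tell, a record that fails to deserialize is then handed to the container's error handler as a DeserializationException, which SeekToCurrentErrorHandler does not retry by default.

```java
import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;

// Hypothetical helper class; only the deserializer-related properties are shown.
public class DeserializerPropsSketch {

    static Map<String, Object> deserializerProps() {
        Map<String, Object> props = new HashMap<>();

        // The consumer itself is configured with the error-handling wrapper...
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);

        // ...and the wrapper delegates to the real deserializers.
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);

        return props;
    }
}
```

These entries would replace the two existing KEY_DESERIALIZER_CLASS_CONFIG and VALUE_DESERIALIZER_CLASS_CONFIG lines in consumerFactory().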
Publishing error records:
I am also thinking of publishing error records to a dead-letter queue. For scenario 1, it should retry and then publish to the dead-letter queue; for scenario 2, it should publish directly, since there is no benefit in retrying. I may not have permission to create a topic myself and would need to ask my producers to create a topic for error records as well. How can I implement logic to publish records to a custom error topic? As far as I can tell, I have no control over the topic name if I use DeadLetterPublishingRecoverer. Based on my understanding, it uses a topic named <original_topic_name>.DLT.
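What I have in mind is something like the sketch below, which relies on the DeadLetterPublishingRecoverer constructor that accepts a destination resolver. The topic name `my-app.errors` and the names `ErrorHandlerSketch` and `errorHandler` are hypothetical, and I am assuming spring-kafka 2.3 or later. I am also assuming that the KafkaTemplate passed in is configured with serializers that can handle whatever values get published, including the raw bytes of records that failed deserialization.

```java
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Hypothetical helper class showing only the error-handler wiring.
public class ErrorHandlerSketch {

    // Hypothetical name of the pre-created error topic.
    static final String ERROR_TOPIC = "my-app.errors";

    static SeekToCurrentErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
        // The resolver chooses the destination for every failed record.
        // A negative partition should leave partition selection to Kafka
        // (my reading of the reference docs; worth verifying for your version).
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
                template,
                (record, ex) -> new TopicPartition(ERROR_TOPIC, -1));

        // Two retries, one second apart, then the recoverer publishes the record.
        return new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(1000L, 2L));
    }
}
```

The returned handler would be passed to factory.setErrorHandler(...) in place of the one created in kafkaListenerContainerFactory().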