0
votes

I am trying to write a Kafka consumer application in spring-kafka. I can think of 2 scenarios in which error can occur :

  • While processing records, an exception can occur in Service layer ( while updating records through API in a table)
  • Deserialization error

I had already explored an option to handle scenario 1, I can just throw an exception in my code and handle it using SeekToCurrentErrorHandler.

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(1000L, 2L)));
    return factory;
}

For scenario 2, I have got an option of ErrorHandlingDeserializer but I am not sure how to implement it with SeekToCurrentErrorHandler. Is there a way to include ErrorHandler for both scenarios using SeekToCurrentErrorHandler.

My property class is as below :

@Bean
public ConsumerFactory<String, String> consumerFactory(){
    
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_BROKERS);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
    props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SSL_PROTOCOL);
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,SSL_TRUSTSTORE_LOCATION_FILE_NAME);
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, SSL_TRUSTSTORE_SECURE);
    props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,SSL_KEYSTORE_LOCATION_FILE_NAME);
    props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, SSL_KEYSTORE_SECURE);
    props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, SSL_KEY_SECURE);
    
    return new DefaultKafkaConsumerFactory<>(props);
}

Publishing error records :

I am also thinking to publish error records to Dead Letter queue. For scenario 1, it should retry and publish on dead letter queue and for scenario 2, it should directly publish as there is no benefit of re-trying. I may not have access to create a topic on my own and would need to ask my producers to create one topic for error records as well. How can I implement logic to publish records on custom error topic. As I have no control over name if I use DeadLetterPublishingRecoverer. Based on my understanding, it creates topic with <original_topic_name>.DLT.

1

1 Answers

0
votes

The SeekToCurrentErrorHandler treats certain exceptions (such as DeserializationException) as fatal and are not retried - that failed record is immediately sent to the recoverer.

For retryable exceptions, the recoverer is called after retrieds are exhausted.

/**
 * Add exception types to the default list. By default, the following exceptions will
 * not be retried:
 * <ul>
 * <li>{@link DeserializationException}</li>
 * <li>{@link MessageConversionException}</li>
 * <li>{@link ConversionException}</li>
 * <li>{@link MethodArgumentResolutionException}</li>
 * <li>{@link NoSuchMethodException}</li>
 * <li>{@link ClassCastException}</li>
 * </ul>
 * All others will be retried.
 * @param exceptionTypes the exception types.
 * @see #removeNotRetryableException(Class)
 * @see #setClassifications(Map, boolean)
 */
public final void addNotRetryableExceptions(Class<? extends Exception>... exceptionTypes) {

Based on my understanding, it creates topic with <original_topic_name>.DLT.

That is the default behavior; you can provide your own DLT topic name strategy (destination resolver).

See the documentation.

The following example shows how to wire a custom destination resolver:

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
        (r, e) -> {
            if (e instanceof FooException) {
                return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
            }
            else {
                return new TopicPartition(r.topic() + ".other.failures", r.partition());
            }
        });
ErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L));