
I am using spring-cloud-stream with the Kafka binder to consume messages from Kafka. The application basically consumes messages from Kafka and updates a database.

There are scenarios when the DB is down (which might last for hours) or some other temporary technical issue occurs. Since in these scenarios there is no point in retrying a message a limited number of times and then moving it to the DLQ, I am trying to achieve an infinite number of retries when we get certain types of exceptions (e.g. DBHostNotAvaialableException).

In order to achieve this I tried 2 approaches (and am facing issues in both):

  1. In this approach, I tried setting an error handler on the container properties while configuring the ConcurrentKafkaListenerContainerFactory bean, but the error handler is never triggered. While debugging the flow I realized that in the KafkaMessageListenerContainer instances that are created, the errorHandler field is null, hence they use the default LoggingErrorHandler. Below is my container factory bean configuration; the @StreamListener method for this approach is the same as in the second approach, except for the seek on the consumer.

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
            ConsumerFactory<String, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory);
        ContainerProperties containerProperties = factory.getContainerProperties();
        containerProperties.setAckOnError(false);
        // even tried a custom implementation of RemainingRecordsErrorHandler but the call never reached the implementation
        containerProperties.setErrorHandler(new SeekToCurrentErrorHandler());
        return factory;
    }
    

Am I missing something while configuring the factory bean, or is this bean only relevant for @KafkaListener and not @StreamListener?

  2. The second alternative was trying to achieve it using manual acknowledgement and seek. Inside a @StreamListener method I get the Acknowledgment and Consumer from the headers; when a retryable exception is received, I do a certain number of retries using a RetryTemplate, and when those are exhausted I trigger a consumer.seek(). Example code below:

    @StreamListener(MySink.INPUT)
    public void processInput(Message<String> msg) {

        MessageHeaders msgHeaders = msg.getHeaders();
        Acknowledgment ack = msgHeaders.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        Consumer<?, ?> consumer = msgHeaders.get(KafkaHeaders.CONSUMER, Consumer.class);
        Integer partition = msgHeaders.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class);
        String topicName = msgHeaders.get(KafkaHeaders.RECEIVED_TOPIC, String.class);
        Long offset = msgHeaders.get(KafkaHeaders.OFFSET, Long.class);

        try {
            retryTemplate.execute(context -> {
                // sample service call to update the database, which might throw
                // retryable exceptions like DBHostNotAvaialableException
                consumeMessage(msg.getPayload());
                return null;
            });
        }
        catch (DBHostNotAvaialableException ex) {
            // once the retries configured in the RetryTemplate are exhausted, seek back to the failed record
            consumer.seek(new TopicPartition(topicName, partition), offset);
        }
        catch (Exception ex) {
            // any other exception: just log and let it go to the DLQ based on the enableDlq property
            logger.warn("some other business exception hence putting in dlq");
            throw ex;
        }

        if (ack != null) {
            ack.acknowledge();
        }
    }

The problem with this approach: since I am doing consumer.seek(), there might still be pending records from the last poll, and those might be processed and committed if the DB comes up during that period (hence out of order). Is there a way to discard those records when a seek is performed?
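For context, the retryTemplate used in the listener above is configured roughly along these lines; this is only a sketch, and the attempt count and back-off period are assumptions, not values from the original setup:

```java
@Bean
public RetryTemplate retryTemplate() {
    RetryTemplate template = new RetryTemplate();
    // retry only the exceptions we consider transient (assumed max of 3 attempts);
    // anything not in this map is rethrown immediately and falls through to the DLQ path
    Map<Class<? extends Throwable>, Boolean> retryable = new HashMap<>();
    retryable.put(DBHostNotAvaialableException.class, true);
    template.setRetryPolicy(new SimpleRetryPolicy(3, retryable));
    // fixed pause between attempts (assumed 2 seconds)
    FixedBackOffPolicy backOff = new FixedBackOffPolicy();
    backOff.setBackOffPeriod(2000L);
    template.setBackOffPolicy(backOff);
    return template;
}
```

With this policy, exceptions other than DBHostNotAvaialableException are never retried, which is why the second catch block in the listener sees them on the first failure.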

PS - we are currently on version 2.0.3.RELEASE of Spring Boot and Finchley.RELEASE of the Spring Cloud dependencies (hence cannot use features like negative acknowledgement either, and an upgrade is not possible at this moment).


1 Answer


Spring Cloud Stream does not use a container factory. I already explained that to you in this answer.

Version 2.1 introduced the ListenerContainerCustomizer; if you add a bean of that type, it is called after the container is created, which lets you configure things like the error handler without a container factory.
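Once an upgrade to Spring Cloud Stream 2.1+ is possible, such a customizer bean could look roughly like this sketch; plugging in SeekToCurrentErrorHandler here is an assumption about the handler you would choose, not a prescribed choice:

```java
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> containerCustomizer() {
    // invoked once per binding after the binder has created the listener container;
    // destination and group identify which binding the container belongs to
    return (container, destination, group) ->
            container.setErrorHandler(new SeekToCurrentErrorHandler());
}
```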

Spring Boot 2.0 went end-of-life over a year ago and is no longer supported.

The answer I referred you to shows how you can use reflection to add an error handler.

Doing the seek in the listener will only work if you have max.poll.records=1.
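With max.poll.records=1 there is never more than one fetched record in flight, so no already-polled records can be processed after the seek. On the Kafka binder that consumer property can be set per binding; this is a sketch, and the binding name "input" is an assumption:

```properties
spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=1
```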