I am using spring-cloud-stream with kafka binder to consume message from kafka . The application is basically consuming messages from kafka and updating a database.
There are scenarios when DB is down (which might last for hours) or some other temporary technical issues. Since in these scenarios there is no point in retrying a message for a limited amount of time and then move it to DLQ , i am trying to achieve infinite number of retries when we are getting certain type of exceptions (e.g. DBHostNotAvaialableException)
In order to achieve this i tried 2 approaches (facing issues in both the approaches) -
In this approach, Tried setting an errorhandler on container properties while configuring ConcurrentKafkaListenerContainerFactory bean but the error handler is not getting triggered at all. While debugging the flow i realized in the KafkaMessageListenerContainer that are created have the errorHandler field is null hence they use the default LoggingErrorHandler. Below are my container factory bean configurations - the @StreamListener method for this approach is the same as 2nd approach except for the seek on consumer.
@Bean public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(ConsumerFactory<String, Object> kafkaConsumerFactory) { ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(kafkaConsumerFactory); factory.getContainerProperties().setAckOnError(false); ContainerProperties containerProperties = factory.getContainerProperties(); // even tried a custom implementation of RemainingRecordsErrorHandler but call never went in to the implementation factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler()); return factory; }
Am i missing something while configuring factory bean or this bean is only relevant for @KafkaListener and not @StreamListener??
The second alternative was trying to achieve it using manual acknowledgement and seek, Inside a @StreamListener method getting Acknowledgment and Consumer from headers, in case a retryable exception is received, I do certain number of retries using retrytemplate and when those are exhausted I trigger a
consumer.seek()
. Example code below -@StreamListener(MySink.INPUT) public void processInput(Message<String> msg) { MessageHeaders msgHeaders = msg.getHeaders(); Acknowledgment ack = msgHeaders.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); Consumer<?,?> consumer = msgHeaders.get(KafkaHeaders.CONSUMER, Consumer.class); Integer partition = msgHeaders.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class); String topicName = msgHeaders.get(KafkaHeaders.RECEIVED_TOPIC, String.class); Long offset = msgHeaders.get(KafkaHeaders.OFFSET, Long.class); try { retryTemplate.execute( context -> { // this is a sample service call to update database which might throw retryable exceptions like DBHostNotAvaialableException consumeMessage(msg.getPayload()); return null; } ); } catch (DBHostNotAvaialableException ex) { // once retries as per retrytemplate are exhausted do a seek consumer.seek(new TopicPartition(topicName, partition), offset); } catch (Exception ex) { // if some other exception just log and put in dlq based on enableDlq property logger.warn("some other business exception hence putting in dlq "); throw ex; } if (ack != null) { ack.acknowledge(); }
}
Problem with this approach - since I am doing consumer.seek() while there might be pending records from last poll those might be processed and committed if DB comes up during that period(hence out of order). Is there a way to clear those records while a seek is performed?
PS - we are currently in 2.0.3.RELEASE version of spring boot and Finchley.RELEASE or spring cloud dependencies (hence cannot use features like negative acknowledgement either and upgrade is not possible at this moment).