My Kafka Streams app depends on an external system. Whenever a deserialization, producer, or external/network exception occurs, I would like to publish a message to a dead letter queue (DLQ) Kafka topic from within the app, so that I can monitor that topic and reprocess records as needed. I can't find a good example of this anywhere. The closest reference I found is https://docs.confluent.io/current/streams/faq.html#option-3-quarantine-corrupted-records-dead-letter-queue, but:

1. It only covers DeserializationExceptionHandler. What about the other exception scenarios?
2. It doesn't show the right way to configure, manage, and close the associated KafkaProducer.

I would like to wrap the external dependency code in a try/catch and send the record(s) that cause an exception to a dead letter queue topic. Any help would be appreciated!

1 Answer

For the processing logic you could take this approach:

    someKStream
        // the processing logic
        .mapValues(inputValue -> {
            // Each invocation may return a value of a different class, e.g.
            // "return isFailedProcessing ? failValue : successValue;"
            // where failValue and successValue have unrelated classes.
            return someObject; // the class of someObject varies at runtime, depending on your business logic
        }) // at this point you have KStream<WhateverKeyClass, Object>: yes, Object for the value!

        // You could use different logic for choosing the target topic;
        // the check below is just an example.
        .to((k, v, recordContext) -> v instanceof FailValueClass
                ? "dead-letter-topic" : "success-topic",
            // You could omit the "Produced" argument completely
            // and rely on spring-boot properties only, e.g.
            // spring.kafka.streams.properties.default.key.serde=yourKeySerde
            // spring.kafka.streams.properties.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde
            Produced.with(yourKeySerde,
                // JsonSerde could be an instance configured as you need
                // (with type mappings, header setting disabled, etc.)
                new JsonSerde<>()));

Although your value classes differ and land in different topics, each will serialize as expected.
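To make the try/catch part concrete, here is a minimal plain-Java sketch of what could go inside mapValues() and the topic-choosing lambda. It deliberately avoids Kafka dependencies so that it runs on its own. FailValue, ExternalCall, processOrFail and topicFor are hypothetical names chosen for illustration, not part of Kafka Streams or Spring.

```java
import java.io.IOException;

// Hypothetical helper; every name in this class is an assumption made for illustration.
final class DlqRoutingSketch {

    /** Hypothetical wrapper carrying the original value plus the error that occurred. */
    static final class FailValue {
        final Object originalValue;
        final String error;

        FailValue(Object originalValue, String error) {
            this.originalValue = originalValue;
            this.error = error;
        }
    }

    /** Like java.util.function.Function, but allowed to throw checked exceptions (e.g. IOException). */
    @FunctionalInterface
    interface ExternalCall<V, R> {
        R apply(V value) throws Exception;
    }

    /** Candidate body for mapValues(): a failure becomes a FailValue instead of an exception. */
    static <V> Object processOrFail(V input, ExternalCall<V, Object> call) {
        try {
            return call.apply(input);
        } catch (Exception e) {
            return new FailValue(input, e.toString());
        }
    }

    /** Same decision as the lambda passed to to() in the snippet above. */
    static String topicFor(Object value) {
        return value instanceof FailValue ? "dead-letter-topic" : "success-topic";
    }

    public static void main(String[] args) {
        // Stand-in for the external system: fails on empty input.
        ExternalCall<String, Object> flaky = s -> {
            if (s.isEmpty()) {
                throw new IOException("external system unavailable");
            }
            return s.toUpperCase();
        };
        for (String input : new String[] {"order-1", ""}) {
            Object result = processOrFail(input, flaky);
            System.out.println("'" + input + "' goes to " + topicFor(result));
        }
    }
}
```

In the topology above this could be wired in as `.mapValues(v -> DlqRoutingSketch.processOrFail(v, yourExternalCall))` followed by `.to((k, v, recordContext) -> DlqRoutingSketch.topicFor(v), ...)`, where yourExternalCall is a placeholder. Note that this only covers exceptions thrown inside your own processing code; deserialization and producer errors occur outside mapValues() and are not caught here.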

If you want to continue with further processing instead of calling to(), you could use branch() and split the stream based on the class of the Kafka value. The trick with branch() is to assign its result to KStream<keyClass, ?>[], which then lets you cast each element of the array to a KStream with the appropriate value class.
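The cast trick is a plain Java generics matter, so it can be shown without Kafka. The sketch below uses List as a stand-in for KStream and a hypothetical branch() helper that mimics the first-match behavior of KStream#branch; FailValue, branch and failures are assumed names. A direct cast from List<Object> to List<FailValue> does not compile, whereas going through the wildcard type List<?> is allowed as an unchecked cast, and the same reasoning applies to KStream<keyClass, ?>[].

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java analogy only: List stands in for KStream, and all names are hypothetical.
final class BranchCastSketch {

    static final class FailValue {
        final String reason;

        FailValue(String reason) {
            this.reason = reason;
        }
    }

    /** Each value lands in the bucket of the first matching predicate; unmatched values are dropped. */
    @SafeVarargs
    static List<?>[] branch(List<Object> values, Predicate<Object>... predicates) {
        List<List<Object>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            buckets.add(new ArrayList<>());
        }
        for (Object value : values) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(value)) {
                    buckets.get(i).add(value);
                    break; // first match wins
                }
            }
        }
        return buckets.toArray(new List<?>[0]);
    }

    /** The wildcard element type is what makes this unchecked cast legal. */
    @SuppressWarnings("unchecked")
    static List<FailValue> failures(List<?>[] branches) {
        return (List<FailValue>) branches[0];
    }

    public static void main(String[] args) {
        List<Object> values = Arrays.asList("ok-1", new FailValue("timeout"), "ok-2");
        List<?>[] branches = branch(values, v -> v instanceof FailValue, v -> true);
        for (FailValue failure : failures(branches)) {
            System.out.println("failed: " + failure.reason);
        }
        System.out.println("successful values: " + branches[1].size());
    }
}
```

The cast is only safe because the first predicate guarantees that every element of that branch is a FailValue; the compiler cannot check this. Newer Kafka Streams releases deprecate branch() in favor of split(), so it is worth checking which one your version offers.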