We are building a kafka-streams application as part of a larger microservices architecture. We want to be resilient to backward-incompatible format changes, so we introduced a quarantine topic. We couldn't find anything provided by the library, so we sort of rolled our own: we simply "manually" try to deserialize each record and forward it to the quarantine topic upon failure.
Easy peasy.
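The routing logic above can be sketched without a broker. This is a minimal, self-contained simulation: the lists stand in for the next and quarantine topics, and tryDeserialize is an illustrative placeholder for whatever Serde the real application uses (integers parsed from bytes here, purely for demonstration).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Broker-free sketch of "try to deserialize, quarantine on failure".
// Lists stand in for topics; tryDeserialize is a placeholder Serde.
public class QuarantineRouting {
    static Optional<Integer> tryDeserialize(byte[] raw) {
        try {
            return Optional.of(Integer.parseInt(new String(raw, StandardCharsets.UTF_8)));
        } catch (NumberFormatException e) {
            return Optional.empty(); // backward-incompatible payload
        }
    }

    public static void main(String[] args) {
        List<Integer> nextTopic = new ArrayList<>();
        List<byte[]> quarantineTopic = new ArrayList<>();

        byte[][] incoming = {
            "42".getBytes(StandardCharsets.UTF_8),
            "not-a-number".getBytes(StandardCharsets.UTF_8),
            "7".getBytes(StandardCharsets.UTF_8)
        };

        // Same spirit as map(tryDeserialize).filter(...): records that
        // deserialize flow on, failures are forwarded verbatim to quarantine.
        for (byte[] raw : incoming) {
            tryDeserialize(raw).ifPresentOrElse(nextTopic::add,
                                                () -> quarantineTopic.add(raw));
        }

        System.out.println(nextTopic.size() + " forwarded, "
                           + quarantineTopic.size() + " quarantined");
    }
}
```

In the real topology the same branch happens inside the stream, with the failure branch written through a producer to the quarantine topic.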
Now comes the replay of the quarantined events. This should be triggered externally (say a REST call) and move the events to the next topic if deserialization succeeds.
Can we leverage kafka-streams to perform such an on-demand operation? Intuitively it should be as simple as builder.stream(quarantined).to(nextTopic).
Looking at the Processor API, it doesn't seem possible to halt processing. Bluntly blocking isn't an option either, as that would affect the other tasks running in the same StreamThread, and spinning up a separate KafkaStreams app seems like overkill.
I would like to avoid hand-coding a consumer -> producer loop, so I'm also considering akka-stream kafka, but that sounds like overkill too...
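For comparison, here is a broker-free sketch of what the hand-coded replay loop amounts to: drain the quarantine topic once, forward records that now deserialize, keep the rest quarantined, then stop. A deque stands in for the topic, and bounding the loop to the records present when the replay was triggered mirrors stopping at the end offsets captured at the start (names and the integer Serde are illustrative, not from the original).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

// Broker-free sketch of an on-demand replay of quarantined records.
// A deque stands in for the quarantine topic; the loop is bounded so it
// terminates even if new failures arrive while the replay runs.
public class QuarantineReplay {
    static Optional<Integer> tryDeserialize(byte[] raw) {
        try {
            return Optional.of(Integer.parseInt(new String(raw, StandardCharsets.UTF_8)));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        Deque<byte[]> quarantine = new ArrayDeque<>();
        quarantine.add("13".getBytes(StandardCharsets.UTF_8));
        quarantine.add("still-broken".getBytes(StandardCharsets.UTF_8));

        List<Integer> nextTopic = new ArrayList<>();
        List<byte[]> stillQuarantined = new ArrayList<>();

        // Replay only the records present when the replay was triggered,
        // analogous to stopping at the end offsets read at replay start.
        int toReplay = quarantine.size();
        for (int i = 0; i < toReplay; i++) {
            byte[] raw = quarantine.poll();
            tryDeserialize(raw).ifPresentOrElse(nextTopic::add,
                                                () -> stillQuarantined.add(raw));
        }

        System.out.println(nextTopic.size() + " replayed, "
                           + stillQuarantined.size() + " still quarantined");
    }
}
```

With real clients the loop would poll a KafkaConsumer, produce successes to the next topic, and exit once the captured end offsets are reached, which is exactly the boilerplate the question hopes to avoid.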
Any ideas?
I looked at DeserializationExceptionHandler before, but I didn't want to write my own producer. For this part I have written a simple utility that does map(tryDeserialize).filter(errors).to(quarantinedTopic). Or did I miss something? – Bruno Bieth