0 votes

We are building a kafka-streams application as part of a large microservices architecture. We want to be resilient to backward-incompatible format changes, so we have introduced a quarantined topic. We couldn't find anything provided by the library, so we sort of rolled our own by simply "manually" trying to deserialize each record and forwarding it to the quarantined topic upon failure.

Easy peasy.
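
In code, the gist is roughly this (topic names are made up, and deserializes(byte[]) stands in for our actual try-deserialize check):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;

    StreamsBuilder builder = new StreamsBuilder();

    // Split the stream on whether a record deserializes with the current format.
    KStream<byte[], byte[]>[] branches = builder
            .stream("input", Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
            .branch(
                    (key, value) -> deserializes(value),  // parses fine -> keep going
                    (key, value) -> true                  // everything else -> quarantine
            );

    branches[0].to("next-topic");
    branches[1].to("quarantined");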

Now comes the replay of the quarantined events. This should be triggered externally (say, by a REST call) and move the events on to the next topic if deserialization now succeeds. Can we leverage kafka-streams to perform such an on-demand operation? Intuitively it should be as simple as builder.stream(quarantined).to(nextTopic).
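
Naively (with placeholder topic names and the same hypothetical deserializes check as above), something like this, except that nothing ever stops it:

    StreamsBuilder builder = new StreamsBuilder();
    builder.stream("quarantined", Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
           .filter((key, value) -> deserializes(value))  // only events the new format can read
           .to("next-topic");

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();  // ...but what would ever call streams.close()?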

Looking at the Processor API, it doesn't seem possible to halt processing. Bluntly blocking isn't an option either, as that would affect the other tasks running in the same StreamThread, and spinning up another KafkaStreams app seems overkill. I would like to avoid hand-coding a consumer -> producer loop, so I'm also considering akka-stream-kafka, but that sounds a bit overkill too...

Any ideas?

Thanks @MatthiasJ.Sax, I've played with DeserializationExceptionHandler before, but I didn't want to write my own producer. For this part I have written a simple utility that does map(tryDeserialize).filter(errors).to(quarantinedTopic). Or did I miss something? – Bruno Bieth

That works, too :) – Matthias J. Sax

For KS: if you do a blocking external call (i.e., wait until the REST call returns an answer), you can still use Streams. – Matthias J. Sax

I guess my question wasn't clear; the REST API I mentioned is there to trigger the replay of the quarantined events (i.e., moving the quarantined events back into the pipeline). Say I have 3 topics (A, B, C): I stream from A and upon failure go to B, otherwise to C. The REST call would trigger a one-off operation that would read from B and on success write to C. – Bruno Bieth

1 Answer

1 vote

If I understand your question correctly: whenever the external REST call is triggered, you want to start a separate Streams app that reads from the quarantined topic B, tries to deserialize the data with some updated format, and, if that succeeds, pushes it to the "good data" topic C; this Streams app should stop automatically once it reaches the end of topic B.

In this case, and assuming you do not have an ordering requirement on the final topic C, you can use an internal "stop flag" that the KafkaStreams caller thread blocks and waits on, and that the KafkaStreams-internal stream thread sets in order to unblock the caller thread, which then calls KafkaStreams.close(). For example, you can leverage a punctuation function that checks whether any new data has arrived since the last punctuation interval; if not, we have likely exhausted all the data in topic B, and the punctuator sets the flag.
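
Here is a minimal sketch of that idea, assuming byte[] payloads, placeholder topic names B and C, a hypothetical MyUpdatedEventDeserializer for the new format, and a wall-clock punctuator (tune the interval to your traffic):

    import java.time.Duration;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;

    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.PunctuationType;

    public class QuarantineReplay {

        // Assumption: MyUpdatedEventDeserializer stands in for whatever
        // deserializer understands the *updated* format.
        private static boolean deserializes(final byte[] payload) {
            try {
                new MyUpdatedEventDeserializer().deserialize("B", payload);
                return true;
            } catch (final RuntimeException stillBroken) {
                return false;
            }
        }

        public static void replay() throws InterruptedException {
            final CountDownLatch stopFlag = new CountDownLatch(1);

            final Topology topology = new Topology();
            topology.addSource("quarantine-source",
                            new ByteArrayDeserializer(), new ByteArrayDeserializer(), "B")
                    .addProcessor("replay-gate", () -> new AbstractProcessor<byte[], byte[]>() {
                        private long seenSinceLastPunctuate = 0;

                        @Override
                        public void init(final ProcessorContext context) {
                            super.init(context);
                            // If a whole interval passes without a single new record,
                            // topic B is likely exhausted: set the stop flag.
                            context.schedule(Duration.ofSeconds(10),
                                    PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                                        if (seenSinceLastPunctuate == 0) {
                                            stopFlag.countDown();
                                        }
                                        seenSinceLastPunctuate = 0;
                                    });
                        }

                        @Override
                        public void process(final byte[] key, final byte[] value) {
                            seenSinceLastPunctuate++;
                            if (deserializes(value)) {
                                context().forward(key, value);  // good data goes on to C
                            }
                            // records that still fail are simply not forwarded
                        }
                    }, "quarantine-source")
                    .addSink("good-sink", "C",
                            new ByteArraySerializer(), new ByteArraySerializer(), "replay-gate");

            final Properties props = new Properties();
            // Committed offsets mean each replay run resumes where the previous one stopped.
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "quarantine-replay");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            final KafkaStreams streams = new KafkaStreams(topology, props);
            streams.start();
            stopFlag.await();  // the caller thread blocks here until the punctuator flips the flag
            streams.close();   // then the one-off replay app shuts itself down
        }
    }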

An example can be found in Streams' own benchmarking code: https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java#L657-L673. Note that it is not based on punctuation but on the processing logic itself, which checks the content of each processed record, since it knows exactly what the "last record" looks like. But the general idea of using such a shutdown latch is the same.
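
For completeness, that content-based variant would change only the process() method in the sketch above, roughly like this (END_MARKER being a hypothetical, well-known last record):

    @Override
    public void process(final byte[] key, final byte[] value) {
        // SimpleBenchmark-style shutdown: the processing logic itself recognizes
        // the known last record and releases the latch; no punctuation needed.
        if (java.util.Arrays.equals(value, END_MARKER)) {  // END_MARKER is hypothetical
            stopFlag.countDown();
        }
        context().forward(key, value);
    }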