I am trying to wrap my head around Kafka Streams and having some fundamental questions that I can't seem to figure out on my own. I understand the concept of a KTable
and Kafka State Stores but am having trouble deciding how to approach this. I am also using Spring Cloud Streams, which adds another level of complexity on top of this.
My use case:
I have a rule engine that reads in a Kafka event, processes the event, returns a list of rules that matched and writes it into another topic. This is what I have so far:
@Bean
public Function<KStream<String, ProcessNode>, KStream<String, List<IndicatorEvaluation>>> process() {
return input -> input.mapValues(this::analyze).filter((host, evaluation) -> evaluation != null);
}
public List<IndicatorEvaluation> analyze(final String host, final ProcessNode process) {
// Does stuff
}
Some of the stateful rules look like:
[some condition] REPEATS 5 TIMES WITHIN 1 MINUTE
[some condition] FOLLOWEDBY [some condition] WITHIN 1 MINUTE
[rule A exists and rule B exists]
My current implementation is storing all this information in memory to be able to perform the analysis. For obvious reasons, it is not easily scalable. So I figured I would persist this into a Kafka State Store.
I am unsure of the best way to go about it. I know there is a way to create custom state stores that allow for a higher level of flexibility. I'm not sure if the Kafka DSL will support this.
Still new to Kafka Streams and wouldn't mind hearing a variety of suggestions.