I am trying to wrap my head around Kafka Streams and have some fundamental questions that I can't figure out on my own. I understand the concepts of a KTable and Kafka state stores, but I am having trouble deciding how to approach this. I am also using Spring Cloud Stream, which adds another layer of complexity on top.

My use case:

I have a rule engine that reads a Kafka event, evaluates it, and writes the list of rules that matched to another topic. This is what I have so far:

@Bean
public Function<KStream<String, ProcessNode>, KStream<String, List<IndicatorEvaluation>>> process() {
    return input -> input.mapValues(this::analyze).filter((host, evaluation) -> evaluation != null);
}

public List<IndicatorEvaluation> analyze(final String host, final ProcessNode process) {
    // Does stuff
}

Some of the stateful rules look like:

[some condition] REPEATS 5 TIMES WITHIN 1 MINUTE
[some condition] FOLLOWEDBY [some condition] WITHIN 1 MINUTE
[rule A exists and rule B exists]
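To make the semantics of these three rule shapes concrete, here is a minimal, stdlib-only Java sketch of per-host evaluation. It is an illustration under stated assumptions, not Kafka Streams code: the `HashMap`s stand in for what would be key-value state stores keyed by host, timestamps are epoch milliseconds, events for a host are assumed to arrive in timestamp order, and the class and method names (`RuleState`, `repeats`, `onFirst`, `onSecond`, `bothMatched`) are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical per-host rule state. Each HashMap stands in for a key-value
// state store keyed by host; nothing here is Kafka Streams API.
class RuleState {
    // REPEATS: timestamps (ms) of recent matches of the condition, per host
    private final Map<String, Deque<Long>> recent = new HashMap<>();
    // FOLLOWEDBY: timestamp (ms) of the latest match of the first condition, per host
    private final Map<String, Long> lastFirst = new HashMap<>();
    // AND: names of the rules matched so far, per host
    private final Map<String, Set<String>> matched = new HashMap<>();

    // "[cond] REPEATS n TIMES WITHIN windowMs": record a match at ts and report
    // whether at least n matches fall within windowMs of it (inclusive).
    // Assumes events for a host arrive in timestamp order.
    boolean repeats(String host, long ts, int n, long windowMs) {
        Deque<Long> times = recent.computeIfAbsent(host, h -> new ArrayDeque<>());
        times.addLast(ts);
        while (ts - times.peekFirst() > windowMs) {
            times.removeFirst(); // drop matches older than the window
        }
        return times.size() >= n;
    }

    // "[first] FOLLOWEDBY [second] WITHIN windowMs": call onFirst when the first
    // condition matches and onSecond when the second one matches.
    void onFirst(String host, long ts) {
        lastFirst.put(host, ts);
    }

    boolean onSecond(String host, long ts, long windowMs) {
        Long first = lastFirst.get(host);
        return first != null && ts >= first && ts - first <= windowMs;
    }

    // "[rule A exists and rule B exists]": remember the matched rule and report
    // whether both required rules have now been seen for this host.
    boolean bothMatched(String host, String rule, String ruleA, String ruleB) {
        Set<String> seen = matched.computeIfAbsent(host, h -> new HashSet<>());
        seen.add(rule);
        return seen.contains(ruleA) && seen.contains(ruleB);
    }
}
```

Because every piece of state is keyed by host, it can be partitioned the same way the input topic is, which is what makes moving it into Kafka state stores attractive. Note that the `matched` sets in this sketch never expire; a real implementation would need some form of cleanup (for example a window store or periodic punctuation).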

My current implementation keeps all of this information in memory to perform the analysis, which, for obvious reasons, does not scale easily. So I figured I would persist it in a Kafka state store.

I am unsure of the best way to go about this. I know it is possible to create custom state stores for more flexibility, but I'm not sure whether the Kafka Streams DSL supports them.

I'm still new to Kafka Streams, so I'd welcome a variety of suggestions.

Answer

From your description, I believe this use case can still be implemented with the Kafka Streams DSL. The code you have shown does not track any state. You need to add state to your topology by counting the rule matches and storing those counts in a state store, and then emit output only when a count reaches its threshold. Here is the general idea as pseudocode; you will obviously have to adapt it to the exact requirements of your use case.

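@Bean
public Function<KStream<String, ProcessNode>, KStream<String, List<IndicatorEvaluation>>> process() {
    return input -> input
                     .mapValues(this::analyze)
                     .filter((host, evaluation) -> evaluation != null)
                     ...
                     .groupByKey(...)
                     // 1-minute windows to match "REPEATS 5 TIMES WITHIN 1 MINUTE"
                     // (on Kafka 3.0+, TimeWindows.ofSizeWithNoGrace(...) replaces the deprecated of(...))
                     .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
                     // per-window counts are kept in a windowed state store named "rules"
                     .count(Materialized.as("rules"))
                     // keep only windows whose count has reached the threshold of 5
                     .filter((key, value) -> value >= 5)
                     // from here the key is Windowed<String> and the value is a Long count,
                     // so the remaining steps must map back to the declared output type
                     .toStream()
                     ...
}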
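One caveat with `windowedBy(TimeWindows.of(...))` as written: without `advanceBy(...)` these are tumbling (fixed, non-overlapping) windows, so five matches that straddle a window boundary are split across two windows and never reach the threshold. Kafka Streams also offers hopping windows (`TimeWindows#advanceBy`) and, since 2.7, `SlidingWindows`, which reduce or remove this problem. The stdlib-only sketch below just illustrates the difference on plain timestamps; the class and method names are hypothetical, and it is not how Kafka Streams implements either window type.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical comparison of tumbling vs. sliding "n events within sizeMs" checks.
class WindowComparison {
    // Tumbling: bucket each timestamp into [k*sizeMs, (k+1)*sizeMs) and report
    // whether any single bucket holds at least n events. Assumes ts >= 0.
    static boolean tumblingFires(List<Long> timestamps, long sizeMs, int n) {
        Map<Long, Integer> counts = new HashMap<>();
        for (long ts : timestamps) {
            long windowStart = ts - (ts % sizeMs);
            if (counts.merge(windowStart, 1, Integer::sum) >= n) {
                return true;
            }
        }
        return false;
    }

    // Sliding: report whether some n consecutive events (sorted by time) fit in
    // a span of at most sizeMs, i.e. the n-th is within sizeMs of the first.
    static boolean slidingFires(List<Long> sortedTimestamps, long sizeMs, int n) {
        for (int i = 0; i + n - 1 < sortedTimestamps.size(); i++) {
            if (sortedTimestamps.get(i + n - 1) - sortedTimestamps.get(i) <= sizeMs) {
                return true;
            }
        }
        return false;
    }
}
```

For example, with matches at 50s, 55s, 60s, 65s and 70s and a 1-minute size, `tumblingFires(..., 60_000L, 5)` returns false (the events split 2 + 3 across the windows starting at 0s and 60s), while `slidingFires(..., 60_000L, 5)` returns true.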