I have a use case like the following. For each incoming event, I want to look at a certain field to see whether its status changed from A to B, and if so, send the event to an output topic. The flow is like this: an event with key "xyz" comes in with status A, and some time later another event with key "xyz" comes in with status B. I have this code using the high-level DSL:
final KStream<String, DomainEvent> inputStream = ...

final KStream<String, DomainEvent> outputStream = inputStream
        .map((k, v) -> new KeyValue<>(v.getId(), v))
        .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
        .aggregate(DomainStatusMonitor::new,
                (k, v, aggregate) -> {
                    aggregate.updateStatusMonitor(v);
                    return aggregate;
                },
                Materialized.with(Serdes.String(), jsonSerde))
        .toStream()
        .filter((k, v) -> v.isStatusChangedFromAtoB())
        .map((k, v) -> new KeyValue<>(k, v.getDomainEvent()));
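For reference, here is a simplified sketch of what my DomainStatusMonitor aggregate does (statuses shown as plain Strings instead of the full DomainEvent, and field names are illustrative): it just remembers the previous and latest status so the A-to-B transition can be detected.

```java
// Simplified sketch of the aggregation state; the real class also keeps
// the latest DomainEvent for the final map() step.
class DomainStatusMonitor {
    private String previousStatus;
    private String currentStatus;

    // Shift the latest status into "previous" and record the new one.
    public void updateStatusMonitor(String newStatus) {
        this.previousStatus = this.currentStatus;
        this.currentStatus = newStatus;
    }

    // True only when the last two statuses seen for this key were A then B.
    public boolean isStatusChangedFromAtoB() {
        return "A".equals(previousStatus) && "B".equals(currentStatus);
    }
}
```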
Is there a better way to write this logic using the DSL?
A couple of questions regarding the state store created by the aggregation in the code above:
- Does it create an in-memory state store by default?
- What will happen if I have an unbounded number of unique incoming keys? If it uses an in-memory store by default, don't I need to switch to a persistent store? How do we handle situations like that in the DSL?
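In case it matters for the answer: I know the DSL lets you pin the store down explicitly via Materialized.as with a supplier from Stores (topology fragment only, not runnable on its own; the store name "status-monitor-store" is made up). I assume something like this would force a persistent RocksDB store, but I'm not sure it is the right way to deal with unbounded keys:

```java
// Fragment: same jsonSerde as above; store name is illustrative.
.aggregate(DomainStatusMonitor::new,
        (k, v, aggregate) -> {
            aggregate.updateStatusMonitor(v);
            return aggregate;
        },
        Materialized.<String, DomainStatusMonitor>as(
                Stores.persistentKeyValueStore("status-monitor-store"))
                .withKeySerde(Serdes.String())
                .withValueSerde(jsonSerde))
```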
- If the state store is very large (either in-memory or persistent), how does that affect startup time? How can I make the stream processing wait until the store is fully initialized? Or will Kafka Streams ensure that the state store is fully restored before any incoming events are processed?
Thanks in advance!