
I have the following use case. For each incoming event, I want to look at a certain field to see if its status changed from A to B and, if so, send the event to an output topic. The flow is like this: an event with key "xyz" comes in with status A, and some time later another event with key "xyz" comes in with status B. I have this code using the high-level DSL.

final KStream<String, DomainEvent> inputStream....

final KStream<String, DomainEvent> outputStream = inputStream
        .map((k, v) -> new KeyValue<>(v.getId(), v))
        .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
        .aggregate(DomainStatusMonitor::new,
                (k, v, aggregate) -> {
                    aggregate.updateStatusMonitor(v);
                    return aggregate;
                }, Materialized.with(Serdes.String(), jsonSerde))
        .toStream()
        .filter((k, v) -> v.isStatusChangedFromAtoB())
        .map((k, v) -> new KeyValue<>(k, v.getDomainEvent()));
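
For context, the aggregate class is not shown above. A minimal sketch of what it might look like follows. The getStatus() accessor, the constructor of DomainEvent, and the string values "A" and "B" are assumptions made for illustration. A real version would also need to be serializable by jsonSerde.

```java
// Hypothetical event type: only getId() appears in the code above;
// getStatus() and the constructor are assumed for this sketch.
final class DomainEvent {
    private final String id;
    private final String status;

    DomainEvent(String id, String status) {
        this.id = id;
        this.status = status;
    }

    String getId() { return id; }
    String getStatus() { return status; }
}

// Hypothetical aggregate: remembers the latest event per key and whether
// the most recent update moved the status from A to B.
final class DomainStatusMonitor {
    private DomainEvent domainEvent;        // latest event seen for this key
    private boolean statusChangedFromAtoB;  // true only for the update that went A -> B

    void updateStatusMonitor(DomainEvent next) {
        String previous = (domainEvent == null) ? null : domainEvent.getStatus();
        // Recomputed on every update, so a repeated B does not count as a new transition.
        statusChangedFromAtoB = "A".equals(previous) && "B".equals(next.getStatus());
        domainEvent = next;
    }

    boolean isStatusChangedFromAtoB() { return statusChangedFromAtoB; }
    DomainEvent getDomainEvent() { return domainEvent; }
}
```

Because the flag is recomputed on each update, the filter step only passes the record produced by the event that actually caused the A to B transition.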

Is there a better way to write this logic using the DSL?

I also have a couple of questions regarding the state store created by the aggregation in the code above.

  1. Is it creating an in-memory state store by default?
  2. What will happen if I have an unbounded number of unique incoming keys? If it is using an in-memory store by default, don't I need to switch to a persistent store? How do we handle situations like that in the DSL?
  3. If the state store is very large (either in-memory or persistent), how does it affect the startup time? How can I make stream processing wait until the store is fully initialized? Or will Kafka Streams ensure that the state store is fully initialized before any incoming events are processed?

Thanks in advance!

1 Answer

  1. By default, a persistent RocksDB store is used. If you want an in-memory store instead, pass Materialized.as(Stores.inMemoryKeyValueStore(...)) to the aggregation.

  2. If you have an unbounded number of unique keys, you will eventually run out of main memory or disk space and your application will die. Depending on your semantics, you can get a "TTL" by using a session-windowed aggregation with a large "gap" parameter instead, so that old keys expire.

  3. State is always restored before any new data is processed. If you use an in-memory store, this happens by consuming the underlying changelog topic, which can take a while depending on the size of your state. If you use a persistent RocksDB store, the state is loaded from local disk, so no restore is required and processing should start immediately. Only if you lose the state on local disk will a restore from the changelog topic happen in this case.
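
To illustrate point 1, here is a rough sketch of an aggregation backed by an in-memory store. It requires the kafka-streams library on the classpath. The topic name "input-topic", the store name "status-store", and the use of plain String values in place of the question's JSON types are assumptions made to keep the example self-contained.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

public class InMemoryStoreSketch {

    // Builds a topology whose aggregation keeps the latest value per key
    // in an in-memory key-value store instead of the default RocksDB store.
    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey()
                .aggregate(
                        () -> "",                          // initial aggregate
                        (key, value, latest) -> value,     // keep the most recent value
                        Materialized.<String, String>as(
                                        Stores.inMemoryKeyValueStore("status-store"))
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.String()));

        return builder.build();
    }

    public static void main(String[] args) {
        // Print the topology to confirm which store backs the aggregation.
        System.out.println(build().describe());
    }
}
```

The in-memory store is still backed by a changelog topic by default, which is why it has to be rebuilt from that topic on startup as described in point 3.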