
I'm presently working on a use case in which user interaction with a platform is tracked, generating a stream of events that is stored in Kafka and subsequently processed with Kafka Streams/KSQL.

But I've run into an issue concerning the state store and changelog topic retention policies. User sessions could occur indefinitely far apart in time, so I must guarantee that the state is persisted across that whole period and restored in case of node or cluster-wide failures. During our research, we came across the following information:


https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Internal+Data+Management

Kafka Streams allows for stateful stream processing, i.e. operators that have an internal state. (...). The default implementation used by Kafka Streams DSL is a fault-tolerant state store using 1. an internally created and compacted changelog topic (for fault-tolerance) and 2. one (or multiple) RocksDB instances (for cached key-value lookups). Thus, in case of starting/stopping applications and rewinding/reprocessing, this internal data needs to get managed correctly.

(...) Thus, RocksDB memory requirement does not grow infinitely (in contrast to changelog topic). (KAFKA-4015 was fixed in 0.10.1 release, and windowed changelog topics don't grow unbounded as they apply an additional retention time parameter).
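If I read this right, the configuration of the internally created changelog topic can even be overridden per store through the DSL. A minimal sketch of that idea (the store name and config values below are my own, for illustration only):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class ChangelogOverrideSketch {
    public static void main(String[] args) {
        // Per-store overrides applied to the internally created changelog topic.
        Map<String, String> changelogConfig = new HashMap<>();
        changelogConfig.put("cleanup.policy", "compact");        // keep latest value per key
        changelogConfig.put("min.cleanable.dirty.ratio", "0.1"); // compact more eagerly

        Materialized<String, Long, KeyValueStore<Bytes, byte[]>> materialized =
            Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("user-state-store")
                .withLoggingEnabled(changelogConfig);
    }
}
```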


Retention time in kafka local state store / changelog

"For windowed KTables there is a local retention time and there is the changlog retention time. You can set the local store retention time via Materialized.withRetentionTime(...) -- the default value is 24h.

If a new application is created, changelog topics are created with the same retention time as local store retention time."
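For instance, the following sketch would raise a windowed store's retention above the 24h default (assuming Kafka Streams 2.1+, where the method is named Materialized#withRetention rather than withRetentionTime; the topic, topology, and store name are hypothetical):

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

public class RetentionSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> events = builder.<String, String>stream("user-events");

        events.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
              .windowedBy(TimeWindows.of(Duration.ofHours(1)))
              // Local window store keeps 30 days of windows instead of the 24h
              // default; for a new application the changelog topic is created
              // with a matching retention time.
              .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("hourly-counts")
                  .withRetention(Duration.ofDays(30)));
    }
}
```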


https://docs.confluent.io/current/streams/developer-guide/config-streams.html

The windowstore.changelog.additional.retention.ms parameter states:

Added to a window's maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.
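As a sketch, that parameter can be set alongside the usual Streams settings (the application id and bootstrap server below are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ChangelogRetentionConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-session-tracker"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // placeholder
        // Extra time added on top of a windowed store's retention when Streams
        // creates its changelog topic, to allow for clock drift (default: 1 day).
        props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
                  7L * 24 * 60 * 60 * 1000); // e.g. one week of slack
    }
}
```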


It would seem that Kafka Streams maintains both a (replicated) local state store and a changelog topic for fault tolerance, each with a finite, configurable retention period, and will apparently erase records once the retention time expires. This would lead to unacceptable data loss on our platform, which raises the following questions:

  1. Does Kafka Streams actually clean up the default state store over time, or have I misunderstood something? Is there an actual risk of data loss?

  2. In that case, is it advisable or even possible to set an infinite retention policy on the state store? Or could there be another way of making sure the state is persisted, such as using a more traditional database as a state store, if that makes sense?

  3. Does the retention policy apply to standby replicas?

  4. If it's impossible to persist the state permanently, could there be another stream processing framework that better suits our use case?

Any clarification would be appreciated.


1 Answer


It seems you're asking about two different things: session windows and changelog topics.

Compacted topics retain the latest value for each unique key forever. Session windows, on the other hand, should probably be closed over time; a user session a week/month/year after today's is arguably a new session. Rather than only storing the most recent session (which implies removing previous sessions from the state store), you could tie the individual session windows together as a collection keyed by the userId, as in the sketch below.
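A rough sketch of that idea, assuming per-session summaries arrive as strings keyed by userId (the topic name and serialization are made up for illustration). Because a non-windowed KTable is backed by a compacted changelog topic, the aggregate per user is never expired by retention:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;

public class SessionsByUser {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Hypothetical topic of finished-session summaries, keyed by userId.
        KTable<String, String> sessionsByUser = builder
            .<String, String>stream("session-summaries")
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            // Append each new session to the user's collection. The non-windowed
            // store's changelog is compacted, so the latest aggregate per user
            // survives indefinitely -- no retention-based deletion.
            .reduce((allSessions, newSession) -> allSessions + "," + newSession);
    }
}
```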