I have 5 machines, each running 5 Spring Boot instances of a Kafka Streams application. I am using Docker Swarm and Docker volumes. The app reads 2-3 different compacted topics with 50 partitions each, and every instance runs with a concurrency of 10. It consumes these topics as KTables or KStreams and applies flatMap, map, and join operations.

    // State directory must be a String; inside a non-persistent container /tmp is wiped on redeploy.
    props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
    props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
    props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    props.put("num.stream.threads", 10);
    props.put("application.id", applicationId);

When everything is running, the .join() operations work and no data is lost. But when one of my instances goes down, the join operations no longer work.

My question is: when the app is restarted or redeployed (given that it runs inside a non-persistent container), its state is gone, right? Then my join operations don't work. When I redeploy my instance and repopulate my compacted topic from Elasticsearch with the latest entities, the join operations are OK again. So I think that when my application starts on a new machine, its local state store is gone. But the Kafka documentation says:

If tasks run on a machine that fails and are restarted on another machine, Kafka Streams guarantees to restore their associated state stores to the content before the failure by replaying the corresponding changelog topics prior to resuming the processing on the newly started tasks. As a result, failure handling is completely transparent to the end user. Note that the cost of task (re)initialization typically depends primarily on the time for restoring the state by replaying the state stores' associated changelog topics. To minimize this restoration time, users can configure their applications to have standby replicas of local states (i.e. fully replicated copies of the state). When a task migration happens, Kafka Streams then attempts to assign a task to an application instance where such a standby replica already exists in order to minimize the task (re)initialization cost. See num.standby.replicas at the Kafka Streams Configs Section. (https://kafka.apache.org/0102/documentation/streams/architecture)

Does my downed instance restore its Kafka state store when it comes back up? If it does, why am I losing data? I have no idea :/ Or can it not reload the state store because of the committed offsets, since all my instances use the same applicationId?

Thanks !

1 Answer

The changelog topics are always read from the earliest offset, and they're compacted, so they don't lose data.
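To make that concrete, here is a minimal sketch in plain Java (no Kafka dependency) of why replaying a compacted changelog from the earliest offset is enough to rebuild a store. The class and method names (`ChangelogReplay`, `Change`, `restore`) are made up for illustration and are not Kafka APIs; the only assumption is the usual compaction contract that the latest value per key survives and a null value acts as a delete.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChangelogReplay {

    // Hypothetical stand-in for one changelog record.
    // A null value plays the role of a tombstone (delete marker).
    static final class Change {
        final String key;
        final String value;

        Change(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Replays the records in offset order, starting from the earliest one.
    // Later records for a key overwrite earlier ones, so the result depends
    // only on the latest record per key, which is what compaction keeps.
    static Map<String, String> restore(List<Change> changelog) {
        Map<String, String> store = new LinkedHashMap<>();
        for (Change change : changelog) {
            if (change.value == null) {
                store.remove(change.key);
            } else {
                store.put(change.key, change.value);
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<Change> changelog = Arrays.asList(
                new Change("entity-1", "v1"),
                new Change("entity-2", "v1"),
                new Change("entity-1", "v2"),   // supersedes the first record
                new Change("entity-2", null));  // tombstone: entity-2 is deleted
        System.out.println(restore(changelog));
    }
}
```

Dropping the first `entity-1` record (as compaction eventually would) gives the same restored store, which is why a lost local state directory alone should not cost you data that is in the changelog.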

If you're joining non-compacted topics, then sure, you lose data, but that's not limited to Kafka Streams or your specific use case. You'll need to configure the topic to retain data for at least as long as you think it'll take you to solve any issues with downtime. While the data is retained, you can always seek your consumer to it.

If you want persistent storage, mount a volume into your container (via Kubernetes, for example), or plug in a state store that lives outside the container, such as Redis: https://github.com/andreas-schroeder/redisks
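For the volume option, a rough sketch of the config side could look like the following. It uses the plain string keys (`state.dir`, `num.standby.replicas`, `application.id`) so it compiles without Kafka on the classpath; the path `/var/lib/kafka-streams` and the class name are assumptions for illustration, and the path only helps if a Docker or Kubernetes volume is actually mounted there.

```java
import java.util.Properties;

public class PersistentStateConfig {

    // Builds Streams properties that point the state directory at a mounted volume.
    // stateDir is assumed to be a path backed by a persistent volume, not /tmp.
    static Properties build(String applicationId, String stateDir) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        // Local RocksDB state survives a container restart only if this path is persistent.
        props.put("state.dir", stateDir);
        // Standby replicas keep warm copies on other instances to shorten restore time.
        props.put("num.standby.replicas", 2);
        return props;
    }

    public static void main(String[] args) {
        // "/var/lib/kafka-streams" is a hypothetical mount point for the volume.
        Properties props = build("my-streams-app", "/var/lib/kafka-streams");
        System.out.println(props.getProperty("state.dir"));
    }
}
```

Since you run several instances per machine, note that the `state.dir` documentation asks for a path that is unique per Streams instance sharing the same filesystem, so each container probably wants its own volume or subdirectory.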