
I am using the Kafka Streams API in a Java application (Spring Cloud Stream). I have a particular use case, as follows:

  • My application will consume from topic A, and produce and consume to/from topic B.
  • For each message on topic A there is a set of corresponding messages produced to topic B which the application uses to track internal state changes. It consumes from topic B with a KStream to materialize this state as a queryable store.

Since multiple instances of the application will be running and it cannot be guaranteed which particular partitions of either topic will be assigned to the instances, it is imperative that the state store is shared between the applications. Otherwise, if a rebalance were to occur for topic B then the application instances could lose the state information they are tracking for messages on topic A. Consider the following scenario:

  • Instance 1 has partition 1 for topic A and partition 1 for topic B.
  • A rebalance of partitions for topic B occurs.
  • Instance 1 now has partition 1 of topic A (unchanged) but has partition 2 of topic B.
  • Instance 1 has now lost access to the data in the state store it had created when it had partition 1 for topic B.

The same situation occurs if a rebalance happens only for topic A.

Is it possible to materialize into a "global state store"? I understand there is the concept of a GlobalKTable, but I need to use the KStream abstraction since I need access to the full stream of events. For reference, my KStream consumer is as follows:

    @StreamListener(INPUT_TOPIC)
    public void consumeKStream(KStream<String, Pojo> kStream) {
        // Group by key and fold each record into a per-key map, materialized as a queryable store.
        kStream.groupByKey(Serialized.with(keySerde, valueSerde))
                .aggregate(
                        HashMap::new,
                        (key, value, map) -> {
                            map.put(value.getFoo(), value.getBar());
                            return map;
                        },
                        Materialized.<String, Map<Foo, Bar>, KeyValueStore<Bytes, byte[]>>as(STATE_STORE_NAME)
                                .withKeySerde(keySerde)
                                .withValueSerde(valueMapSerde));
    }

Can you explain a bit further how you want your application to work? I'm not sure I understand the exact problem. Can't you simply "route" the data through another topic using through? - Jan Held
What do you mean by "the KStream abstraction"? Spring doesn't have its own KTable, AFAIK. - OneCricketeer

1 Answer


If you read from topic A and from topic B, and your topology materializes the data from topic B and does lookups into that store for topic A records, you are guaranteed that an instance gets a co-partitioned assignment. Hence, the scenario you describe will never happen.

You can verify this by inspecting your Topology (via describe()), which is composed of sub-topologies. Sub-topologies are executed as tasks, and each task is guaranteed a co-partitioned assignment of its input topics.

Compare: https://docs.confluent.io/current/streams/architecture.html#parallelism-model
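
To make the co-partitioning guarantee more concrete, here is a small sketch in plain Java. It is a simplified model, not the real Kafka Streams assignor: the class `CoPartitioningSketch`, its methods, and the round-robin distribution are all hypothetical and only illustrate the idea. It assumes topics A and B are inputs of the same sub-topology and have the same number of partitions. The point it shows is that whole tasks are handed to instances, and a task always owns the same partition number of every input topic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified, hypothetical model of task assignment; NOT the real Kafka Streams assignor.
class CoPartitioningSketch {

    // A task owns the same partition number of every input topic of its sub-topology.
    static List<String> taskInputs(List<String> topics, int partition) {
        List<String> inputs = new ArrayList<>();
        for (String topic : topics) {
            inputs.add(topic + "-" + partition);
        }
        return inputs;
    }

    // Distribute whole tasks (never individual topic partitions) over the instances.
    // Round-robin is an assumption made for illustration only.
    static Map<Integer, List<List<String>>> assign(List<String> topics, int partitions, int instances) {
        Map<Integer, List<List<String>>> assignment = new TreeMap<>();
        for (int p = 0; p < partitions; p++) {
            assignment.computeIfAbsent(p % instances, k -> new ArrayList<>())
                      .add(taskInputs(topics, p));
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("A", "B");
        // Two instances: {0=[[A-0, B-0], [A-2, B-2]], 1=[[A-1, B-1], [A-3, B-3]]}
        System.out.println(assign(topics, 4, 2));
        // "Rebalance" to three instances: tasks move, but A-p and B-p always stay together.
        System.out.println(assign(topics, 4, 3));
    }
}
```

In this model, a rebalance can move a task to a different instance, but it cannot give an instance partition 1 of topic A together with partition 2 of topic B, which is the situation described in the question.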