
Referring to this solution, my spring-cloud-stream application.yml file has the following config:

#application.yml

spring.cloud.stream.bindings.input:
  destination: my-topic-name
  contentType: application/json
  consumer:
    useNativeDecoding: false
spring.cloud.stream.kafka.streams.bindings.input:
  consumer:
    keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
    valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
    materializedAs: my-store

My main application class is annotated with @EnableBinding. Inside the method annotated with @StreamListener, I combine the Kafka Streams DSL with the Processor API to access state stores, which should be materialized on application startup because materializedAs is set in application.yml.

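input.process(() -> new Processor<Object, Product>() {

    // Read-only view of the store materialized from 'my-topic-name'
    private ReadOnlyKeyValueStore<Object, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext processorContext) {
        store = (ReadOnlyKeyValueStore<Object, String>) processorContext.getStateStore("my-store");
    }

    @Override
    public void process(Object key, Product value) {
        // Look up the key; on the first run the store may still be empty
        String stored = store.get(key);
    }

    @Override
    public void close() {
        // Nothing to release here: Kafka Streams manages the store's lifecycle
    }
}, "my-store");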

The problem is that the first time the application starts, the state stores are not yet fully populated (e.g. the state store is empty), but messages already arrive and are processed by the Kafka Streams topology, giving unexpected results.

How can we make sure that, on the first application startup, all state stores (or specific ones chosen by the user, if possible) that are materialized via materializedAs in application.yml are fully populated and ready to use before any stream processing topology defined inside @StreamListener starts processing incoming messages? Can we force message processing to wait until the state stores are fully populated on the first application startup?

I tried to replicate the issue by modifying one of the spring-cloud-stream samples and pushed the modified version here. A more detailed discussion can also be found here.

Not sure if I understand. It seems you have one input topic and one store. Where should the data that you want to pre-load into the store come from? - Matthias J. Sax
There are many input topics from which I generate KStreams and KTables and also materialize state stores. The KStreams and KTables are supposed to be enriched/filtered using data available in state stores materialized from other topics. In the example above, the stateStore is materialized from topic 'my-topic-name', and its content will be used to process messages in KStreams/KTables generated from another topic. - Muhammad Arslan Akhtar
And you populate it in the same application? Otherwise it won't work. If these are two applications, they have two stores with the same name, but there is no connection between them. If it's the same application, make sure that the data from the table topic has smaller timestamps than the records in the KStream topic; data with smaller timestamps is processed first and thus should populate your state. - Matthias J. Sax
I have created a simple spring-cloud-stream app here to replicate the issue. The output of the transformer class is printed to the console. I printed the results from within the UserTransformer.java class and saved them in the following file. In result.txt, the first app run has an empty stateStore; after stopping and restarting the app with new messages in the topics, the stateStore is fully populated. - Muhammad Arslan Akhtar
Maybe this is related to the configuration parameter max.task.idle.ms (cf. issues.apache.org/jira/browse/KAFKA-3514 and issues.apache.org/jira/browse/KAFKA-7458)? The default is zero. Maybe increasing it helps? - Matthias J. Sax
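
For anyone wanting to try the max.task.idle.ms suggestion, here is a minimal sketch of raising it above zero. It only builds a java.util.Properties object; the class name, the helper method and the 5000 ms value are assumptions picked for illustration, not a recommended setting. In a Spring Cloud Stream app the same key would more likely be placed under spring.cloud.stream.kafka.streams.binder.configuration in application.yml instead of being set in code.

```java
import java.util.Properties;

public class TaskIdleConfigSketch {

    // Hypothetical helper: returns Kafka Streams properties with max.task.idle.ms set
    public static Properties withTaskIdle(long idleMs) {
        Properties props = new Properties();
        // Plain string key; StreamsConfig.MAX_TASK_IDLE_MS_CONFIG is the equivalent constant
        props.put("max.task.idle.ms", Long.toString(idleMs));
        return props;
    }

    public static void main(String[] args) {
        // 5000 ms is an arbitrary illustrative value
        Properties props = withTaskIdle(5000L);
        System.out.println(props.getProperty("max.task.idle.ms"));
    }
}
```

Whether this helps depends on the timestamps of the records in the topics, as discussed in the comments above.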

1 Answer


It seems that you need to join the stream with a GlobalKTable (state store) instead of using the process interface. Here is the KStream join GlobalKTable. Please give it a try.
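
A rough sketch of what such a join could look like with the plain Kafka Streams DSL, assuming String keys and values as in the question's serde config. The topics 'events-topic' and 'enriched-topic', the class name and the way the two values are combined are made up for illustration; only 'my-topic-name' and 'my-store' come from the question. As far as I know, a GlobalKTable is bootstrapped from its topic before processing begins, which is why this approach tends to avoid the empty-store problem.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class GlobalTableJoinSketch {

    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Lookup data from the question's topic, materialized as 'my-store'
        GlobalKTable<String, String> lookup = builder.globalTable(
                "my-topic-name",
                Consumed.with(Serdes.String(), Serdes.String()),
                Materialized.as("my-store"));

        // 'events-topic' is a hypothetical second input topic
        KStream<String, String> events = builder.stream(
                "events-topic", Consumed.with(Serdes.String(), Serdes.String()));

        events.join(lookup,
                        (key, value) -> key,                      // map each event to its lookup key
                        (value, stored) -> value + "|" + stored)  // illustrative way to combine values
              .to("enriched-topic", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }

    public static void main(String[] args) {
        // Prints the topology description; no broker is needed for this
        System.out.println(buildTopology().describe());
    }
}
```

In a spring-cloud-stream app the GlobalKTable would normally be bound as a @StreamListener input rather than created through a StreamsBuilder by hand; the join call itself stays the same.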