Referring to this solution, my spring-cloud-stream application.yml file has the following config:
#application.yml
spring.cloud.stream.bindings.input:
  destination: my-topic-name
  contentType: application/json
  consumer:
    useNativeDecoding: false
spring.cloud.stream.kafka.streams.bindings.input:
  consumer:
    keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
    valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
    materializedAs: my-store
In my main application class, which is annotated with @EnableBinding, the method annotated with @StreamListener uses the Kafka Streams DSL together with the Processor API to access state stores. These stores should be materialized on application startup because materializedAs is set in the application.yml file.
input.process(() -> new Processor<Object, Product>() {
    // Kept as a field: a local variable captured by the anonymous class could not be reassigned in init().
    private ReadOnlyKeyValueStore<Object, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext processorContext) {
        store = (ReadOnlyKeyValueStore<Object, String>) processorContext.getStateStore("my-store");
    }

    @Override
    public void process(Object key, Product value) {
        // find the key
        store.get(key);
    }

    @Override
    public void close() {
        // Nothing to close here: Kafka Streams manages the lifecycle of the state store.
    }
}, "my-store");
The problem is that when the application starts for the first time, the state stores have not been fully populated and are not ready yet (e.g. the state store is empty), but messages already arrive and are processed by the Kafka Streams topology, which gives unexpected results.
How can we make sure that, on the first application startup, all state stores (or specific ones, user-defined if possible) that are materialized through materializedAs in the application.yml file are fully populated and ready to use before the stream processing topology defined inside @StreamListener starts to process incoming messages? Can we force message processing to wait until the state stores are fully populated on the first application startup?
I tried to replicate the issue by modifying one of the spring-cloud-stream samples and pushed the modified version here. A more detailed discussion of this can also be found here.
max.task.idle.ms (cf. issues.apache.org/jira/browse/KAFKA-3514 and issues.apache.org/jira/browse/KAFKA-7458)? Default is zero. Maybe increasing the config helps? – Matthias J. Sax
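For anyone who wants to try the suggestion in the comment above, here is a minimal sketch of how the setting might be supplied as a plain Kafka Streams property. The class name IdleConfigSketch, the helper streamsOverrides, and the 5000 ms value are placeholders for illustration, not taken from the question. As far as I know, the key "max.task.idle.ms" is the string behind StreamsConfig.MAX_TASK_IDLE_MS_CONFIG (Kafka 2.1 and later), and with the Kafka Streams binder it can probably be passed through spring.cloud.stream.kafka.streams.binder.configuration.max.task.idle.ms instead, but I have not verified that it resolves the empty-store problem described here.

```java
import java.util.Properties;

public class IdleConfigSketch {

    // Hypothetical helper: builds extra Kafka Streams properties to merge into the streams configuration.
    static Properties streamsOverrides(long maxTaskIdleMs) {
        Properties props = new Properties();
        // Lets a task wait up to this long for data on all of its input partitions
        // before it processes records from the partitions that already have data.
        props.setProperty("max.task.idle.ms", Long.toString(maxTaskIdleMs));
        return props;
    }

    public static void main(String[] args) {
        // 5000 ms is an arbitrary example value; the default is 0 (no waiting).
        Properties props = streamsOverrides(5000L);
        System.out.println(props.getProperty("max.task.idle.ms"));
    }
}
```

The setting only controls how long a task idles while some of its input partitions have no buffered records, so whether it is enough here likely depends on how the store's source topic and the input topic are co-partitioned and timestamped.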