1
votes

I'm going to use StateRestoreListener with Spring Cloud Kafka Streams binder. I need to monitor the restoration progress of fault-tolerant state stores of my applications. There is example in confluent https://docs.confluent.io/current/streams/monitoring.html#streams-monitoring-runtime-status .

In order to observe the restoration of all state stores you provide your application an instance of the org.apache.kafka.streams.processor.StateRestoreListener interface. You set the org.apache.kafka.streams.processor.StateRestoreListener by calling the KafkaStreams#setGlobalStateRestoreListener method.

The first problem is getting the Kafka Streams from the app. I solved this problem with using

StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();

The second problem is setting StateRestoreListener to KafkaStreams, because I get error

java.lang.IllegalStateException: Can only set GlobalStateRestoreListener in CREATED state. Current state is: RUNNING

Is it possible to use StateRestoreListener in Spring Cloud Kafka Streams binder? Thanks

1
You'll have to inject that property around the time where the properties are loaded into the Stream builderOneCricketeer
@cricket_007 Thanks. I thought about it, but I don't know how to catch time when Kafka Streams has CREATED state.Chernilin

1 Answers

4
votes

You can do that by using a StreamsBuilderFactoryBeanCustomizer that gives you access to the underlying KafkaStreams object. If you are using binder versions 3.0 or above, this is the recommended approach. For e.g., you can provide the following bean in your application and customize it with the GlobalStateRestoreListener.

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
            @Override
            public void customize(KafkaStreams kafkaStreams) {
                kafkaStreams.setGlobalStateRestoreListener(...);
            }
        });
    };
}

This blog has more details on this strategy.