
We are using Kafka-streams included within the spring cloud stream Hoxton RC7 project (and therefore use the Kafka-streams and Kafka-client versions provided [2.3.1])

ext {
    set('springCloudVersion', 'Hoxton.SR7')

dependencies {
    // spring cloud stream
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
    // redis 
    implementation 'io.lettuce:lettuce-core'
    implementation 'org.springframework.data:spring-data-redis'
    testCompile 'it.ozimov:embedded-redis:0.7.2'

We have implemented a kstreams application

public Consumer<KStream<String, IncomingEvent>> process() {

    return input -> {

Where we do some aggregation within like:

.aggregate(Foo::new, (key, value1, aggregate) ->
                (aggregate == null || aggregate.getLastModified() == null || this.mustProcess(key, value1))
                        ? value1
                        : aggregate,


Now materialized should be a custom external state store (Redis):

Materialized<String, Foo, KeyValueStore<Bytes, byte[]>> materialized =

Which is provided by a StoreBuilder Bean:

public StoreBuilder<KeyValueStore<String, Foo>> builder(RedisKeyValueStoreBytes redisKeyValueStoreBytes){
    return Stores.keyValueStoreBuilder(supplier(redisKeyValueStoreBytes),
            new Serdes.StringSerde(),
            new SomeFooSerde());

public static KeyValueBytesStoreSupplier supplier(RedisKeyValueStoreBytes redisKeyValueStoreBytes) {

    return new KeyValueBytesStoreSupplier() {
        public String name() {
            return "redis-store";

        public KeyValueStore<Bytes, byte[]> get() {
            return redisKeyValueStoreBytes;

        public String metricsScope() {
            return "redis-session-state";

I now test the application with an EmbeddedKafka:

@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@SpringBootTest(classes = {TestConfigurationTests.class})
@EmbeddedKafka(count = 3, ports = {29901, 29902, 29903}, zookeeperPort = 33991)
public class TestKafkaIntegration {

Where I try to access the state store and query the items added:

ReadOnlyKeyValueStore<String, Foo> queryableStore = interactiveQueryService.getQueryableStore(
        "redis-store", QueryableStoreTypes.keyValueStore());
return queryableStore;

But when I run my test I receive an error:

Caused by: org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore redis-store is already added.

Several questions:

  • The examples for using custom state stores explained by [1] use it within a Processor. Does this automatically mean, I am not able to use a custom state store in an aggregation?
  • When it is not possible to use it within an aggregation, what is the point of using custom state stores anyway?
  • When I slightly change the code above for the kstreams and define a processor instead of using materialized in the aggregate method, the error changes, it then complains about a missing state "redis-store" store while trying to execute getQueryableStore. But indeed i can see, that the addStateStoreBeans registers the 'redis-store'. How can this happen?

The reason why I want to use a custom state store is, that I am not (really easily) able to have a dedicated hard disk for the application instance. To have a fast startup for the application I want to avoid processing of the complete changelog on each startup of the application (which should preferably take place several times a day and currently takes more than an hour). So now the last question:

  • When a custom external state store is used, am I able to resume to the last state on application restart?

[1] https://spring.io/blog/2019/12/09/stream-processing-with-spring-cloud-stream-and-apache-kafka-streams-part-6-state-stores-and-interactive-queries


You are using Materialized.as(java.lang.String storeName) which will create (materialize) a StateStore with the given name ("redis-store" here). On the other hand, with builder(RedisKeyValueStoreBytes redisKeyValueStoreBytes) you are creating another StateStore having the same name, which springframework is probably automatically adding it to the topology, such that you get "store is already added" error.

q1: you can use a custom state store in an aggregation; use it with Materialized.as(KeyValueBytesStoreSupplier supplier)

q2: one could also use a StateStore with a transformer or a custom processor for interactive queries; also with a global StateStore one could access the entire topic instead of KafkaStreams instance only allocated partitions (see addGlobalStore and globalTable)

q3: I guess you didn't (manually) register the state store with the topology; see Topology.addStateStore(StoreBuilder<?> storeBuilder, java.lang.String... processorNames) and Connecting Processors and State Stores

q4: yes, the state store is loaded from a changelog topic (could be the original topic when using optimizations)