We are using Kafka Streams as included in the Spring Cloud Stream Hoxton.SR7 release (and therefore the kafka-streams and kafka-clients versions it provides, 2.3.1):
ext {
    set('springCloudVersion', 'Hoxton.SR7')
}
...
dependencies {
    // spring cloud stream
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
    implementation 'org.springframework.cloud:spring-cloud-stream'
    // redis
    implementation 'io.lettuce:lettuce-core'
    implementation 'org.springframework.data:spring-data-redis'
    testCompile 'it.ozimov:embedded-redis:0.7.2'
    ...
We have implemented a Kafka Streams application:
@Bean
public Consumer<KStream<String, IncomingEvent>> process() {
    return input -> {
Inside the lambda we do an aggregation along these lines:
.aggregate(Foo::new,
        (key, value1, aggregate) ->
                (aggregate == null || aggregate.getLastModified() == null || this.mustProcess(key, value1))
                        ? value1
                        : aggregate,
        materialized)
Here, materialized is meant to be backed by a custom external state store (Redis):
Materialized<String, Foo, KeyValueStore<Bytes, byte[]>> materialized =
Materialized.as("redis-store");
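For reference, here is a minimal sketch of how these fragments fit together as a single bean. It assumes, for simplicity, that the stream already carries Foo values (our real code consumes IncomingEvent); mustProcess is our helper from the fragment above:

@Bean
public Consumer<KStream<String, Foo>> process() {
    Materialized<String, Foo, KeyValueStore<Bytes, byte[]>> materialized =
            Materialized.as("redis-store");
    return input -> input
            .groupByKey()
            .aggregate(Foo::new,
                    (key, value, aggregate) ->
                            (aggregate == null || aggregate.getLastModified() == null || this.mustProcess(key, value))
                                    ? value
                                    : aggregate,
                    materialized);
}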
The store is provided by a StoreBuilder bean:
@Bean
public StoreBuilder<KeyValueStore<String, Foo>> builder(RedisKeyValueStoreBytes redisKeyValueStoreBytes) {
    return Stores.keyValueStoreBuilder(supplier(redisKeyValueStoreBytes),
            new Serdes.StringSerde(),
            new SomeFooSerde());
}
public static KeyValueBytesStoreSupplier supplier(RedisKeyValueStoreBytes redisKeyValueStoreBytes) {
    return new KeyValueBytesStoreSupplier() {
        @Override
        public String name() {
            return "redis-store";
        }

        @Override
        public KeyValueStore<Bytes, byte[]> get() {
            return redisKeyValueStoreBytes;
        }

        @Override
        public String metricsScope() {
            return "redis-session-state";
        }
    };
}
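RedisKeyValueStoreBytes is our own class. For completeness, this is roughly the interface surface such a store has to cover in Kafka Streams 2.3 (a skeleton only; the actual Redis calls are elided to comments):

import java.util.List;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueIterator;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueStore;

public class RedisKeyValueStoreBytes implements KeyValueStore<Bytes, byte[]> {

    @Override
    public String name() { return "redis-store"; }

    @Override
    public void init(ProcessorContext context, StateStore root) {
        // register the store; the restore callback is a no-op because the
        // data lives in Redis, not in a local RocksDB instance
        context.register(root, (key, value) -> { });
    }

    @Override
    public void put(Bytes key, byte[] value) { /* Redis SET */ }

    @Override
    public byte[] putIfAbsent(Bytes key, byte[] value) { /* Redis SETNX */ return null; }

    @Override
    public void putAll(List<KeyValue<Bytes, byte[]>> entries) { /* Redis MSET */ }

    @Override
    public byte[] delete(Bytes key) { /* Redis DEL */ return null; }

    @Override
    public byte[] get(Bytes key) { /* Redis GET */ return null; }

    @Override
    public KeyValueIterator<Bytes, byte[]> range(Bytes from, Bytes to) {
        return null; // a real implementation would iterate a Redis SCAN
    }

    @Override
    public KeyValueIterator<Bytes, byte[]> all() {
        return null; // a real implementation would iterate a Redis SCAN
    }

    @Override
    public long approximateNumEntries() { /* Redis DBSIZE */ return 0; }

    @Override
    public void flush() { }

    @Override
    public void close() { }

    @Override
    public boolean persistent() { return true; }

    @Override
    public boolean isOpen() { return true; }
}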
I now test the application with an EmbeddedKafka:
@ActiveProfiles("test")
@RunWith(SpringRunner.class)
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@SpringBootTest(classes = {TestConfigurationTests.class})
@EmbeddedKafka(count = 3, ports = {29901, 29902, 29903}, zookeeperPort = 33991)
public class TestKafkaIntegration {
In the test I try to access the state store and query the items that were added:
ReadOnlyKeyValueStore<String, Foo> queryableStore = interactiveQueryService.getQueryableStore(
"redis-store", QueryableStoreTypes.keyValueStore());
return queryableStore;
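(As a side note, a store only becomes queryable once the streams instance has reached state RUNNING, so tests commonly wrap the call above in a retry loop; a sketch with arbitrary timeouts, inside a test method declared with throws Exception:)

ReadOnlyKeyValueStore<String, Foo> queryableStore = null;
for (int i = 0; i < 30 && queryableStore == null; i++) {
    try {
        queryableStore = interactiveQueryService.getQueryableStore(
                "redis-store", QueryableStoreTypes.keyValueStore());
    } catch (Exception storeNotReadyYet) {
        Thread.sleep(1000); // wait for the stream to reach RUNNING
    }
}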
But when I run my test I receive an error:
Caused by: org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore redis-store is already added.
Several questions:
- The examples for custom state stores in [1] use them within a Processor. Does this automatically mean that I cannot use a custom state store in an aggregation?
- If it is not possible to use one within an aggregation, what is the point of custom state stores at all?
- When I slightly change the code above and use a Processor instead of passing materialized to the aggregate method, the error changes: it then complains about a missing state store "redis-store" while executing getQueryableStore. Yet I can see that addStateStoreBeans registers 'redis-store'. How can this happen?
The reason I want to use a custom state store is that I cannot (easily) provide a dedicated hard disk for the application instance. To get a fast startup I want to avoid replaying the complete changelog on every start of the application (restarts should preferably happen several times a day and currently take more than an hour). So now the last question:
- When a custom external state store is used, am I able to resume from the last state on application restart?
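For context on that last question: what gets replayed at startup is the store's backing changelog topic, and logging can be switched off on the StoreBuilder. A sketch based on the builder bean above (whether Redis alone then constitutes a consistent "last state" is exactly what I am unsure about):

@Bean
public StoreBuilder<KeyValueStore<String, Foo>> builder(RedisKeyValueStoreBytes redisKeyValueStoreBytes) {
    // withLoggingDisabled() removes the backing changelog topic, so nothing
    // is replayed into the store on restart; the state must then survive in
    // Redis itself
    return Stores.keyValueStoreBuilder(supplier(redisKeyValueStoreBytes),
                    new Serdes.StringSerde(),
                    new SomeFooSerde())
            .withLoggingDisabled();
}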