I need to create Kafka Streams instances dynamically from config files that contain the source topic name and the settings for each stream. The app needs to run tens of Kafka streams, and the set of streams differs per environment (e.g. stage, prod).
Is it possible to do that with the spring-kafka library?
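To make the kind of configuration concrete, here is a minimal sketch of how the per-stream definitions could be read from a properties file. The key layout (`routes.<name>.topic` and `routes.<name>.settings.<key>`) and the `StreamRouteLoader` / `StreamRoute` names are assumptions made up for illustration; they are not part of spring-kafka or kafka-streams.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class StreamRouteLoader {

    /** Hypothetical holder for one stream: its source topic and its Kafka Streams settings. */
    public static final class StreamRoute {
        public final String name;
        public String topicName;
        public final Properties settings = new Properties();

        StreamRoute(String name) {
            this.name = name;
        }
    }

    private static final String PREFIX = "routes.";
    private static final String SETTINGS = "settings.";

    /** Groups "routes.<name>.topic" and "routes.<name>.settings.<key>" entries by <name>. */
    public static List<StreamRoute> load(Properties config) {
        Map<String, StreamRoute> byName = new TreeMap<>(); // sorted by name for a stable order
        for (String key : config.stringPropertyNames()) {
            if (!key.startsWith(PREFIX)) {
                continue; // not a route entry
            }
            String rest = key.substring(PREFIX.length());
            int dot = rest.indexOf('.');
            if (dot <= 0) {
                continue; // malformed key, ignored in this sketch
            }
            String name = rest.substring(0, dot);
            String field = rest.substring(dot + 1);
            StreamRoute route = byName.computeIfAbsent(name, StreamRoute::new);
            if (field.equals("topic")) {
                route.topicName = config.getProperty(key);
            } else if (field.startsWith(SETTINGS)) {
                // Strip the "settings." prefix so the remainder is a plain Kafka config key.
                route.settings.setProperty(field.substring(SETTINGS.length()), config.getProperty(key));
            }
        }
        return new ArrayList<>(byName.values());
    }
}
```

Each environment would then ship its own properties file, and the resulting list would play the role of `streamRouteProperties` in the snippets of this question.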
We can do that easily with plain kafka-streams:
    @Bean
    public List<KafkaStreams> kafkaStreams() {
        return streamRouteProperties.stream()
                .map(routeProperty -> createKafkaStream(routeProperty))
                .collect(toList());
    }

    private KafkaStreams createKafkaStream(KafkaConfigurationProperties kafkaProperties) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<Object, String> stream = builder.stream(kafkaProperties.getTopicName());
        Topology topology = builder.build();
        StreamsConfig streamsConfig = new StreamsConfig(kafkaProperties.getSettings());
        return new KafkaStreams(topology, streamsConfig);
    }
We also need to implement Spring's SmartLifecycle interface so that all streams are started and closed automatically.
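The start/stop handling described above might look roughly like the following. This is a dependency-free sketch, not the real Spring wiring: `ManagedStream` is a hypothetical stand-in for `KafkaStreams` (which does have `start()` and `close()`), and `StreamGroupLifecycle` only mirrors the `start()`, `stop()` and `isRunning()` methods that `SmartLifecycle` inherits from `Lifecycle`. In the actual application the class would implement `org.springframework.context.SmartLifecycle` and hold the `List<KafkaStreams>` bean.

```java
import java.util.ArrayList;
import java.util.List;

public class StreamGroupLifecycle {

    /** Hypothetical stand-in for KafkaStreams: only the two calls this sketch needs. */
    public interface ManagedStream {
        void start();
        void close();
    }

    private final List<ManagedStream> streams;
    private volatile boolean running;

    public StreamGroupLifecycle(List<ManagedStream> streams) {
        this.streams = new ArrayList<>(streams); // defensive copy
    }

    /** Starts every stream once; repeated calls are ignored while running. */
    public synchronized void start() {
        if (running) {
            return;
        }
        for (ManagedStream stream : streams) {
            stream.start();
        }
        running = true;
    }

    /** Closes the streams in reverse start order; does nothing if not running. */
    public synchronized void stop() {
        if (!running) {
            return;
        }
        for (int i = streams.size() - 1; i >= 0; i--) {
            streams.get(i).close();
        }
        running = false;
    }

    public boolean isRunning() {
        return running;
    }
}
```

Closing in reverse order is a design choice of this sketch, not something Kafka Streams requires.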
Is it possible to do the same using spring-kafka?
As far as I can see, each Kafka stream has to be declared in code, and I don't see a way to create a list of Kafka streams using StreamsBuilderFactoryBean.
For each required stream I need to do the following:
    @Bean
    public KStream<?, ?> kStream(StreamsBuilder streamsBuilder) {
        Consumed<String, String> consumed = ..;
        KStream<String, String> kStream = streamsBuilder.stream(topicName, consumed);
        kStream.process(() -> eventProcessor);
        return kStream;
    }

    @Bean
    public FactoryBean<StreamsBuilder> streamsBuilder() {
        return new StreamsBuilderFactoryBean(streamsConfig);
    }
But how can I create a list of Kafka streams dynamically using StreamsBuilderFactoryBean?