I'm using Spring Boot 2.x with spring-kafka (not spring-integration-kafka)
I have multiple beans annotated with @KafkaListener
... each one consuming from one topic... so since I have 12 topics then I also need to have 12 KafkaConsumers beans ... and I would like to know if I can create those beans programmatically / dynamically ... maybe using KafkaListenerEndpointRegistry in order to create consumer containers dynamically.
Note: I need to consume messages in batch ... so maybe I can use BatchMessageListener?
Current code:
@KafkaListener(
id = COUNTRY,
containerFactory = KAFKA_LISTENER_FACTORY_BEAN_NAME,
topics = {TOPIC},
groupId = GROUP_ID,
clientIdPrefix = CLIENT_ID,
errorHandler = VALIDATION_ERROR_HANDLER_BEAN_NAME
)
@Override
public void consume(final List<MessageDTO> messages,
@Header(KafkaHeaders.RECEIVED_TOPIC) final List<String> topics,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) final List<String> messagesKey,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) final List<Integer> partitionIds,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) final List<Long> timestamps,
@Header(KafkaHeaders.OFFSET) final List<Long> offsets) {
(...)
}
Each topic consumer has its own implementation depending on the topic. Can you guys point me to a blog/pseudocode/git thread/answer, please?