3
votes

I'm using Spring Boot 2.x with spring-kafka (not spring-integration-kafka)

I have multiple beans annotated with @KafkaListener ... each one consuming from one topic... so since I have 12 topics then I also need to have 12 KafkaConsumers beans ... and I would like to know if I can create those beans programmatically / dynamically ... maybe using KafkaListenerEndpointRegistry in order to create consumer containers dynamically.

Note: I need to consume messages in batch ... so maybe I can use BatchMessageListener?

Current code:

@KafkaListener(
        id = COUNTRY,
        containerFactory = KAFKA_LISTENER_FACTORY_BEAN_NAME,
        topics = {TOPIC},
        groupId = GROUP_ID,
        clientIdPrefix = CLIENT_ID,
        errorHandler = VALIDATION_ERROR_HANDLER_BEAN_NAME
    )
    @Override
    public void consume(final List<MessageDTO> messages,
        @Header(KafkaHeaders.RECEIVED_TOPIC) final List<String> topics,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) final List<String> messagesKey,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) final List<Integer> partitionIds,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) final List<Long> timestamps,
        @Header(KafkaHeaders.OFFSET) final List<Long> offsets) {
            (...)
    }

Each topic consumer has its own implementation depending on the topic. Can you guys point me to a blog/pseudocode/git thread/answer, please?

2
every topic is in same cluster? and what about payloads? are they different?Deadpool
The payload structure is the same ... and I have one topic per country. And then I will have at least one consumer per topic because the implementation depends on the countryLeonel

2 Answers

0
votes

If your topics are having some pattern you can try this one also:

      kafka:
        bindings:
            input.consumer.destination-is-pattern: true