0
votes

I am working on a Spring Boot Kafka consumer application. It will have several consumers, each listening on different topics. All of the configuration for the consumers comes from the application.yml file.

application:
  kafka:
    consumer-config:
      - name: consumer-a
        topics: topic1,topic2
        ......
      - name: consumer-b
        topics: topic3,topic4
        .....

I am not able to set the list of topics from the application properties on the @KafkaListener annotation.

I tried the following:

@KafkaListener(topics = "#{'${application.kafka.consumer-config[0].topics}'.split(',')}",containerFactory = "kafkaListenerContainerFactory")


@KafkaListener(topics = "#{'${application.kafka.consumer-config.?[name == 'consumer-a'].topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")

In both the cases I am getting the following error:

java.lang.IllegalArgumentException: Could not resolve placeholder

What is the best way to get the topics from the application properties and set them as the @KafkaListener topics?

1

1 Answer

0
votes

What version are you using? I just tested it and it works fine...

/**
 * Minimal Spring Boot application with a single Kafka listener whose topic list is
 * read from the {@code application.kafka.consumer-config[0].topics} property.
 */
@SpringBootApplication
public class So63583349Application {

    /**
     * Writes every value delivered to the listener to standard output.
     *
     * <p>The {@code topics} expression first resolves the property placeholder to a
     * single string and then splits that string on commas.
     *
     * @param in the value handed to the listener
     */
    @KafkaListener(id = "so63583349", topics = "#{'${application.kafka.consumer-config[0].topics}'.split(',')}")
    public void listen(String in) {
        System.out.println(in);
    }

    /**
     * Application entry point; starts Spring Boot with this class as the primary source.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        new SpringApplication(So63583349Application.class).run(args);
    }

}

2020-08-25 13:02:28.384 WARN 66237 --- [o63583349-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-so63583349-1, groupId=so63583349] Error while fetching metadata with correlation id 41 : {topic1=UNKNOWN_TOPIC_OR_PARTITION, topic2=UNKNOWN_TOPIC_OR_PARTITION}

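For the second one, you can't use SpEL selection within the property placeholder: the ${...} placeholder is resolved before the SpEL expression is evaluated, so everything between the braces is treated as a literal property key. Here is one solution for that situation: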

/**
 * Spring Boot application whose Kafka listener picks its topics from the
 * {@code props} bean instead of a property placeholder, so that SpEL selection
 * can be applied to the list of consumer configurations.
 */
@SpringBootApplication
public class So63583349Application {

    /**
     * Registers the configuration holder under the bean name {@code props}, which is
     * the name the listener's SpEL expression refers to as {@code @props}.
     *
     * @return a new {@link Props} instance
     */
    @Bean
    Props props() {
        return new Props();
    }

    /**
     * Writes every value delivered to the listener to standard output.
     *
     * <p>The {@code topics} expression selects the entries of
     * {@code props.consumerConfig} whose {@code name} equals {@code consumer-a},
     * takes the first match, and splits its {@code topics} value on commas.
     *
     * @param in the value handed to the listener
     */
    @KafkaListener(id = "so63583349",
            topics = "#{@props.consumerConfig.?[name == 'consumer-a'].get(0).topics.split(',')}")
    public void listen(String in) {
        System.out.println(in);
    }

    /**
     * Application entry point; starts Spring Boot with this class as the primary source.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        new SpringApplication(So63583349Application.class).run(args);
    }

}

/**
 * Holder for the settings found under the {@code application.kafka} prefix.
 *
 * <p>Each element of {@code consumerConfig} is kept as a {@link Properties} object,
 * so its individual keys are looked up by name rather than through typed accessors.
 */
@ConfigurationProperties(value = "application.kafka")
class Props {

    // Private so that the list can only be read or replaced through the accessors below.
    private List<Properties> consumerConfig;

    /**
     * Returns the configured consumer entries.
     *
     * @return the list previously passed to {@link #setConsumerConfig(List)},
     *         or {@code null} if it has never been set
     */
    public List<Properties> getConsumerConfig() {
        return this.consumerConfig;
    }

    /**
     * Replaces the consumer entries.
     *
     * @param consumerConfig the new list of per-consumer settings; stored as given,
     *                       without copying
     */
    public void setConsumerConfig(List<Properties> consumerConfig) {
        this.consumerConfig = consumerConfig;
    }

}