I'm using Spring Boot 2.2.0.M4 and Kafka 2.2.0 to build an application based on the sample at https://www.baeldung.com/spring-kafka. When I enable the listener for my topic, the consumer logs the following error.

[AdminClient clientId=adminclient-2] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

The following is defined in my application.properties file.

kafka.bootstrapAddress=172.22.22.55:9092

Here's the @KafkaListener annotated method.

@KafkaListener(topics = "add_app", groupId = "foo")
public void listen(String message) {
    System.out.println("Received Message in group foo: " + message);
}

Below is the consumer configuration class that reads the kafka.bootstrapAddress value. Its log statement prints the correct address.

@Configuration
@Slf4j
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    public ConsumerFactory<String, String> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        log.info("Created {} using address {}.", this.getClass(), bootstrapAddress);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory("foo"));
        return factory;
    }
}

I had a similar problem. At first I thought it was a problem with my Spring Kafka endpoint configuration, but it turned out that I had misconfigured the advertised listeners on my Kafka broker. Check out this answer: stackoverflow.com/questions/59867685/… - Ben

1 Answer

The solution to this is fairly simple. I just needed to add the following to the application.properties file.

spring.kafka.bootstrap-servers=172.22.22.55:9092

After looking at KafkaProperties.java, I found this line:

private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));

and this method, which copies that value into the client properties:

private Map<String, Object> buildCommonProperties() {
    Map<String, Object> properties = new HashMap<>();
    if (this.bootstrapServers != null) {
        properties.put("bootstrap.servers", this.bootstrapServers);
    }

    if (this.clientId != null) {
        properties.put("client.id", this.clientId);
    }

    properties.putAll(this.ssl.buildProperties());
    if (!CollectionUtils.isEmpty(this.properties)) {
        properties.putAll(this.properties);
    }

    return properties;
}

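Because localhost:9092 is the built-in default in KafkaProperties, the Kafka beans that Spring Boot auto-configures connect to localhost unless spring.kafka.bootstrap-servers is set. The listener in the question does not name a containerFactory, so as far as I can tell it falls back to the auto-configured default factory rather than fooKafkaListenerContainerFactory, and the broker address defined in KafkaConsumerConfig is never used.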
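To make the default behaviour concrete, here is a minimal sketch that uses only the JDK. BootstrapDefaultsSketch is a hypothetical stand-in that copies the field and the bootstrap.servers part of the method quoted above. It is not Spring's class, and setBootstrapServers only plays the role of property binding from application.properties.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the relevant part of KafkaProperties (not Spring's class).
class BootstrapDefaultsSketch {

    // Same default value as the KafkaProperties field quoted above.
    private List<String> bootstrapServers =
            new ArrayList<>(Collections.singletonList("localhost:9092"));

    // Assumption: stands in for binding spring.kafka.bootstrap-servers.
    void setBootstrapServers(List<String> bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    // Reduced version of buildCommonProperties(): only the bootstrap.servers entry.
    Map<String, Object> buildCommonProperties() {
        Map<String, Object> properties = new HashMap<>();
        if (this.bootstrapServers != null) {
            properties.put("bootstrap.servers", this.bootstrapServers);
        }
        return properties;
    }

    public static void main(String[] args) {
        // Property not set: the built-in default is used.
        BootstrapDefaultsSketch unset = new BootstrapDefaultsSketch();
        System.out.println(unset.buildCommonProperties());

        // Property set: the configured broker replaces the default.
        BootstrapDefaultsSketch configured = new BootstrapDefaultsSketch();
        configured.setBootstrapServers(Collections.singletonList("172.22.22.55:9092"));
        System.out.println(configured.buildCommonProperties());
    }
}
```

The first map still points at localhost:9092, which matches the address in the error message. Only the second one, where the property has been set, carries the real broker address.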

Update

Adding the containerFactory attribute to the listener annotation also fixes it and removes the need for the change to application.properties, because the listener then uses the factory that was built from kafka.bootstrapAddress.

@KafkaListener(topics = "add_app", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
public void listen(String message) {
    System.out.println("Received Message in group foo: " + message);
}