0
votes

I have a Spring Boot app that has a Kafka consumer and producer in it. There's also a bean to create a topic.

e.g.

@KafkaListener(topics = "myTopic")
    public void doSomething() {
      // do something on receipt of the message
    }


@Bean
public NewTopic topic(){
    return TopicBuilder.name("myTopic")
            .partitions(2)
            .

Both my Spring Boot app and Kafka start up in Docker in Kubernetes. Sometimes the Spring Boot app starts up before the Kafka pod is up and therefore fails to start as the consumer cannot connect (see stacktrace).

Is there a way of my application starting up in a resilient manner ? For example the consumer should cope with Kafka not being there at startup or when the app is running ?

Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:827)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:629)
    at org.springframework.kafka.core.Def    Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:827)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:629)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:207)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:193)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:167)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:141)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:607)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:329)
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:340)
    at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:176)
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:340)
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312)
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
    ... 59 common frames omitted
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:88)
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:735)aultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:207)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:193)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:167)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:141)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:607)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:329)
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:340)
    at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:176)
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:340)
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312)
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
    ... 59 common frames omitted
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:88)
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:735)
1
It is a bad practise to start a service, even when the dependencies aren't started yet. Could lead to failure in calls.Sahil Gupta

1 Answers

3
votes

You can set autostartup = "false" on the listener and start it yourself (using the KafkaListenerEndpointRegistry - give the listener an id so you can get a reference to its container from the registry).

If the broker is not available, the KafkaAdmin won't create the topic; you will also need to call KafkaAdmin.initialize():

/**
 * Call this method to check/add topics; this might be needed if the broker was not
 * available when the application context was initialized, and
 * {@link #setFatalIfBrokerNotAvailable(boolean) fatalIfBrokerNotAvailable} is false,
 * or {@link #setAutoCreate(boolean) autoCreate} was set to false.
 * @return true if successful.
 * @see #setFatalIfBrokerNotAvailable(boolean)
 * @see #setAutoCreate(boolean)
 */
public final boolean initialize() {