0 votes

I am using spring-cloud-stream-kafka in my Spring Boot (consumer) application. The health of the app is reported inaccurately: it shows 'UP' even when the app cannot connect to Kafka (the Kafka broker is down). I have read articles on the Kafka health check; it appears the Kafka health check is disabled in the Spring Actuator health check.

So, I wrote the following code to enable the Kafka health check for my app. I think I am missing some connection between the app config and my code, because I still don't see the Kafka health check working.

(1) I am creating a custom health indicator bean as follows:

      import java.util.HashMap;
      import java.util.Map;

      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.common.serialization.ByteArrayDeserializer;
      import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
      import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
      import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
      import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.kafka.core.ConsumerFactory;
      import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
      import org.springframework.util.ObjectUtils;

      @Configuration
      @ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator")
      public class KafkaBinderHealthIndicatorConfiguration {

        @Bean
        KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder,
            KafkaBinderConfigurationProperties configurationProperties) {
          Map<String, Object> props = new HashMap<>();
          props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
          props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
          Map<String, Object> mergedConfig = configurationProperties.getConsumerConfiguration();
          if (!ObjectUtils.isEmpty(mergedConfig)) {
            props.putAll(mergedConfig);
          }
          if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
          }
          ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
          KafkaBinderHealthIndicator indicator = new KafkaBinderHealthIndicator(kafkaMessageChannelBinder, consumerFactory);
          indicator.setTimeout(configurationProperties.getHealthTimeout());
          return indicator;
        }
      }
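As a sanity check that Actuator is picking up custom health indicator beans at all, it can help to first register a trivial indicator alongside the binder one. This is an illustrative sketch of my own (the `SanityHealthConfiguration` class name and the resulting `sanity` key are not part of the binder); if this entry is also missing from `/health`, the problem lies in the Actuator wiring rather than in the Kafka-specific configuration:

```java
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SanityHealthConfiguration {

  // HealthIndicator is a functional interface, so a lambda suffices.
  // The bean name "sanityHealthIndicator" surfaces as the "sanity" entry
  // in the /health response.
  @Bean
  HealthIndicator sanityHealthIndicator() {
    return () -> Health.up().withDetail("note", "custom indicators are wired").build();
  }
}
```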

(2) Created binder config:

          import java.io.IOException;

          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
          import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
          import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
          import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
          import org.springframework.boot.context.properties.EnableConfigurationProperties;
          import org.springframework.cloud.stream.binder.Binder;
          import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
          import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
          import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
          import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
          import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
          import org.springframework.context.annotation.Bean;
          import org.springframework.context.annotation.Configuration;
          import org.springframework.context.annotation.Import;
          import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;

          @Configuration
          @ConditionalOnMissingBean(Binder.class)
          @Import({ KafkaAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class,
              KafkaBinderHealthIndicatorConfiguration.class })
          @EnableConfigurationProperties({ KafkaExtendedBindingProperties.class })
          public class KafkaBinderConfiguration {

            @Autowired
            private KafkaExtendedBindingProperties kafkaExtendedBindingProperties;

          //  @Autowired
          //  private ProducerListener               producerListener;

            @Bean
            KafkaBinderConfigurationProperties configurationProperties(KafkaProperties kafkaProperties) {
              return new KafkaBinderConfigurationProperties();
            }

            @Bean
            KafkaTopicProvisioner provisioningProvider(KafkaBinderConfigurationProperties configurationProperties) {
              return new KafkaTopicProvisioner(configurationProperties, new Kafka10AdminUtilsOperation());
            }

            @Bean
            KafkaMessageChannelBinder kafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
                KafkaTopicProvisioner provisioningProvider) {

              KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(configurationProperties,
                  provisioningProvider);
          //    kafkaMessageChannelBinder.setProducerListener(producerListener);
              kafkaMessageChannelBinder.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);
              return kafkaMessageChannelBinder;
            }

            @Bean
            public KafkaJaasLoginModuleInitializer jaasInitializer() throws IOException {
              return new KafkaJaasLoginModuleInitializer();
            }

          }
(3) App properties I have added:

    management.health.binders.enabled=true
    management.health.kafka.enabled=true

=========== OUTPUT ===========

When I launch my app locally and hit the /health endpoint, I see the following for kafka:

    "binders": {
      "status": "UNKNOWN",
      "kafka": {
        "status": "UNKNOWN"
      }
    },
What version of spring-cloud-stream are you using? I just tried an app locally on the latest snapshot with the Kafka binder, hit the /actuator/health endpoint, and saw the {"status":"UP"} message. With the broker down, I see the "DOWN" message as well. By default, if the management.health.binders.enabled property is missing, the condition still matches, so there is no need to set that property to true. Can you try a simple app without your custom health indicator beans and see if that works on the latest versions of spring-cloud-stream? – sobychacko
When the broker was down, the "DOWN" message appeared only after a significant delay, once the connection attempt to Kafka in the health indicator check had timed out. – sobychacko
compile(group: 'org.springframework.cloud', name: 'spring-cloud-stream-binder-kafka11', version: '1.3.0.RELEASE'). Okay, I am going to try a small app with the latest version. Thanks. – Sin
Are you using 2.0.0.RELEASE? – Sin

1 Answer

2 votes

The issue was resolved by using the latest version of 'spring-cloud-stream-binder-kafka'. I was initially using an older version (older than 1.3.0.RELEASE), and the health check for Kafka wasn't working. As @sobychacko suggested, I moved to the latest version, 2.0.0.RELEASE, and the health check for the Kafka binders worked fine :) with no custom health indicator beans.

    "binders": {
      "status": "UP",
      "kafka": {
        "status": "UP",
        "healthIndicator": {
          "status": "UP"
        }
      }
    },

This check should also work with version 1.3.0.RELEASE.
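For reference, the upgrade in the Gradle build mentioned in the question's comments would look roughly like this. This is a sketch only: the exact artifact coordinates and versions should be aligned with your project's Spring Cloud dependency management rather than pinned by hand.

```groovy
dependencies {
    // Old 1.x binder artifact from the question's comment:
    // compile group: 'org.springframework.cloud', name: 'spring-cloud-stream-binder-kafka11', version: '1.3.0.RELEASE'

    // Replaced with the 2.0.x binder, where the binder health indicator
    // works out of the box without custom health indicator beans:
    compile group: 'org.springframework.cloud', name: 'spring-cloud-stream-binder-kafka', version: '2.0.0.RELEASE'
}
```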