I am using spring-cloud-stream-kafka in my spring-boot(consumer) application.The health of the app is inaccurate, 'UP' even when the app can't connect to Kafka(Kafka broker is down). I have read articles on kafka health check. It looks like kafka health check is disabled in spring actuator health check.
So, I managed to write the following code to enable kafka health check for my app. I think, I am missing some connection between the app config and my code and I don't see the Kafka health working.
(1) I am creating a custom health indicator bean as follows:
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.util.ObjectUtils;
@Configuration
@ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator")
public class KafkaBinderHealthIndicatorConfiguration {
@Bean
KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder,
KafkaBinderConfigurationProperties configurationProperties) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
Map<String, Object> mergedConfig = configurationProperties.getConsumerConfiguration();
if (!ObjectUtils.isEmpty(mergedConfig)) {
props.putAll(mergedConfig);
}
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
}
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
KafkaBinderHealthIndicator indicator = new KafkaBinderHealthIndicator(kafkaMessageChannelBinder, consumerFactory);
indicator.setTimeout(configurationProperties.getHealthTimeout());
return indicator;
}
}
(2) Created binder config:
import java.io.IOException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
@Configuration
@ConditionalOnMissingBean(Binder.class)
@Import({ KafkaAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class,
KafkaBinderHealthIndicatorConfiguration.class })
@EnableConfigurationProperties({ KafkaExtendedBindingProperties.class })
public class KafkaBinderConfiguration {
@Autowired
private KafkaExtendedBindingProperties kafkaExtendedBindingProperties;
// @Autowired
// private ProducerListener producerListener;
@Bean
KafkaBinderConfigurationProperties configurationProperties(KafkaProperties kafkaProperties) {
return new KafkaBinderConfigurationProperties();
}
@Bean
KafkaTopicProvisioner provisioningProvider(KafkaBinderConfigurationProperties configurationProperties) {
return new KafkaTopicProvisioner(configurationProperties, new Kafka10AdminUtilsOperation());
}
@Bean
KafkaMessageChannelBinder kafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider) {
KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(configurationProperties,
provisioningProvider);
// kafkaMessageChannelBinder.setProducerListener(producerListener);
kafkaMessageChannelBinder.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);
return kafkaMessageChannelBinder;
}
@Bean
public KafkaJaasLoginModuleInitializer jaasInitializer() throws IOException {
return new KafkaJaasLoginModuleInitializer();
}
}
App properties I have added:
management.health.binders.enabled = true, management.health.kafka.enabled = true
===========OUTPUT============= When I launch my app locally and hit the /health endpoint, I see the following for kafka:
"binders": {
"status": "UNKNOWN",
"kafka": {
"status": "UNKNOWN"
}
},
/actuator/health
endpoint and saw that the {"status":"UP"} message comes up. While the broker is down seeing the "DOWN" message as well. By default, ifmanagement.health.binders.enabled
property is missing, then it will match and no need to set that property to true. Can you try a simple app without your custom beans for health indicator configuration and see if that works on the latest versions of spring-cloud-stream? – sobychacko