1
votes

Appreciate your help on this issue.

I am using Kafka 0.8.2

Here's the producer code I've written.

The issue is everything is working fine..I am able to publish the message and get the acknowledgement (metadata) after successful publish. But the issue is .....

  1. How can I track If the kafka broker is down. I need to display an error message If the broker is down.

  2. If the broker is down I need to capture the messages publish in the meantime till the broker is up & running.

Any insights would be very helpful.

public class cvsKafkaProducerAck {
public static void main(String args[]) {


    String topic = "test_topic";
    String msg = "test...22";

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.RETRIES_CONFIG, "1");
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    //props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG , "1");
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);
    props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, true);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");

    KafkaProducer<byte[], byte[]> m_kafkaProducer = new KafkaProducer<byte[], byte[]>(props);       
    ProducerRecord<byte[], byte[]> prMessage = new ProducerRecord<byte[],byte[]>(topic, msg.getBytes());

    try
    {
        RecordMetadata metadata = m_kafkaProducer.send(prMessage).get();
        System.out.println("The offset of the record we just sent is: " + metadata.offset());
    }
    catch(Exception e)
    {
        System.out.println(e.getMessage());
    }

    m_kafkaProducer.close();

}

}

2

2 Answers

0
votes

We have a Kafka broker monitoring system that uses the node-zookeeper-client library.

Brokers are registered on the Zookeeper node /brokers/ids. Using the client.getChildren() function we constantly monitor that path looking for brokers being added or removed. The client.getChildren() function can place a watcher on the Zookeeper node which will fire an event when child nodes are added or removed.

Our node application gets the known brokers at startup from the /brokers/ids path, and then monitors the path continuously for changes, alerting when a broker is added or removed. This is done by comparing an array of the current brokers with an array of the last retrieved list of brokers using the lodash library e.g.

//this code is fired when the /brokers/ids/ path changes (i.e. a child node is added or removed)

//currentBrokers = list of brokers retrieved when the app started
//newBrokerList = latest list of brokers retrieved from client.getChildren()
var brokersChanged = _.difference(currentBrokers, newBrokerList); 

  //if 'brokersChanged' contains any values alert that
  //these brokers have become unregistered with zookeeper
  if(brokersChanged.length > 0){
    _.each(brokersChanged, function(broker){
      var indexToRemove = _.findIndex(currentBrokers, function(existingBroker) {
        if(existingBroker == broker){
          return existingBroker;
        }
      });

      currentBrokers.splice(indexToRemove,1);
    });    
  }
  else {
    //if 'brokersChanged' is empty a new broker has been registered
    //with zookeeper
    var newBroker = _.difference(data, currentBrokers);
    _.each(newBroker, function(broker){
      currentBrokers.push(broker);
    });
  }

//currentBrokers will now contain the updated list of brokers

Hope this helps

0
votes

You can simply check for the JMX metrics available on Kafka server to see if the Kafka broker is down. Check for each kafka broker ReplicaManager.LeaderCount value and if it is 0 then that broker is NOT leader for any topic which in terms is not active/down.

You can find more available metrics here https://cwiki.apache.org/confluence/display/KAFKA/Available+Metrics