
I am using Ubuntu 14.04. I installed kafka-node with npm and am using kafka_0.9.2-0.8.2.1. I have one Node.js client and two brokers, kafka1 and kafka2; ZooKeeper runs on kafka1. The code below runs in the Node.js client (on node_js_machine). When I kill the Kafka process that Node.js is currently connected to, the Node.js process crashes and exits. The main problem seems to be in the error handling: if I comment out the four lines marked as problem lines below, Node.js does not crash even when I kill the broker. When the broker that Node.js is connected to goes down, how can I catch this event and reconnect to another broker? Is my error handling correct?

var csUtil        = require('cs-js-common');
var kafka = require('kafka-node'),
    Producer = kafka.Producer,
    KeyedMessage = kafka.KeyedMessage,
    client = new kafka.Client('kafka1:2181/','AppAnalyzerStorm'),
    producer = new Producer(client),
    km = new KeyedMessage('key', 'message');
var kafkaConnected = false;

producer.on('ready',function(){
    kafkaConnected = true;
    log.info("kafka producer is connected");
    console.log("kafka producer is connected");
});


producer.on('error',function(err){
    log.error("error in producer on error"+err);
    console.log("producer error is invoked");

    kafka = require('kafka-node');                                // problem line 1
    Producer = kafka.Producer;                                    // problem line 2
    client = new kafka.Client('kafka1:2181/','AppAnalyzerStorm'); // problem line 3
    producer = new Producer(client);                              // problem line 4
});


producer.send(xxxx) 
Why is producer.send() outside producer.on('ready', function(){})? Is that OK? - pravin
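
To illustrate the point in this comment, here is a minimal sketch of holding messages back until the producer has emitted 'ready'. It uses a plain Node.js EventEmitter with a fake send() as a stand-in for the kafka-node producer so that it runs on its own; the helper names safeSend and onSent, the pending queue, and the topic name 'test' are assumptions made for the example, not part of the original code.

```javascript
const { EventEmitter } = require('events');

// Stand-in for a kafka-node producer: an emitter with a fake send().
const producer = new EventEmitter();
const sent = [];
producer.send = function (payloads, cb) {
    sent.push(payloads);
    cb(null, {});
};

let ready = false;
const pending = [];   // payloads queued before 'ready' (hypothetical helper state)

function onSent(err) {
    if (err) {
        console.log('send failed: ' + err.message);
    }
}

// Hypothetical wrapper: send now if ready, otherwise queue for later.
function safeSend(payloads) {
    if (ready) {
        producer.send(payloads, onSent);
    } else {
        pending.push(payloads);
    }
}

producer.on('ready', function () {
    ready = true;
    // Flush everything that was queued while the producer was not ready.
    while (pending.length > 0) {
        producer.send(pending.shift(), onSent);
    }
});

safeSend([{ topic: 'test', messages: 'a' }]);  // queued, producer not ready yet
const sentBeforeReady = sent.length;
producer.emit('ready');                        // simulate the 'ready' event
safeSend([{ topic: 'test', messages: 'b' }]);  // sent immediately
```

With a real producer the 'ready' event comes from the library, so the emit('ready') call above would be dropped.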

1 Answer


Add these lines to the end of your script:

process.on('uncaughtException', function (err) {
    console.log(err);
}); 

With this handler in place, the Node.js process will not exit when an uncaught exception occurs.
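
One possible reason the process exits, assuming the kafka-node producer behaves like a standard Node.js EventEmitter (the question's code calls producer.on('error', ...) on it): Node.js throws when an 'error' event is emitted on an emitter that has no 'error' listener. The producer created on problem line 4 never gets such a listener. A minimal sketch of that behavior, using plain EventEmitter objects as stand-ins rather than real kafka-node objects:

```javascript
const { EventEmitter } = require('events');

// Stand-ins for two producers; only the EventEmitter behavior matters here.
const withListener = new EventEmitter();
const withoutListener = new EventEmitter();

const seen = [];
withListener.on('error', function (err) {
    seen.push(err.message);
});

// With a listener attached, emitting 'error' just calls the listener.
withListener.emit('error', new Error('broker down'));

// With no listener attached, emit('error', ...) throws the error itself.
// Outside a try/catch this becomes an uncaught exception and ends the process.
let thrown = null;
try {
    withoutListener.emit('error', new Error('broker down'));
} catch (err) {
    thrown = err;
}

console.log('handled by listener:', seen);
console.log('thrown without listener:', thrown.message);
```

If this is what happens in the question's code, attaching an 'error' handler to every newly created producer would address the cause, while the uncaughtException handler only keeps the process alive.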

However, to connect to a different broker after an error, the broker address passed to kafka.Client (problem line 3 above) must be changed.
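
A rough sketch of that idea: keep a list of addresses and move to the next one each time the producer reports an error. Everything named here is an assumption for illustration: the second address 'kafka2:2181/' (the question only mentions an address on kafka1), the createProducer factory, and the connect helper. The factory returns a plain EventEmitter so the sketch runs without a broker; in real code it would build the kafka.Client and Producer as in the question.

```javascript
const { EventEmitter } = require('events');

// Hypothetical list of addresses to try in turn.
const addresses = ['kafka1:2181/', 'kafka2:2181/'];

// Hypothetical factory. Real code would create a kafka.Client for the
// given address and wrap it in a Producer; this stand-in is an emitter.
function createProducer(address) {
    const producer = new EventEmitter();
    producer.address = address;
    return producer;
}

let current = null;   // the producer currently in use
let attempt = 0;      // how many producers have been created so far

function connect() {
    const address = addresses[attempt % addresses.length];
    attempt += 1;
    const producer = createProducer(address);
    // Attach the handler to every new producer, not only the first one.
    producer.on('error', function (err) {
        console.log('producer error on ' + address + ': ' + err.message);
        connect();    // rotate to the next address
    });
    current = producer;
    return producer;
}

connect();                                          // starts on kafka1
current.emit('error', new Error('broker down'));    // simulated failure
const addressAfterFirstError = current.address;
current.emit('error', new Error('broker down'));    // simulated failure
const addressAfterSecondError = current.address;
```

The sketch reconnects immediately to stay short; a real client would probably wait between attempts and close the old client before replacing it.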