4
votes

I'm using Node with Kafka for the first time, via kafka-node. Consuming a message requires calling an external API, which might take as long as a second to respond. I want to survive sudden consumer failures: if a consumer fails, the consumer that replaces it should receive the same message whose work was not completed.

I'm using Kafka 0.10 and trying to use ConsumerGroup.

I thought of setting autoCommit: false in options, and committing the message only once its work has been completed (as I did previously in some Java code).

However, I'm not sure how to correctly commit a message only once its work is done. How should I commit it?

Another worry is that, because of the callbacks, the next message seems to be read before the previous one has finished. I'm afraid that if message x+2 finishes before message x+1, the offset will be set at x+2, so in case of failure x+1 will never be re-executed.

Here is basically what I did so far:

var options = {
    host: connectionString,
    groupId: consumerGroupName,
    id: clientId,
    autoCommit: false
};

var kafka = require("kafka-node");
var ConsumerGroup = kafka.ConsumerGroup;

var consumerGroup = new ConsumerGroup(options, topic);

consumerGroup.on('connect', function() {
    console.log("Consuming Kafka %s, topic=%s", JSON.stringify(options), topic);
});

consumerGroup.on('message', function(message) {
    console.log('%s read msg Topic="%s" Partition=%s Offset=%d', this.client.clientId, message.topic, message.partition, message.offset);
    console.log(message.value);
    doSomeStuff(function() {
        // HOW TO COMMIT????
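        consumerGroup.commit(function(err, data) {
            // Report a failed commit instead of logging success unconditionally
            if (err) {
                console.log("Commit failed: " + err);
                return;
            }
            console.log("------ Message done and committed ------");
        });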
    });
});

consumerGroup.on('error', function(err) {
    console.log("Error in consumer: " + err);
    close();
});

process.once('SIGINT', function () {
    close();
});

var close = function() {
    // SHOULD SEND 'TRUE' TO CLOSE ???
    consumerGroup.close(true, function(error) {
        if (error) {
            console.log("Consuming closed with error", error);
        } else {
            console.log("Consuming closed");
        }
    });
};

1 Answer

0
votes

One thing you can do here is to have a retry mechanism for every message you process.

You can consult my answer on this thread: https://stackoverflow.com/a/44328233/2439404

I consume messages from Kafka using kafka-consumer, batch them together using async/cargo and put them in async/queue (an in-memory queue). The queue takes a worker function as an argument, to which I pass an async/retryable.

For your problem, you can just wrap the processing of each message in retryable: https://caolan.github.io/async/docs.html#retryable
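
To show the idea without pulling in any library, here is a minimal sketch of a retry wrapper for a callback-style worker. It is a hand-rolled stand-in, not the async library's own retryable. The names retryable (as defined here), callExternalApi and processWithRetry are assumptions made up for illustration, and the flaky worker only simulates an external API.

```javascript
// Hand-rolled stand-in for a retry wrapper (hypothetical helper, not the async library).
// Wraps a callback-style task so it is re-run until it succeeds or `times` attempts are used.
function retryable(times, task) {
    return function (message, done) {
        var attempt = 0;
        function run() {
            attempt += 1;
            task(message, function (err, result) {
                if (err && attempt < times) {
                    return run(); // failed, attempts remain: try again
                }
                done(err || null, result); // success, or the final failure
            });
        }
        run();
    };
}

// Hypothetical worker: fails twice, then succeeds, to mimic a flaky external API.
var calls = 0;
function callExternalApi(message, cb) {
    calls += 1;
    if (calls < 3) {
        return cb(new Error('temporary failure'));
    }
    cb(null, 'processed ' + message.value);
}

var processWithRetry = retryable(5, callExternalApi);
var outcome = {};
processWithRetry({ value: 'hello' }, function (err, result) {
    // This is the point where the offset would be committed, and only if err is null.
    outcome.err = err;
    outcome.result = result;
});
```

In the consumer from the question, the 'message' handler would call processWithRetry(message, ...) in place of doSomeStuff, and commit only in the success branch of the final callback.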

This may solve your problem.
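
Retries alone do not cover the ordering worry from the question (x+2 finishing before x+1). One possible approach, sketched below under assumptions, is to track in-flight offsets per partition and treat an offset as safe to commit only when every earlier offset has finished. OffsetTracker and its methods are hypothetical names, not part of kafka-node, and how the safe offset is handed to the client is deliberately left out.

```javascript
// Hypothetical helper: tracks in-flight offsets for ONE partition and reports
// the highest offset that is safe to commit (every offset up to it has finished).
function OffsetTracker() {
    this.pending = [];      // offsets received but not yet released, in arrival order
    this.finished = {};     // offsets whose work completed but that are still blocked
    this.safeOffset = null; // last offset with no unfinished predecessor
}

// Call when a message arrives, before starting its work.
OffsetTracker.prototype.received = function (offset) {
    this.pending.push(offset);
};

// Call when a message's work completes; returns the current safe offset.
OffsetTracker.prototype.done = function (offset) {
    this.finished[offset] = true;
    // Advance only across a contiguous run of finished offsets.
    while (this.pending.length > 0 && this.finished[this.pending[0]]) {
        this.safeOffset = this.pending.shift();
        delete this.finished[this.safeOffset];
    }
    return this.safeOffset;
};

var tracker = new OffsetTracker();
[10, 11, 12].forEach(function (o) { tracker.received(o); });

var afterTwelve = tracker.done(12); // 12 finished first: nothing is safe yet
var afterTen = tracker.done(10);    // 10 is safe, 11 is still in flight
var afterEleven = tracker.done(11); // 11 done: 11 and 12 both become safe
```

Kafka's convention is that a committed offset names the next message to read, so whether to commit safeOffset or safeOffset + 1 depends on how the client expects it; that detail should be checked against kafka-node before relying on this.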