
I am using kafka-node in my Node.js application to both produce and consume messages. I start a consumer that waits on my topics. I then start the producer and send messages to Kafka. My consumer inserts each of these messages into a Postgres database.

For a single consumer, this is working fine.

When I stop the consumer but keep producing, and then restart the consumer about 30 seconds later, it picks up a dozen or so messages that the original consumer had already inserted into the DB.

I am assuming that when I killed the consumer, there were offsets that had not yet been committed and that’s why the second consumer is picking them up?

What is the best way to handle this situation?

var kafka = require('kafka-node');
var utilities = require('./utilities');
var topics = ['test-ingest', 'test-ingest2'];
var groupName = 'test';

var options = {
    groupId: groupName,
    autoCommit: false, // commit offsets manually after each successful DB insert
    sessionTimeout: 15000,
    fetchMaxBytes: 1024 * 1024,
    protocol: ['roundrobin'],
    fromOffset: 'latest',
    outOfRangeOffset: 'earliest'
};

var consumerGroup = new kafka.ConsumerGroup(options, topics);

// Handle each incoming message: store it in Postgres, then commit the offset
consumerGroup.on('message', function (message) {

    // Submit our message into postgres - return a promise
    utilities.storeRecord(message).then((dbResult) => {

        // Commit the offset
        consumerGroup.commit((error, data) => {
            if (error) {
                console.error(error);
            } else {
                console.log('Commit success: ', data);
            }
        });

    }).catch((error) => {
        // If the insert fails, skip the commit so the message can be redelivered
        console.error(error);
    });

});

1 Answer


I cannot tell why fromOffset: 'latest' does not work for you. An easy workaround is to use offset.fetchLatestOffsets to fetch the latest offsets and then consume from that point onwards (see the sketch below).
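
Here is a minimal, untested sketch of that workaround. It assumes a broker reachable at localhost:9092, reuses the topics and group id from the question, and relies on ConsumerGroup inheriting setOffset from the older consumer API in kafka-node; adjust these to your setup.

var kafka = require('kafka-node');

var topics = ['test-ingest', 'test-ingest2'];

var client = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' });
var offsetClient = new kafka.Offset(client);

// Fetch the latest offset for every partition of every topic.
// The result is a map of topic -> partition -> latest offset.
offsetClient.fetchLatestOffsets(topics, function (error, offsets) {
    if (error) {
        return console.error(error);
    }

    var consumerGroup = new kafka.ConsumerGroup({
        kafkaHost: 'localhost:9092',
        groupId: 'test',
        autoCommit: false,
        fromOffset: 'latest'
    }, topics);

    // Once the group has joined, seek every partition to the offset fetched
    // above so consumption starts from that point onwards.
    consumerGroup.on('connect', function () {
        topics.forEach(function (topic) {
            Object.keys(offsets[topic]).forEach(function (partition) {
                consumerGroup.setOffset(topic, Number(partition), offsets[topic][partition]);
            });
        });
    });

    consumerGroup.on('message', function (message) {
        console.log(message);
    });

    consumerGroup.on('error', function (error) {
        console.error(error);
    });
});

Note that anything produced while the consumer was down will be skipped with this approach, since consumption starts at the latest offsets.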