
Kafka - Multiple Consumers From Same Group Assigned Same Partition

I have just started learning Kafka and Node.js. I have written a consumer as follows:

// consumer.js
const kafka = require('kafka-node');
var client = new kafka.Client('localhost:2181');
var topics = [{
    topic: 'topic-4'
}];

var options = {
    groupId: 'kafka-node-group-2',
    autoCommit: true,
    fetchMaxWaitMs: 1000,
    fetchMaxBytes: 1024 * 1024,
    encoding: 'buffer'
};
var consumer = new kafka.HighLevelConsumer(client, topics, options);

// consumer.payloads has only one entry
console.log('Topic', consumer.payloads[0].topic);
console.log('Group', consumer.options.groupId);
console.log('Assigned Partition:', consumer.payloads[0].partition);

Output

Topic topic-4
Group kafka-node-group-2
Assigned Partition: 0

topic-4 has four partitions.

./desc_topic.sh topic-4
Topic:topic-4   PartitionCount:4    ReplicationFactor:1 Configs:
    Topic: topic-4  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: topic-4  Partition: 1    Leader: 2   Replicas: 2 Isr: 2
    Topic: topic-4  Partition: 2    Leader: 0   Replicas: 0 Isr: 0
    Topic: topic-4  Partition: 3    Leader: 2   Replicas: 2 Isr: 2
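For a topic with four partitions and two consumers in the same group, either built-in assignment strategy should split the partitions evenly. The expected outcome of the round-robin rule can be sketched in plain JavaScript (this only models the result for sanity-checking; it is not kafka-node's actual implementation):

```javascript
// Rough model of round-robin partition assignment: partitions are dealt
// out to group members in turn. Names like 'consumer-a' are illustrative.
function roundRobinAssign(partitions, consumers) {
    const assignment = {};
    consumers.forEach(function (c) { assignment[c] = []; });
    partitions.forEach(function (p, i) {
        assignment[consumers[i % consumers.length]].push(p);
    });
    return assignment;
}

// Two consumers on topic-4's four partitions: each should own two.
const result = roundRobinAssign([0, 1, 2, 3], ['consumer-a', 'consumer-b']);
console.log(result); // { 'consumer-a': [ 0, 2 ], 'consumer-b': [ 1, 3 ] }
```

Both consumers reporting partition 0, as in the output above, is what makes the observed behavior look broken.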

Edit

I have used ConsumerGroup as follows.

var options = {
    host: 'localhost:2181',  // ZooKeeper host; omit if connecting directly to a broker via kafkaHost
    groupId: 'Group-1',
    sessionTimeout: 15000,
    // An array of partition assignment protocols ordered by preference:
    // 'roundrobin' or 'range' for the built-ins, or a custom assignment protocol.
    protocol: ['roundrobin']
};
var consumer = new kafka.ConsumerGroup(options, ['topic-4']);

The producer is sending 100 messages, which are received as shown below. That is how I know the assigned partition (from the received messages, not from the consumer object).

{
    topic: 'topic-4',
    value: '{"subject":"Message Id 30 "}',
    offset: 172,
    partition: 0,
    highWaterOffset: 173,
    key: null
}

When I run two such consumer instances (same topic and group), only one of them receives everything from partition-0. Isn't that a problem too?
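To quantify the skew, the received messages can be tallied by partition with a plain helper (no kafka-node dependency; the sample objects below mirror the message shape shown above):

```javascript
// Count how many messages each partition delivered. Feeding this from
// a kafka-node 'message' event handler would confirm that only
// partition 0 is ever seen.
function tallyPartitions(messages) {
    const counts = {};
    for (const m of messages) {
        counts[m.partition] = (counts[m.partition] || 0) + 1;
    }
    return counts;
}

// Illustrative sample matching the received-message shape:
const sample = [
    { topic: 'topic-4', partition: 0, offset: 172 },
    { topic: 'topic-4', partition: 0, offset: 173 }
];
console.log(tallyPartitions(sample)); // { '0': 2 }
```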

This is the producer code.

const kafka = require('kafka-node');
const Client = kafka.Client;
var client = new Client('localhost:2181', 'my-client-id', {
  sessionTimeout: 300,
  spinDelay: 100,
  retries: 2
});

// For this demo we just log client errors to the console.
client.on('error', function(error) {
  console.error(error);
});

var producer = new kafka.HighLevelProducer(client);

producer.on('ready', function() {
    for (var i = 0; i <= 30; i++) {
        let id = 'Message Id ' + i + ' ';
        let msg = {
            'subject': id
        };
        var messageBuffer = Buffer.from(JSON.stringify(msg));

        // Create a new payload
        var payload = [{
            // topic: 'topic-', + (i%2+2),
            topic: 'topic-4',
            messages: messageBuffer,
            timestamp: Date.now(),
            attributes: 1 /* Use GZip compression for the payload */
        }];

        // Send payload to Kafka and log result/error
        producer.send(payload, function(error, result) {
            if (error) {
                console.error('Error', error);
            } else {
                console.info('Sent payload to Kafka: ', payload);
                console.log('result: ', result);
            }
        });
    }
});

// For this demo we just log producer errors to the console.
producer.on('error', function(error) {
    console.error(error);
});
Comments:

The problem looks fairly straightforward to me: remove the manual assignment of partition: 0. – OneCricketeer
My bad! I did that for debugging purposes, but the problem persists even without it. I have edited my question. – Shashwat
Have you tried using the ConsumerGroup class instead of HighLevelConsumer? – OneCricketeer
Yes, I tried that too. Same result. – Shashwat
Can you show your commands for creating the topic and inserting the data, so someone could reproduce this issue? – OneCricketeer

1 Answer


This is a known issue; I have encountered it as well. If you are running a Kafka version newer than the one the fix was posted for, it may be worth re-examining and possibly reopening the issue:

https://issues.apache.org/jira/browse/KAFKA-6681