
I have a question about managing multiple CGs. I created three consumer groups; every CG has its own Kafka service, group ID and topic.

I'm receiving messages as expected, but I'm wondering if it's possible to create the following scenario:

create three consumer groups but receive messages from only one of them and keep the others paused/on hold for now. If that group's Kafka service goes down, consume messages from the next consumer group, and the same again for the third.

Here's an example of my code:

const kafka = require('kafka-node');

function createConsumerGroup(topics){

    const ConsumerGroup = kafka.ConsumerGroup;

    //CREATE CONSUMER GROUPS FOR EVERY SERVICE (config is loaded elsewhere)
    for(let i = 0; i < config.kafka_service.length; i++){  //3

        const service = config.kafka_service[i];
        const options = {
            groupId: service['groupId'],
            host: service['zookeeperHost'],
            kafkaHost: service['kafkaHost'],
            sessionTimeout: 15000,
            protocol: ['roundrobin'],
            fromOffset: 'latest'
        };

        //create one consumer group per service; consumerGroupName is only used for logging
        const consumerGroup = new ConsumerGroup(options, topics);

        consumerGroup.on('connect', () => {
            console.log(`${service['consumerGroupName']} is connected!`);
        });

        if(i > 0){
            //pause every consumer except the FIRST
            consumerGroup.pause();
        }

        consumerGroup.on('message', (message) => {
            console.log(message);
        });

        consumerGroup.on('error', (error) => {
            console.log('consumer group error: ', error);

            //HERE I NEED TO CALL SECOND CONSUMER TO STEP UP
            //MAYBE consumerGroup.resume(); ???
        });
    }
}

I hope this makes sense. Thanks :)

It looks like you're trying to use separate consumer groups as a fail-safe, but I believe a single consumer group should do what you want. The group should have multiple consumers, and if one consumer fails, another consumer in the group will pick up where it left off. Unless I misunderstood your issue. - Brian Fitzpatrick
You did. Both you and @Moonwalkr are talking about a single CG to handle that scenario, but I haven't seen any example or tutorial for setting up different data centers in a single CG. Any leads, please? - draftish
To add separate consumers to the same CG (across data centers), you use the groupId config option. You have it set to 'config.kafka_service[i]['groupId']'. Wherever you create a consumer, just make sure it has the same groupId. I'm not familiar with Node's Kafka package, but I would say you should stick to creating regular Consumers and assign each one the same groupId. That way separate consumers can be part of the same group no matter which server they're hosted on. - Brian Fitzpatrick
I just read up on the kafka-node API, and ConsumerGroup is somewhat unfortunately named. It is a Kafka consumer, and the groupId option is what controls the 'actual' Kafka consumer group. So your code should work as is, as long as the groupId option for each 'new ConsumerGroup' is the same. That way all the ConsumerGroups will be members of the actual Kafka consumer group, reading from the same topic, and messages won't go to crashed consumers (ConsumerGroup instances in Node parlance). - Brian Fitzpatrick

2 Answers


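So it looks like the confusion comes from the name of the Node package's 'ConsumerGroup'. In Kafka terms, the consumer group is determined solely by the groupId each consumer uses. Within one group, each partition of a subscribed topic is assigned to exactly one consumer, so each message is read by only one member of the group instead of being duplicated across members (apart from possible redelivery of not-yet-committed messages after a rebalance). If a consumer goes down, Kafka detects this (the consumer stops sending heartbeats within its session timeout) and reassigns its partitions to the remaining consumers.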
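To make the partition handover concrete, here is a toy model in plain JavaScript. It is not kafka-node's API and not Kafka's real assignor; it only mimics round-robin assignment of one topic's partitions (six, an assumed number) to the live members of a single group, before and after one member (the hypothetical `consumer-b`) disappears.

```javascript
'use strict';

// Toy model only: round-robin assignment of partitions to the live members
// of ONE consumer group. Not kafka-node code, not Kafka's actual assignor.
function assignRoundRobin(partitions, members) {
  if (members.length === 0) return {}; // nobody left to consume
  // Sort member ids so the result is deterministic.
  const sorted = [...members].sort();
  const assignment = {};
  for (const m of sorted) assignment[m] = [];
  partitions.forEach((p, idx) => {
    assignment[sorted[idx % sorted.length]].push(p);
  });
  return assignment;
}

const partitions = [0, 1, 2, 3, 4, 5];
let members = ['consumer-a', 'consumer-b', 'consumer-c'];

const before = assignRoundRobin(partitions, members);
console.log('before:', before);

// consumer-b stops heartbeating and is dropped after its session timeout;
// the group rebalances and every partition is handed out again.
members = members.filter((m) => m !== 'consumer-b');
const after = assignRoundRobin(partitions, members);
console.log('after:', after);
```

In this rebalance even a surviving member can end up with a different partition (partition 2 moves from consumer-c to consumer-a), which is why the new owner resumes from the group's committed offset for that partition rather than from any in-memory state of the old owner.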

The Node 'ConsumerGroup' is really just another Kafka consumer (the newer consumer whose group membership is managed by the Kafka brokers rather than by ZooKeeper, available since Kafka 0.9).

So the way to leverage a kafka consumer group with the Node ConsumerGroup would be as follows:

const kafka = require('kafka-node');

function createConsumerGroup(topics){

    const ConsumerGroup = kafka.ConsumerGroup;

    //CREATE ONE CONSUMER PER SERVICE, ALL IN THE SAME KAFKA CONSUMER GROUP
    for(let i = 0; i < config.kafka_service.length; i++){  //3

        const service = config.kafka_service[i];
        const options = {
            groupId: 'SOME_GROUP_NAME', //identical for every consumer
            host: service['zookeeperHost'],
            kafkaHost: service['kafkaHost'], //must point at brokers of the same Kafka cluster
            sessionTimeout: 15000,
            protocol: ['roundrobin'],
            fromOffset: 'latest'
        };

        const consumerGroup = new ConsumerGroup(options, topics);

        consumerGroup.on('connect', () => {
            console.log(`${service['consumerGroupName']} is connected!`);
        });

        consumerGroup.on('message', (message) => {
            console.log(message);
        });

        consumerGroup.on('error', (error) => {
            console.log('consumer group error: ', error);

            //Error handling logic here, restart the consumer that failed perhaps?
            //Depends on how you want to manage failed consumers.
        });
    }
}

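Each instance of Node's ConsumerGroup will be a member of the Kafka consumer group 'SOME_GROUP_NAME', and any other consumer created with the same groupId will also be a member of that group, regardless of which server or process it runs on, as long as it connects to the same Kafka cluster (a consumer group is scoped to a single cluster).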


Consumer groups solve two central scenarios:

1. Scaling: you can increase the number of consumers in a group to handle a growing rate of messages produced to the topic(s) the group consumes (scaling out).

2. Failover: because a group of consumers reads the same topic(s), the group automatically handles the situation where one or more consumers go down.
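The scaling point has a limit worth seeing with numbers: a partition is owned by at most one member of a group, so members beyond the partition count get nothing to read. The toy function below is plain JavaScript, not part of kafka-node; `spreadPartitions` is a hypothetical name and the 4-partition topic is an assumption for illustration.

```javascript
'use strict';

// Toy model (not kafka-node's API): how many partitions each member of one
// consumer group owns when partitions are spread as evenly as possible.
function spreadPartitions(partitionCount, consumerCount) {
  if (consumerCount <= 0) {
    throw new RangeError('a group needs at least one consumer');
  }
  const counts = new Array(consumerCount).fill(0);
  for (let p = 0; p < partitionCount; p++) {
    counts[p % consumerCount] += 1;
  }
  return counts;
}

// Assumed topic with 4 partitions, consumed by groups of different sizes.
for (const consumers of [1, 2, 4, 6]) {
  const counts = spreadPartitions(4, consumers);
  const idle = counts.filter((c) => c === 0).length;
  console.log(`${consumers} consumer(s): ${JSON.stringify(counts)}, idle: ${idle}`);
}
```

The idle members are still part of the group, so if an active member leaves, one of them receives its partitions at the next rebalance. That is close to the stand-by behavior the question asks for, without pausing and resuming consumers by hand.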

So, instead of keeping "stand-by" consumer groups and tracking yourself which one is active, rely on Kafka's built-in failover. Consumers can run in several different containers (even in different data centers, as long as they all connect to the same Kafka cluster), and Kafka will automatically spread the topic's partitions across the consumers that are currently alive, no matter where they run or how many of them there are at any given time.
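A minimal sketch of that deployment style, assuming every container runs the same Node process and reads its settings from environment variables. `KAFKA_HOST`, `KAFKA_GROUP_ID`, `buildConsumerOptions` and the topic name `my-topic` are hypothetical; the option keys are the ones used in the accepted answer's code. Only the connection details vary per container, while the groupId stays the same, so every instance joins one consumer group.

```javascript
'use strict';

// Hypothetical helper: builds kafka-node ConsumerGroup options from the
// environment. The groupId is shared by every container; KAFKA_HOST must
// list brokers of one and the same Kafka cluster.
function buildConsumerOptions(env) {
  const kafkaHost = env.KAFKA_HOST;
  if (!kafkaHost) {
    throw new Error('KAFKA_HOST must be set, e.g. "broker1:9092,broker2:9092"');
  }
  return {
    kafkaHost,
    groupId: env.KAFKA_GROUP_ID || 'SOME_GROUP_NAME', // same in every instance
    sessionTimeout: 15000,
    protocol: ['roundrobin'],
    fromOffset: 'latest',
  };
}

// Usage inside each container (requires the kafka-node package):
//   const { ConsumerGroup } = require('kafka-node');
//   const consumer = new ConsumerGroup(buildConsumerOptions(process.env), ['my-topic']);
//   consumer.on('message', (message) => console.log(message));
//   consumer.on('error', (err) => console.error('consumer error:', err));

console.log(buildConsumerOptions({ KAFKA_HOST: 'localhost:9092' }));
```

Defaulting the groupId rather than varying it per deployment is what keeps the instances interchangeable: starting another container adds a member to the group, and stopping one triggers a rebalance.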