
I am using Node.js 10+, Apache Kafka 2.3, and the no-kafka npm package.

I have created a topic with a replication factor of 3 and 3 partitions. I have 3 Kafka brokers running on 3 different ports.

With no-kafka, I can see 3 consumers created, one per partition, and all of them live on the same machine. Below are the code and the output from a run.

CODE:

var Kafka = require('no-kafka');
var Promise = require('bluebird');
var consumer = new Kafka.GroupConsumer({
    connectionString: 'kafka://192.168.1.172:9092, kafka://192.168.1.172:9093, kafka://192.168.1.172:9094'
  });

var dataHandler = function (messageSet, topic, partition) {
    return Promise.each(messageSet, function (m){
        console.log("Topic: " + topic, ", Partition: " + partition, ", Offset: " + m.offset, 
            ", Message: " + m.message.value.toString('utf8'));
        return consumer.commitOffset({topic: topic, partition: partition, offset: m.offset, metadata: 'optional'});
    });
};

var strategies = [{
    subscriptions: ['test'],
    handler: dataHandler
}];

consumer.init(strategies);

When I start a console producer and send some messages, the producer console looks like this.

emgda@ubuntu:~/softwares/kafka_2.12-2.3.0$ ./bin/kafka-console-producer.sh --broker-list 192.168.1.172:9092 --topic test
>hey
>there
>how are you
>I am
>fine
>and
>how
>about
>you

Below is the consumer's output.

PS D:\checkout\javascript\sample projects\kafka> node .\consumer.js
2019-12-23T15:43:07.822Z INFO no-kafka-client Joined group no-kafka-group-v0.9 generationId 45 as no-kafka-client-c916ce86-3808-4a59-a257-bdb4a04e8ff7
2019-12-23T15:43:07.822Z INFO no-kafka-client Elected as group leader
2019-12-23T15:43:07.839Z DEBUG no-kafka-client Subscribed to test:0 offset 57 leader 192.168.1.172:9094
2019-12-23T15:43:07.840Z DEBUG no-kafka-client Subscribed to test:1 offset 56 leader 192.168.1.172:9094
2019-12-23T15:43:07.841Z DEBUG no-kafka-client Subscribed to test:2 offset 58 leader 192.168.1.172:9094
Topic: test , Partition: 2 , Offset: 58 , Message: hey
Topic: test , Partition: 1 , Offset: 56 , Message: there
Topic: test , Partition: 0 , Offset: 57 , Message: how are you
Topic: test , Partition: 1 , Offset: 57 , Message: fine
Topic: test , Partition: 2 , Offset: 59 , Message: I am
Topic: test , Partition: 0 , Offset: 58 , Message: and
Topic: test , Partition: 2 , Offset: 60 , Message: how
Topic: test , Partition: 0 , Offset: 59 , Message: you
Topic: test , Partition: 1 , Offset: 58 , Message: about

Everything works as it would with a single consumer:

1. 1 consumer is created automatically.

2. Messages are allotted to the topic partitions in round-robin fashion.

3. The consumers are evenly distributed across the 3 partitions for load balancing, but on the same machine.

When I inspect the consumer group status using the script provided with Kafka, I get the output below.

emgda@ubuntu:~/softwares/kafka_2.12-2.3.0$ ./bin/kafka-consumer-groups.sh --describe --group  no-kafka-group-v0.9 --bootstrap-server 192.168.1.172:9093

GROUP               TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                          HOST            CLIENT-ID
no-kafka-group-v0.9 test            0          60              60              0               no-kafka-client-c916ce86-3808-4a59-a257-bdb4a04e8ff7 /192.168.1.48   no-kafka-client
no-kafka-group-v0.9 test            1          59              59              0               no-kafka-client-c916ce86-3808-4a59-a257-bdb4a04e8ff7 /192.168.1.48   no-kafka-client
no-kafka-group-v0.9 test            2          61              61              0               no-kafka-client-c916ce86-3808-4a59-a257-bdb4a04e8ff7 /192.168.1.48   no-kafka-client

QUESTION:

  1. The only issue is that the consumers are all on the same machine. I want them distributed across different machines for load balancing and better use of the available hardware.
  2. Is there a way to achieve this?

Note: I am restricted to using Node.js.

How can you see that 3 consumers are automatically created? - Ergi Nushi
@ErgiNushi: See the console output when I run the program. - Ankur Soni
Actually, you are using Node.js as the consumer and you only have 1 consumer. You should start 3 Node.js services in order to have 3 consumers. - Ergi Nushi
When I run it multiple times, I get the error "Returned from group requests (offset commits/fetches, heartbeats, etc) when the memberId is not in the current generation." - Ankur Soni
That looks like a configuration problem. Check this out: stackoverflow.com/questions/39620345/… - Ergi Nushi

1 Answer


I solved this using the kafkajs npm package.

NOTE: See the question above for how to connect a producer via the console.

CODE:

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['192.168.1.172:9092', '192.168.1.172:9093', '192.168.1.172:9094']
})

const consumer = kafka.consumer({ groupId: 'test-group' })

const run = async () => {

  await consumer.connect()
  await consumer.subscribe({ topic: 'test', fromBeginning: true })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      })
    },
  })
}

run().catch(console.error)

Single Terminal:

PS D:\checkout\javascript\sample projects\kafka> node .\kafkajs-impl.js
{"level":"ERROR","timestamp":"2019-12-24T03:19:02.613Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"192.168.1.172:9092","clientId":"my-app"}
{"level":"ERROR","timestamp":"2019-12-24T03:19:02.625Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":270}
{"level":"INFO","timestamp":"2019-12-24T03:19:02.964Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"test-group"}
{"level":"INFO","timestamp":"2019-12-24T03:19:03.015Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":true,"memberAssignment":{"test":[0,1,2]},"groupProtocol":"RoundRobinAssigner","duration":48}
{"level":"ERROR","timestamp":"2019-12-24T03:19:03.620Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 192.168.1.172:9092","broker":"192.168.1.172:9092","clientId":"my-app","stack":"Error: connect ECONNREFUSED 192.168.1.172:9092\n    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1191:14)"}
{ partition: 0, offset: '107', value: 'fgh' }
{ partition: 2, offset: '109', value: '' }
{ partition: 1, offset: '108', value: 'asdsa' }

2 Simultaneous Terminals:

When I open a second terminal and run the same command in it, I get the console output below.

PS D:\checkout\javascript\sample projects\kafka> node .\kafkajs-impl.js
{"level":"ERROR","timestamp":"2019-12-24T03:22:21.229Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"192.168.1.172:9092","clientId":"my-app"}
{"level":"ERROR","timestamp":"2019-12-24T03:22:21.236Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":257}
{"level":"INFO","timestamp":"2019-12-24T03:22:21.530Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"test-group"}
{"level":"ERROR","timestamp":"2019-12-24T03:22:22.236Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 192.168.1.172:9092","broker":"192.168.1.172:9092","clientId":"my-app","stack":"Error: connect ECONNREFUSED 192.168.1.172:9092\n    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1191:14)"}
{"level":"INFO","timestamp":"2019-12-24T03:22:26.026Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":false,"memberAssignment":{"test":[0,2]},"groupProtocol":"RoundRobinAssigner","duration":4495}

Since a new consumer has joined from the second terminal, the first consumer logs the following on its console.

{"level":"INFO","timestamp":"2019-12-24T03:22:26.023Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":true,"memberAssignment":{"test":[1]},"groupProtocol":"RoundRobinAssigner","duration":39}

3 Simultaneous Terminals:

Keeping the previous two terminals open, I now open a 3rd terminal. Below is its console output.

PS D:\checkout\javascript\sample projects\kafka> node .\kafkajs-impl.js
{"level":"ERROR","timestamp":"2019-12-24T03:28:07.516Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"192.168.1.172:9092","clientId":"my-app"}
{"level":"ERROR","timestamp":"2019-12-24T03:28:07.528Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":273}
{"level":"INFO","timestamp":"2019-12-24T03:28:07.865Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"test-group"}
{"level":"ERROR","timestamp":"2019-12-24T03:28:08.523Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 192.168.1.172:9092","broker":"192.168.1.172:9092","clientId":"my-app","stack":"Error: connect ECONNREFUSED 192.168.1.172:9092\n    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1191:14)"}
{"level":"INFO","timestamp":"2019-12-24T03:28:11.803Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-499da929-d351-4e59-94c9-88a18e97999d","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":false,"memberAssignment":{"test":[0]},"groupProtocol":"RoundRobinAssigner","duration":3937}

The 2nd terminal logs the re-balancing as below:

{"level":"INFO","timestamp":"2019-12-24T03:22:26.026Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":false,"memberAssignment":{"test":[0,2]},"groupProtocol":"RoundRobinAssigner","duration":4495}
{"level":"ERROR","timestamp":"2019-12-24T03:28:11.720Z","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 1)","broker":"192.168.1.172:9093","clientId":"my-app","error":"The group is rebalancing, so a rejoin is needed","correlationId":144,"size":10}
{"level":"ERROR","timestamp":"2019-12-24T03:28:11.725Z","logger":"kafkajs","message":"[Runner] The group is rebalancing, re-joining","groupId":"test-group","memberId":"my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13","error":"The group is rebalancing, so a rejoin is needed","retryCount":0,"retryTime":270}
{"level":"INFO","timestamp":"2019-12-24T03:28:11.801Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":false,"memberAssignment":{"test":[1]},"groupProtocol":"RoundRobinAssigner","duration":70}

The 1st terminal logs the re-balancing as below:

{"level":"ERROR","timestamp":"2019-12-24T03:28:11.750Z","logger":"kafkajs","message":"[Runner] The group is rebalancing, re-joining","groupId":"test-group","memberId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","error":"The group is rebalancing, so a rejoin is needed","retryCount":0,"retryTime":337}
{"level":"INFO","timestamp":"2019-12-24T03:28:11.797Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":true,"memberAssignment":{"test":[2]},"groupProtocol":"RoundRobinAssigner","duration":40}

ALL CONSUMERS UP AND RUNNING:

After sending a burst of producer events, below is a snapshot of the consumers listening on their individual partitions. Each consumer now listens on exactly 1 partition of the topic, so the same script can be run on different machines for parallelism.

[screenshot: the three consumer terminals, each printing messages from a single partition]
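
The member-to-partition mapping in the logs above is consistent with a simple round-robin scheme: sort the member IDs, then hand out partitions in turn. The following is a simplified model of that idea, not KafkaJS source code; the function name `assignRoundRobin` is made up for this sketch, and the member IDs are shortened versions of the ones in the logs.

```javascript
// Simplified model of round-robin partition assignment.
// This is an illustration only, not the code KafkaJS actually runs.
function assignRoundRobin(memberIds, partitions) {
  // Sort a copy so every member computes the same order.
  const sorted = [...memberIds].sort();
  const assignment = {};
  for (const id of sorted) {
    assignment[id] = [];
  }
  // Partition i goes to member (i mod number of members).
  partitions.forEach((partition, i) => {
    assignment[sorted[i % sorted.length]].push(partition);
  });
  return assignment;
}

// Two members, three partitions (IDs shortened from the logs).
console.log(assignRoundRobin(['my-app-b1fec592', 'my-app-5d944889'], [0, 1, 2]));

// Three members, three partitions.
console.log(
  assignRoundRobin(['my-app-b1fec592', 'my-app-5d944889', 'my-app-499da929'], [0, 1, 2])
);
```

With two members this gives partitions 0 and 2 to `5d944889` and partition 1 to `b1fec592`; with three members each gets one partition. Both match the `memberAssignment` values in the logs.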

Below is the consumer-to-partition mapping, obtained by running the command ./bin/kafka-consumer-groups.sh --describe --group test-group --bootstrap-server 192.168.1.172:9093

emgda@ubuntu:~/softwares/kafka_2.12-2.3.0$ ./bin/kafka-consumer-groups.sh --describe --group test-group  --bootstrap-server 192.168.1.172:9093

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST            CLIENT-ID
test-group      test            2          126             126             0               my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa /192.168.1.48   my-app
test-group      test            1          124             124             0               my-app-945d6f38-bcda-4f02-b1a2-325957db5846 /192.168.1.48   my-app
test-group      test            0          124             124             0               my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13 /192.168.1.48   my-app
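
In the table above every row shows the same CLIENT-ID, because all three processes use `clientId: 'my-app'`. When the script is moved to separate machines, the only setting that must stay identical is the `groupId`; a per-host `clientId` is optional but makes this output easier to read. A minimal sketch, assuming hypothetical environment variables `KAFKA_BROKERS` and `KAFKA_GROUP_ID` and a made-up helper name `buildSettings`:

```javascript
const os = require('os');

// Build client settings from the environment.
// KAFKA_BROKERS and KAFKA_GROUP_ID are hypothetical names chosen for this sketch.
function buildSettings(env = process.env, host = os.hostname()) {
  const brokers = (env.KAFKA_BROKERS ||
    '192.168.1.172:9092,192.168.1.172:9093,192.168.1.172:9094')
    .split(',')
    .map((broker) => broker.trim())
    .filter(Boolean);

  return {
    // Distinct per machine, so the CLIENT-ID column tells the hosts apart.
    clientId: 'my-app-' + host,
    // Must be the same on every machine, or the processes will not share partitions.
    groupId: env.KAFKA_GROUP_ID || 'test-group',
    brokers,
  };
}

// Usage with the consumer code from this answer (requires the kafkajs package):
// const { Kafka } = require('kafkajs');
// const { clientId, groupId, brokers } = buildSettings();
// const kafka = new Kafka({ clientId, brokers });
// const consumer = kafka.consumer({ groupId });
```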

SHUTTING DOWN 1 CONSUMER AND RE-BALANCING EFFECT:

Now, if I shut down 1 consumer, for example the one in the 3rd terminal, below is a snapshot of the re-balancing that takes place between the 2 remaining consumers for the 3 partitions of the topic:

[screenshot: the two remaining consumer terminals after re-balancing]

Below is the consumer-to-partition mapping, obtained by running the command ./bin/kafka-consumer-groups.sh --describe --group test-group --bootstrap-server 192.168.1.172:9093

emgda@ubuntu:~/softwares/kafka_2.12-2.3.0$ ./bin/kafka-consumer-groups.sh --describe --group test-group  --bootstrap-server 192.168.1.172:9093

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST            CLIENT-ID
test-group      test            0          123             123             0               my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13 /192.168.1.48   my-app
test-group      test            2          125             125             0               my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13 /192.168.1.48   my-app
test-group      test            1          123             123             0               my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa /192.168.1.48   my-app
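
When a consumer is stopped on purpose, disconnecting it cleanly should let it leave the group right away, so the remaining members can re-balance without waiting for a timeout. A minimal sketch of such a shutdown helper, assuming only that `consumer` has an async `disconnect()` method (as the KafkaJS consumer does); the helper name `shutdown` and the signal wiring are illustrative, not part of the original code:

```javascript
// Disconnect a consumer and report whether it worked.
// `consumer` is any object with an async disconnect() method,
// for example the object returned by kafka.consumer(...) in KafkaJS.
async function shutdown(consumer, log = console.log) {
  try {
    await consumer.disconnect();
    log('consumer disconnected');
    return true;
  } catch (err) {
    log('disconnect failed: ' + err.message);
    return false;
  }
}

// Hypothetical wiring, commented out so the sketch has no side effects:
// ['SIGINT', 'SIGTERM'].forEach((signal) => {
//   process.once(signal, async () => {
//     const ok = await shutdown(consumer);
//     process.exit(ok ? 0 : 1);
//   });
// });
```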