
I have a Kafka topic named internal. I created the topic with the following command:

/opt/kafka/bin/kafka-topics.sh 
     --create --zookeeper zookeeper:2181 
     --replication-factor 3 --partitions 6 
     --topic internal

I need to consume all the messages on three different node servers, so I am using the kafka-node module as a consumer group, with a different group name on each server: group1, group2, and group3.

Everything works fine; I can consume all the messages in all consumers.

But when any broker is down, the consumer does not receive any messages, and when I list the consumer groups, the affected group ID is missing.

(e.g.) if node server 1 is down, there is no group named group1 on the broker.

Even if I restart the node server, no group is created on the broker and the respective node server does not consume any messages. But once the broker is back up and the node server is restarted, the group is created on the broker and the node server receives messages again.

consumer.js

const options = {
  kafkaHost: process.env.KAFKA_HOST, 
  groupId: group_id, //group1 (or) group2 (or) group3
  autoCommit: true,
  sessionTimeout: 15000,
  protocol: ['roundrobin'],
  fromOffset: 'latest',
  outOfRangeOffset: 'earliest',
  migrateHLC: false,
  migrateRolling: true,
  fetchMaxBytes: 1024 * 1024 * 10,
  commitOffsetsOnFirstJoin: true,
  onRebalance: (isAlreadyMember, callback) => {
      log.info({"ALREADY_MEMBER_isAlreadyMember": isAlreadyMember});
      callback();
  }
};

const consumerGroup = new ConsumerGroup(options, process.env.KAFKA_TOPIC);

// On receiving message
consumerGroup.on("message", handMessage); // handMessage handles each received message

// On error receiving message
consumerGroup.on('error', function(err) {
    log.debug({"type": "KAFKA_CONSUMER_ERROR", "msg": err});
});
// On offset out of range
consumerGroup.on('offsetOutOfRange', function(err) {
    log.debug({"type": "KAFKA_CONSUMER_RANGE_ERROR", "msg": err});
});

UPDATE - 1

Even after updating offsets.topic.replication.factor to 2 or 3, I have the same issue. When any broker is down, the respective node server does not consume messages. Also, when I list the groups on the broker, it shows only group2 and group3; group1 is missing when broker1 is down. Even if I restart the node consumer, group1 is not created.
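For reference, this is roughly how I list the groups on a broker (the bootstrap address below is an example; adjust it to your cluster):

```shell
# List all consumer groups known to the cluster
/opt/kafka/bin/kafka-consumer-groups.sh \
    --bootstrap-server kafka1:9092 \
    --list
```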

server.properties

broker.id=1
listeners=INSIDE://:9092,OUTSIDE://:9094
advertised.listeners=INSIDE://:9092,OUTSIDE://:9094
listener.security.protocol.map=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/kafka/kafka-logs-d3f14c9ddf0a
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
zookeeper.connection.timeout.ms=16000
group.initial.rebalance.delay.ms=0
inter.broker.listener.name=INSIDE
advertised.port=9094
port=9092
auto.create.topics.enable=false

UPDATE - 2

When the broker is down, the group coordinator is deleted and a new coordinator is not automatically elected.

Can you guys tell me what I did wrong? Or is there anything else I need to update?

Please show your Kafka server properties file... What is the replication factor of the consumer offsets topic? – OneCricketeer
Updated server.properties file. Please check. – Praveen Srinivasan
Show the description of the __consumer_offsets topic as well. – OneCricketeer
The __consumer_offsets topic has a replication factor of 1 only. – Praveen Srinivasan
Exactly – as I mentioned in my answer, you need to manually increase it for your cluster to be highly available. – OneCricketeer

2 Answers


Assuming this is at least Kafka 1.x, some changes are required regarding HA for the internal Kafka topics. Consider the following snippet from server.properties: the default replication factors are set to 1. In your case, with 3 brokers, setting these to at least 2 would be a good place to start.

# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability, such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1

Addition

As per my understanding, each consumer group has its own group coordinator. So, if multiple groups consume from a topic, there can be multiple coordinators (different brokers) for that topic. A broker can act as the group coordinator for multiple consumer groups, but each consumer group has exactly one broker acting as its coordinator. For a particular consumer group, we can check which broker is the coordinator with this command:

./kafka-consumer-groups.sh --bootstrap-server <broker-host>:9092 --describe --group <consumer-group> --state 

If a coordinator fails, another broker is chosen as the coordinator. The failover strategy is explained in detail in section 10 of the Kafka documentation.
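To see whether coordinator failover is even possible in your cluster, check how the internal offsets topic itself is replicated (the ZooKeeper address below matches the one in your create command; adjust it to your setup):

```shell
# Describe the internal offsets topic: shows ReplicationFactor and,
# per partition, the Leader, Replicas, and Isr broker IDs
/opt/kafka/bin/kafka-topics.sh --describe \
    --zookeeper zookeeper:2181 \
    --topic __consumer_offsets
```

If this reports a replication factor of 1, then any group whose offsets partition lives on a failed broker loses its coordinator with no replica to fail over to, which matches the behaviour you describe.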


Even if I updated offsets.topic.replication.factor to 2 or 3, I have the same issue. When any broker is down, the respective node server does not consume messages.

After the offsets topic has been created, changing this property has no effect.

If the topic was created with a replication factor of 1, you now need to manually increase it.