I am using Kafka version 2.11-0.11.0.3 to publish 10,000 messages (10 MB in total). Two consumers with the same group id consume the messages in parallel. While consuming, the same message was consumed by both consumers.

Kafka logged the following errors/warnings:

WARN: This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

INFO: Attempt to heartbeat failed since group is rebalancing

INFO: Sending LeaveGroup request to coordinator

WARN: Synchronous auto-commit of offsets {ingest-data-1=OffsetAndMetadata{offset=5506, leaderEpoch=null, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

The following configuration was provided to Kafka:

server.properties

max.poll.interval.ms=30000
group.initial.rebalance.delay.ms=0
group.max.session.timeout.ms=120000
group.min.session.timeout.ms=6000

consumer.properties

session.timeout.ms=30000 
request.timeout.ms=40000

What should be changed to prevent the same message from being consumed more than once?

1 Answer

Are your consumers in the same group? If so, you will see duplicate consumption whenever a consumer leaves, dies, or times out before committing the offsets of messages it has already processed.

If every message is consumed by both consumers, you have probably not set the same group id for them.

More info:

So you have set the same group id for all consumers, good. You are in the situation where the broker thinks a consumer has died and therefore rebalances its partitions to another consumer. That consumer starts consuming from the last committed offset.

Let's say consumer C_A reads offsets up to 100 from partition P_1, processes them, and commits '100'. It then reads offsets up to 200 and processes them, but cannot commit because the broker considers C_A dead.

The broker reassigns partition P_1 to consumer C_B, which starts from the group's last commit (100), reads up to 200, processes those messages, and commits 200. The messages between 100 and 200 have therefore been processed twice, once by each consumer.

So your question is how to keep the consumer from being considered dead (I assume it is not actually dead).
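The C_A / C_B scenario above can be replayed in a few lines of plain Java. This is only a toy model, not Kafka client code: the class and method names are made up for illustration, and it assumes offsets start at 0 and that a commit of "100" means "resume at offset 100".

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the scenario above; no Kafka classes are involved.
public class RebalanceReplaySketch {

    // Offsets in [from, to) that a consumer reads and processes in one batch.
    static List<Long> processBatch(long from, long to) {
        List<Long> processed = new ArrayList<>();
        for (long offset = from; offset < to; offset++) {
            processed.add(offset);
        }
        return processed;
    }

    // Replays the scenario and returns the offsets processed by both C_A and C_B.
    static List<Long> duplicatedOffsets() {
        long committed = 0;                       // group's committed position on P_1

        List<Long> byA = new ArrayList<>(processBatch(committed, 100));
        committed = 100;                          // C_A commits '100' successfully

        byA.addAll(processBatch(committed, 200)); // C_A processes 100..199
        // C_A's commit of '200' is rejected (group already rebalanced),
        // so 'committed' stays at 100.

        List<Long> byB = processBatch(committed, 200); // C_B resumes from 100
        committed = 200;                          // C_B commits '200'

        List<Long> duplicates = new ArrayList<>(byA);
        duplicates.retainAll(byB);                // offsets seen by both consumers
        return duplicates;
    }

    public static void main(String[] args) {
        List<Long> dup = duplicatedOffsets();
        System.out.println("duplicates: " + dup.size()
                + " (offsets " + dup.get(0) + ".." + dup.get(dup.size() - 1) + ")");
        // prints "duplicates: 100 (offsets 100..199)"
    }
}
```

The point of the sketch is that duplicates come from the gap between "processed" and "committed": whatever sits in that gap when the partition is reassigned gets processed again.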

The answer is already in the yellow WARN message in your question: you can tell your consumer to fetch fewer messages per poll (max.poll.records), which reduces the processing time between two polls, and/or you can increase max.poll.interval.ms, which gives the consumer more time between polls before it is considered dead.
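As a rough sketch of that tuning, the snippet below builds consumer properties with both settings and works out how much processing time each record gets. The values are illustrative assumptions, not recommendations, and the class and helper names are made up. Note that max.poll.interval.ms is a consumer setting, so it is placed with the consumer properties here rather than in server.properties.

```java
import java.util.Properties;

// Illustrative values only; tune them to your own processing time per message.
public class ConsumerTuningSketch {

    static Properties tunedConsumerProps() {
        Properties props = new Properties();
        // Fewer records per poll() means less work between two polls.
        props.setProperty("max.poll.records", "100");
        // More time allowed between two polls before the consumer leaves the group.
        props.setProperty("max.poll.interval.ms", "300000");
        return props;
    }

    // Average time available per record before the poll interval expires.
    static long perRecordBudgetMs(Properties props) {
        long intervalMs = Long.parseLong(props.getProperty("max.poll.interval.ms"));
        long maxRecords = Long.parseLong(props.getProperty("max.poll.records"));
        return intervalMs / maxRecords;
    }

    public static void main(String[] args) {
        Properties props = tunedConsumerProps();
        System.out.println("budget per record: " + perRecordBudgetMs(props) + " ms");
        // prints "budget per record: 3000 ms"
    }
}
```

For comparison, a 30000 ms interval with 500 records per poll leaves only 60 ms per record on average, which is easy to exceed if processing involves any I/O.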