35
votes

I'm working with Apache Kafka and its Java client and I see that messages are load balanced across different Kafka Consumers belonging to the same group (i.e. sharing the same group id).

In my application I need all consumers to read all messages.

So I have several questions:

  • If I don't set any group id in the Consumer Properties, what group id will the Kafka Consumer be given?

  • Is there a single default value?

  • Does the client create a random value each time?

  • Do I need to create a different id for each consumer to be sure that each one receives all messages?

EDIT: Thank you for your answers.

You are correct: if one doesn't set the consumer group id, Kafka should complain.

However, I have found out that if the group id is null, the Java client sets it to the empty string "" to avoid problems. So apparently that is the default value I was looking for.

Surprisingly, all my consumers, even when I don't set their group ids (so they all have groupId == ""), seem to receive all the messages the producer writes.

I still can't explain this: any suggestions?

6
It's probably linked to your number of partitions. How many partitions do you have and how many consumers do you use? In fact, when multiple consumers are subscribed to a topic and belong to the same consumer group, each consumer in the group will receive messages from a different subset of the partitions in the topic. – Maximilien Belinga
I use a single partition for each topic, so all consumers receive messages from the same partition :/ – Andrea Rossi

6 Answers

41
votes

if I don't set any group id in the Consumer Properties, what group id will the Kafka Consumer be given?

The Kafka consumer will not belong to any consumer group. Instead, you will get this error: The configured groupId is invalid

Is there a single default value?

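Yes, in a sense: the sample consumer.properties file shipped with Kafka sets group.id=test-consumer-group. Note that this file is only read by tools that are explicitly pointed at it (such as the console consumer); the Java client does not load it on its own.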

Does the client create a random value each time?

No, a group id seems to be mandatory for Java client consumers starting with Kafka 0.9.0.x. You can refer to this JIRA: https://issues.apache.org/jira/browse/KAFKA-2648

Do I need to create a different id for each consumer to be sure that each one receives all messages?

Yes, if all consumers use the same group id, messages in a topic are distributed among those consumers. In other words, each consumer will get a non-overlapping subset of the messages. Having more consumers in the same group increases the degree of parallelism and the overall throughput of consumption. On the other hand, if each consumer is in its own group, each consumer will get a full copy of all messages.
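As a rough illustration of the "one group per consumer" approach, the sketch below builds consumer properties with a distinct group.id on every call. The class name, the helper name uniqueGroupProps, the "broadcast-" prefix, and the localhost:9092 address are assumptions made up for this example; only the property keys and deserializer class names are standard Kafka consumer settings. The resulting Properties object would then be passed to the KafkaConsumer constructor.

```java
import java.util.Properties;
import java.util.UUID;

class UniqueGroupIds {

    // Hypothetical helper: returns consumer properties whose group.id is
    // different on every call, so each consumer forms its own group.
    static Properties uniqueGroupProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // A random suffix keeps two consumers from ever sharing a group.
        props.setProperty("group.id", "broadcast-" + UUID.randomUUID());
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Assumed broker address; replace with your own.
        Properties first = uniqueGroupProps("localhost:9092");
        Properties second = uniqueGroupProps("localhost:9092");
        System.out.println(first.getProperty("group.id"));
        System.out.println(second.getProperty("group.id"));
    }
}
```

One thing to keep in mind with this design: a brand-new group has no committed offsets, so where each consumer starts reading is decided by auto.offset.reset (which defaults to latest).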

5
votes

I don't want to repeat other answers, but just to point out one thing: you don't actually need a consumer group to consume all messages. The Kafka Consumer API (assuming we're dealing with the Java one) has both a subscribe() and an assign() method. If you want all consumers to receive all messages without load balancing (which is essentially what consumer groups are for), you can just invoke assign() on all consumers, passing it all the partitions for the topic, optionally followed by seek() to set the offsets; that way your consumers will get all messages.

This way Kafka will not manage partition assignment and will not persist offsets; the consumer is responsible for all that. Depending on your use case, this may be a better approach than having a consumer group per consumer.

1
votes

I had the same problem and took some time to research it. The spring-cloud-stream project checks whether you have set a group id for the consumer. If not, spring-cloud-stream creates a random value as the group id. Please refer to the method createConsumerEndpoint in the class KafkaMessageChannelBinder.

0
votes

If you don't set group.id, you will get an error when consuming topic data:

org.apache.kafka.common.errors.InvalidGroupIdException: The configured groupId is invalid
22:08:14.132 [testAuto-kafka-consumer-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group 
22:08:14.132 [testAuto-kafka-consumer-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending JoinGroup ({group_id=,session_timeout=15000,rebalance_timeout=300000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=18 cap=18]}]}) to coordinator bogon:9092 (id: 2147483647 rack: null)
22:08:14.132 [testAuto-kafka-consumer-1] ERROR org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Attempt to join group  failed due to fatal error: The configured groupId is invalid
22:08:14.132 [testAuto-kafka-consumer-1] ERROR org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer - Container exception

0
votes

According to KIP-289, the default group.id behavior has been "improved": since kafka-clients version 2.2.0, the default group.id is null.

KIP-289: Improve the default group id behavior in KafkaConsumer.

It seems to me that when using assign() you can forgo the group.id, leaving it as null; in that case no committed offsets will be available.
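A minimal sketch of what such a group-less configuration might look like, assuming kafka-clients 2.2.0 or later: group.id is deliberately left unset, auto-commit is switched off because there is no group to commit to, and auto.offset.reset picks the starting position. The class name, the helper name groupLessProps, and the broker address are invented for this illustration; a consumer built from these properties would be used with assign() rather than subscribe().

```java
import java.util.Properties;

class GroupLessConsumerProps {

    // Hypothetical helper: properties for a consumer that has no group.id
    // and is meant to be used with assign() instead of subscribe().
    static Properties groupLessProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // group.id is intentionally not set, so it keeps its default.
        // Without a group there is nowhere to commit offsets to.
        props.setProperty("enable.auto.commit", "false");
        // With no committed offsets, start from the beginning of each partition.
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Assumed broker address; replace with your own.
        Properties props = groupLessProps("localhost:9092");
        System.out.println("group.id set? " + props.containsKey("group.id"));
    }
}
```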

0
votes

Check the groupId set on your listener: @KafkaListener(topics = "${kafka.topic}", groupId = "groupIdName")

Steps:

  • Go to the Kafka folder

  • Open the config folder

  • Open consumer.properties

  • Change the group id:

group.id=groupIdName