7 votes

I have a topic with 10 partitions, one consumer group with 4 consumers, and a worker size of 3.

I can see an uneven distribution of messages across the partitions: one partition holds a huge amount of data while another is empty.

How can I make my producer distribute the load evenly across all the partitions, so that every partition is utilized properly?

I need to clarify some things. Are you using a custom partitioning strategy or the default one? How do you know there is an uneven distribution of messages? – Indraneel Bende
@IndraneelBende When I describe my topic, it shows the lag, through which I can confirm that some partitions have a lag of more than 100,000 (1 lac) while others have a lag of 0, meaning there is no data in those partitions. I'm not sure about the strategy, but this is what I can see in the code: this.partitionerClass = props.getString("partitioner.class", "kafka.producer.DefaultPartitioner"); – Pacifist
If you are using the default partitioner, then messages are produced in a round-robin fashion across the different partitions. How are you calculating this lag? – Indraneel Bende
Lag = log end offset − current offset. Yes, that's what the Kafka documentation says, but I don't get why one partition is overloaded while another is free. – Pacifist

7 Answers

12 votes

According to the JavaDoc comment in the DefaultPartitioner class itself, the default partitioning strategy is:

  • If a partition is specified in the record, use it.
  • If no partition is specified but a key is present choose a partition based on a hash of the key.
  • If no partition or key is present choose a partition in a round-robin fashion.

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java

So here are two possible reasons that may be causing the uneven distribution, depending on whether or not you are specifying a key when producing the messages:

  • If you are specifying a key and getting an uneven distribution with the DefaultPartitioner, the most apparent explanation is that you are specifying the same key many times.

  • If you are not specifying a key and using the DefaultPartitioner, a non-obvious behavior could be happening. According to the above you would expect a round-robin distribution of messages, but this is not necessarily the case: an optimization introduced in 0.8.0 can cause the same partition to be reused for a while. Check this link for a more detailed explanation: https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whyisdatanotevenlydistributedamongpartitionswhenapartitioningkeyisnotspecified?
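
The first case can be sketched without a broker: the partition is a deterministic function of the key, so a repeated key concentrates all of its records on one partition. A minimal sketch, using String.hashCode as an illustrative stand-in for the murmur2 hash the DefaultPartitioner actually applies to the serialized key bytes:

```java
import java.util.HashMap;
import java.util.Map;

public class HotKeySketch {
    // Stand-in for the DefaultPartitioner's key branch; Kafka really
    // uses murmur2 over the serialized key bytes, not String.hashCode.
    public static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 10;
        Map<Integer, Integer> counts = new HashMap<>();
        for (int i = 0; i < 100; i++) {
            // Same key every time, so every record maps to the same partition.
            counts.merge(partitionFor("hot-key", numPartitions), 1, Integer::sum);
        }
        System.out.println(counts); // a single entry holding all 100 records
    }
}
```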

1 vote

It seems like your problem is uneven consumption of messages rather than uneven production of messages to the Kafka topic. In other words, the number of consumer threads you have doesn't match the number of partitions (they don't need to match 1:1, but each consumer thread should end up reading from the same number of partitions).

See this short explanation for more details.

1 vote

Instead of relying on the default partitioner class, you can have the producer specify a partition number directly, so that the message goes straight to that partition:

 ProducerRecord<String, String> record = new ProducerRecord<>(topicName, partitionNumber, key, value);
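
With explicit partitions, the producer side can keep a simple counter and cycle through the partitions itself. A minimal sketch (the partition count of 10 is an assumption taken from the question):

```java
import java.util.concurrent.atomic.AtomicLong;

public class RoundRobinAssigner {
    private final AtomicLong counter = new AtomicLong();
    private final int numPartitions;

    public RoundRobinAssigner(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Returns 0, 1, ..., numPartitions-1 and then wraps around;
    // safe to call from multiple producer threads.
    public int nextPartition() {
        return (int) (counter.getAndIncrement() % numPartitions);
    }
}
```

You would then pass nextPartition() as the partition argument of each ProducerRecord.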
0 votes

You can make use of the key parameter of the producer record. The point is that for a specific key, the data always goes to the same partition. I don't know the structure of your producer record, but since you said you have 10 partitions, you can simply keep a counter n and use n % 10 as the producer record key. For record 0 the key will be 0; Kafka will hash it and place it in some partition, say partition 0. For record 1 the key will be 1, and it will land in another partition, and so on. This way you can approximate round-robin over your producer records, and the key stays independent of the fields in your record.

Or you can specify the partition directly in your producer record. So either use the key or the partition field of the producer record.
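
One caveat worth checking with the key approach: hashing ten distinct keys does not guarantee that all ten partitions are hit, since two keys can hash to the same partition. A quick sketch for verifying coverage of a key scheme (String.hashCode stands in here for Kafka's murmur2, so the real distribution may differ):

```java
import java.util.HashSet;
import java.util.Set;

public class KeyCoverageCheck {
    // Illustrative stand-in for Kafka's murmur2-based key hashing.
    public static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // How many distinct partitions do the keys "0".."numKeys-1" actually hit?
    public static int distinctPartitionsHit(int numKeys, int numPartitions) {
        Set<Integer> used = new HashSet<>();
        for (int n = 0; n < numKeys; n++) {
            used.add(partitionFor(String.valueOf(n), numPartitions));
        }
        return used.size();
    }

    public static void main(String[] args) {
        // If this prints less than 10, some partitions receive no data at all.
        System.out.println(distinctPartitionsHit(10, 10));
    }
}
```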

0 votes

Suppose you have defined the partitioner based on the record; say the Kafka key is a String and the value is a Student POJO.

In the Student POJO, suppose I want records to go to a specific partition based on the student's country field. Imagine there are 10 partitions in the topic and, for example, the value's country is "India", and based on "India" we compute partition number 5.

Whenever the country is "India", Kafka will allocate partition number 5, and that record will always go to partition 5 (as long as the partition count has not changed).

Now suppose that lots of the records coming through your pipeline have the country "India"; all of those records will go to partition number 5, and you will see an uneven distribution across the Kafka partitions.
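
This effect can be simulated without a broker: a deterministic country-to-partition mapping turns value skew directly into partition skew. A sketch under the same assumptions (10 partitions; String.hashCode standing in for whatever hash the custom partitioner uses):

```java
import java.util.HashMap;
import java.util.Map;

public class CountrySkewSketch {
    // Deterministic mapping: the same country always goes to the same
    // partition. With this stand-in hash, "India" happens to map to 5.
    public static int partitionFor(String country, int numPartitions) {
        return (country.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 10;
        // Hypothetical skewed stream: most records share one country value.
        String[] countries = {"India", "India", "India", "India", "India",
                              "India", "India", "India", "Brazil", "Japan"};
        Map<Integer, Integer> counts = new HashMap<>();
        for (String c : countries) {
            counts.merge(partitionFor(c, numPartitions), 1, Integer::sum);
        }
        // One partition receives 8 of the 10 records, mirroring the skew.
        System.out.println(counts);
    }
}
```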

0 votes

In my case, I used the default partitioner but still had far more records in one partition than in the others. The problem was that I unexpectedly had many records with the same key. Check your keys!

0 votes

Since I was unable to resolve this with Faust, the approach I am using is to implement the round-robin distribution myself.

I iterate over the records to be produced and do, for example:

for index, message in enumerate(messages):
    # Faust's Topic.send is a coroutine with keyword-only arguments
    await topic.send(value=message, partition=index % num_partitions)

That is, I bound the index to within the range of partitions I have.

There could still be unevenness: suppose you run this repeatedly but your number of records is less than num_partitions; then the first partitions will keep getting the major share of messages. You can avoid this by adding a random starting offset:

import random

# Pick a random starting partition so short batches don't always
# favour the low-numbered partitions.
initial_partition = random.randrange(0, num_partitions)
for index, message in enumerate(messages):
    await topic.send(value=message, partition=(initial_partition + index) % num_partitions)