I have a conceptual issue with Kafka.

We have many machines acting as consumers on one topic with many partitions. These machines run on different hardware setups and there will be consumers with higher throughput than others.

Each consumer is tied directly to the one or more partitions assigned to it.

How can I prevent one partition (read by a slow consumer) from accumulating unconsumed messages faster than other partitions (read by fast consumers), which leaves the partitions unbalanced?

One idea I had was to force a rebalance on a regular basis, but it appears that this will usually assign the same consumers to the same partitions again. If partitions were randomly reassigned, that would solve my issue.
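One way to get the kind of periodic reshuffling described above without relying on the group coordinator is to compute the assignment yourself and rotate it on a schedule. This is a minimal sketch, assuming every consumer knows its own index, the total number of consumers, and a shared rotation counter (for example derived from wall-clock time). `partitionsFor` and `epoch` are hypothetical names; the returned partition numbers would be wrapped in `TopicPartition` objects and handed to `consumer.assign(...)`.

```java
import java.util.ArrayList;
import java.util.List;

public class RotatingAssignment {
    // Hypothetical helper: returns the partition numbers that consumer
    // `consumerIndex` (0-based, out of `consumerCount`) owns during rotation
    // `epoch`. Every partition goes to exactly one consumer, and ownership
    // shifts by one consumer each time `epoch` increases.
    static List<Integer> partitionsFor(int consumerIndex, int consumerCount,
                                       int partitionCount, long epoch) {
        List<Integer> owned = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            // floorMod keeps the result non-negative
            if (Math.floorMod(p + epoch, consumerCount) == consumerIndex) {
                owned.add(p);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        // 3 consumers, 6 partitions: show ownership in two consecutive epochs
        for (long epoch = 0; epoch < 2; epoch++) {
            for (int c = 0; c < 3; c++) {
                System.out.println("epoch " + epoch + ", consumer " + c + ": "
                        + partitionsFor(c, 3, 6, epoch));
            }
        }
    }
}
```

Rotation spreads each partition's backlog across machines over time, but it does not add capacity. If the epoch comes from clocks that disagree, two consumers can briefly read the same partition, so committed offsets and idempotent processing still matter.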

I'd appreciate any hint on this.

Thank you, greetings from Berlin, Dennis

Regarding the "How can .. partitions." question: it doesn't look like one partition accumulates less data than another partition within the same consumer, so if your fast consumer has more partitions, that seems fine. The issue seems to be when a slower machine runs a consumer with one partition: it polls a lot of messages, but the machine doesn't have enough power to process them all? If that is the issue, limit the amount of data returned by each poll by changing the "max.partition.fetch.bytes" or "max.poll.records" consumer property. - Paresh
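A minimal sketch of the two consumer settings mentioned in that comment, built with plain `java.util.Properties`. The broker address, group id and numeric values are placeholders, not recommendations; the resulting properties would be passed to the `KafkaConsumer` constructor.

```java
import java.util.Properties;

public class SlowConsumerConfig {
    // Builds consumer settings that keep each poll() small, so a slow machine
    // can finish processing a batch before it polls again.
    static Properties slowConsumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "example-group");            // placeholder
        // Upper bound on the number of records a single poll() returns
        props.setProperty("max.poll.records", "100");
        // Upper bound on data returned per partition per fetch (here 256 KiB)
        props.setProperty("max.partition.fetch.bytes", String.valueOf(256 * 1024));
        return props;
    }

    public static void main(String[] args) {
        slowConsumerProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Smaller polls help a slow consumer stay within `max.poll.interval.ms` and avoid being kicked out of the group, but they do not raise its throughput, so its partitions can still fall behind.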
If you run a single consumer on a "slow" machine, can it handle the load of a single partition? - Luciano Afranllie
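Whether a machine can handle one partition comes down to simple arithmetic: while the produce rate into a partition exceeds its consumer's processing rate, lag grows by the difference every second; otherwise it shrinks until it reaches zero. A small sketch with hypothetical rates (not measurements from the question):

```java
public class LagEstimate {
    // Unconsumed messages after `seconds`, starting from `initialLag`, when a
    // partition receives `produceRate` msg/s and its consumer handles
    // `consumeRate` msg/s. Lag never drops below zero.
    static long lagAfter(long initialLag, double produceRate, double consumeRate, long seconds) {
        double lag = initialLag + (produceRate - consumeRate) * seconds;
        return Math.max(0L, Math.round(lag));
    }

    public static void main(String[] args) {
        // Hypothetical: 1,000 msg/s into each partition,
        // fast machine handles 1,500 msg/s, slow machine 800 msg/s
        System.out.println("fast, after 1h: " + lagAfter(0, 1000, 1500, 3600)); // 0
        System.out.println("slow, after 1h: " + lagAfter(0, 1000, 800, 3600));  // 720000
    }
}
```

If the answer is no, no amount of reassignment keeps that machine's partitions balanced; it either needs fewer messages per partition or fewer partitions.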

Answer

You don't have to use the default partitioner for your producers, nor do you have to use dynamic partition assignment in your consumers. You can have a pool of high-speed partitions and a separate pool of low-speed partitions, and manually (or randomly) assign messages and consumers to each pool of partitions.
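On the producer side, sending messages to a pool means choosing the partition number yourself instead of leaving it to the default partitioner, for example by passing it to the `ProducerRecord(topic, partition, key, value)` constructor. A minimal sketch of random routing between two pools; the pool layout and the `fastShare` split are hypothetical and would have to come from your own capacity measurements.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class PoolRouter {
    private final List<Integer> fastPool;  // partitions read by the fast machines
    private final List<Integer> slowPool;  // partitions read by the slow machines
    private final double fastShare;        // fraction of messages sent to the fast pool
    private final Random random;

    PoolRouter(List<Integer> fastPool, List<Integer> slowPool, double fastShare, Random random) {
        this.fastPool = fastPool;
        this.slowPool = slowPool;
        this.fastShare = fastShare;
        this.random = random;
    }

    // Chooses the partition for the next message: first a pool, picked at
    // random according to fastShare, then a uniformly random partition in it.
    int nextPartition() {
        List<Integer> pool = random.nextDouble() < fastShare ? fastPool : slowPool;
        return pool.get(random.nextInt(pool.size()));
    }

    public static void main(String[] args) {
        // Hypothetical layout: partitions 0-5 fast, 6-7 slow, 80% of traffic to the fast pool
        PoolRouter router = new PoolRouter(Arrays.asList(0, 1, 2, 3, 4, 5),
                Arrays.asList(6, 7), 0.8, new Random(42));
        int[] counts = new int[8];
        for (int i = 0; i < 10_000; i++) {
            counts[router.nextPartition()]++;
        }
        System.out.println(Arrays.toString(counts));
    }
}
```

Random routing gives up per-key ordering; if messages with the same key must stay in order, pick the partition inside a pool from a hash of the key instead.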

From the KafkaConsumer Javadoc on manual partition assignment: "...instead of subscribing to the topic using subscribe, you just call assign(Collection) with the full list of partitions that you want to consume.

 String topic = "foo";
 TopicPartition partition0 = new TopicPartition(topic, 0);
 TopicPartition partition1 = new TopicPartition(topic, 1);
 consumer.assign(Arrays.asList(partition0, partition1)); 

"
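Building on manual `assign(...)`, a sketch that gives each consumer a number of partitions proportional to its throughput, so faster machines own more partitions. The weights are assumed to come from your own benchmarking, and `assignByWeight` is a hypothetical helper; each resulting list would be turned into `TopicPartition` objects and passed to that consumer's `assign(...)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WeightedAssignment {
    // Splits partitions 0..partitionCount-1 among consumers in proportion to
    // their weights (e.g. messages/second each machine can process), using the
    // largest-remainder method so the counts add up to partitionCount.
    static List<List<Integer>> assignByWeight(int partitionCount, double[] weights) {
        double total = Arrays.stream(weights).sum();
        int n = weights.length;
        int[] counts = new int[n];
        double[] remainders = new double[n];
        int assigned = 0;
        for (int i = 0; i < n; i++) {
            double exact = partitionCount * weights[i] / total;
            counts[i] = (int) Math.floor(exact);
            remainders[i] = exact - counts[i];
            assigned += counts[i];
        }
        // Hand the leftover partitions to the largest fractional remainders
        for (int left = partitionCount - assigned; left > 0; left--) {
            int best = 0;
            for (int i = 1; i < n; i++) {
                if (remainders[i] > remainders[best]) best = i;
            }
            counts[best]++;
            remainders[best] = -1; // each consumer receives at most one extra
        }
        // Give each consumer a contiguous range of partition numbers
        List<List<Integer>> result = new ArrayList<>();
        int next = 0;
        for (int i = 0; i < n; i++) {
            List<Integer> owned = new ArrayList<>();
            for (int k = 0; k < counts[i]; k++) owned.add(next++);
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical throughputs: one fast machine, two slower ones
        System.out.println(assignByWeight(8, new double[] {2.0, 1.0, 1.0}));
        // prints [[0, 1, 2, 3], [4, 5], [6, 7]]
    }
}
```

Because `assign(...)` bypasses the consumer group's automatic rebalancing, you have to coordinate any change to this split across all consumers yourself.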