3
votes

For example, say I have a topic with 4 partitions and I send 4k messages to it, so each partition gets 1k messages. Due to outside factors, 3 of the consumers process all 1k of their messages, but the consumer of the 4th partition only gets through 200, leaving 800 still to process. Is there a mechanism to "rebalance" the data in the topic, say by giving partitions 1-3 200 each of partition 4's remaining messages, so that every partition has 200 messages apiece left to process?

I am not looking for a way to add more nodes to the consumer group and have Kafka rebalance the partitions.

Added output from reassign partitions:

Current partition replica assignment

{
  "version": 1,
  "partitions": [
    {
      "topic": "MyTopic",
      "partition": 0,
      "replicas": [
        0
      ],
      "log_dirs": [
        "any"
      ]
    },
    {
      "topic": "MyTopic",
      "partition": 1,
      "replicas": [
        0
      ],
      "log_dirs": [
        "any"
      ]
    },
    {
      "topic": "MyTopic",
      "partition": 4,
      "replicas": [
        0
      ],
      "log_dirs": [
        "any"
      ]
    },
    {
      "topic": "MyTopic",
      "partition": 3,
      "replicas": [
        0
      ],
      "log_dirs": [
        "any"
      ]
    },
    {
      "topic": "MyTopic",
      "partition": 2,
      "replicas": [
        0
      ],
      "log_dirs": [
        "any"
      ]
    },
    {
      "topic": "MyTopic",
      "partition": 5,
      "replicas": [
        0
      ],
      "log_dirs": [
        "any"
      ]
    }
  ]
}

Proposed partition reassignment configuration

{
  "version": 1,
  "partitions": [
    {
      "topic": "MyTopic",
      "partition": 3,
      "replicas": [
        0
      ],
      "log_dirs": [
        "any"
      ]
    },
    {
      "topic": "MyTopic",
      "partition": 0,
      "replicas": [
        0
      ],
      "log_dirs": [
        "any"
      ]
    },
    {
      "topic": "MyTopic",
      "partition": 5,
      "replicas": [
        0
      ],
      "log_dirs": [
        "any"
      ]
    },
    {
      "topic": "MyTopic",
      "partition": 2,
      "replicas": [
        0
      ],
      "log_dirs": [
        "any"
      ]
    },
    {
      "topic": "MyTopic",
      "partition": 4,
      "replicas": [
        0
      ],
      "log_dirs": [
        "any"
      ]
    },
    {
      "topic": "MyTopic",
      "partition": 1,
      "replicas": [
        0
      ],
      "log_dirs": [
        "any"
      ]
    }
  ]
}
2
May I know the type of storage you are using? I mean RAID/JBOD? - wandermonk
@PhaniKumarYadavilli Just machines with JBOD. - Kevin Vasko
Can you generate this output and add it to the question? ./kafka-reassign-partitions.sh --zookeeper list-of-zookeeper-nodes --broker-list '1,2,3' --topics-to-move-json-file topic.json --generate - wandermonk
@PhaniKumarYadavilli I've only got 1 broker. I don't want to distribute the load across multiple brokers; I want to redistribute the messages that have already been produced among the partitions of a topic. Given that, I'm not sure what that command will end up doing. Feel free to correct me if I'm misunderstanding what it does. - Kevin Vasko
kafka-reassign-partitions.sh rebalances the load between partitions. You can pass just one broker to match your cluster setup. - wandermonk

2 Answers

5
votes

A message's partition is assigned when the message is produced, and messages are never automatically moved between partitions afterwards. In general, each partition can have multiple consumers (in different consumer groups) reading it at different paces, so the broker can't move messages between partitions based on the slowness of one consumer (group). There are a few things you can try, though:

  • more partitions, hoping for a fairer distribution of load (you can have more partitions than consumers)
  • have producers explicitly set the partition on each message to produce a distribution between partitions that the consumers can better cope with
  • have consumers monitor their lag and actively unsubscribe from partitions when they fall behind so as to let other consumers pick up the load.
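To make the last two suggestions concrete, here is a minimal sketch in plain Python with no Kafka client involved. It computes per-partition lag as end offset minus committed offset and picks the least-loaded partition for the next message. The function names `consumer_lag` and `pick_partition` are hypothetical, and the offsets are simply the numbers from the question. Note that this only steers new messages; it does not move the 800 that are already sitting in the slow partition.

```python
from typing import Dict


def consumer_lag(end_offsets: Dict[int, int], committed: Dict[int, int]) -> Dict[int, int]:
    """Lag per partition = log end offset minus last committed offset."""
    return {p: end_offsets[p] - committed.get(p, 0) for p in end_offsets}


def pick_partition(lag: Dict[int, int]) -> int:
    """Choose the partition with the smallest backlog; ties go to the lowest id."""
    return min(sorted(lag), key=lambda p: lag[p])


# Numbers from the question: 4 partitions with 1000 messages each,
# three fully consumed and one stuck after 200 messages.
end_offsets = {0: 1000, 1: 1000, 2: 1000, 3: 1000}
committed = {0: 1000, 1: 1000, 2: 1000, 3: 200}

lag = consumer_lag(end_offsets, committed)
print(lag)                  # {0: 0, 1: 0, 2: 0, 3: 800}
print(pick_partition(lag))  # 0
```

In a real producer the chosen id would be passed along when sending (for instance, kafka-python's `KafkaProducer.send` accepts a `partition` argument), and the offsets would come from the cluster rather than from hard-coded dictionaries.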
0
votes

A couple of things you can do to improve performance:

  • Increase the number of partitions.
  • Increase the number of consumers in the consumer group that is consuming the partitions.

The first spreads newly produced messages over more partitions (messages already written stay where they are), and the second increases parallelism so that messages are consumed more quickly.
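As a rough illustration of why adding partitions changes only where future messages land, the sketch below models a keyed partitioner as `hash(key) % num_partitions`. It uses CRC32 as a stand-in hash, which is an assumption made for the example and not Kafka's actual partitioner; the key names and the helper `partition_for` are hypothetical too.

```python
import zlib
from collections import Counter


def partition_for(key: str, num_partitions: int) -> int:
    # Stand-in hash (CRC32), NOT Kafka's real partitioner.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# 4000 hypothetical message keys, matching the 4k messages in the question.
keys = [f"order-{i}" for i in range(4000)]

# Distribution when the topic has 4 partitions vs. 8 partitions.
before = Counter(partition_for(k, 4) for k in keys)
after = Counter(partition_for(k, 8) for k in keys)

print(sorted(before.items()))
print(sorted(after.items()))
```

The partition is computed once, at produce time, from the key and the partition count at that moment. Messages written while the topic had 4 partitions keep their original partition after the count is raised to 8.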

I hope this helps. See this link for more background:

https://xyu.io/2016/02/29/balancing-kafka-on-jbod/

Kafka consumers are part of consumer groups. A group has one or more consumers in it. Within a group, each partition is assigned to exactly one consumer.

If you have more consumers than partitions, then some of your consumers will be idle. If you have more partitions than consumers, more than one partition may get assigned to a single consumer.

Whenever a new consumer joins, a rebalance gets initiated and the new consumer is assigned some partitions previously assigned to other consumers.

For example, if there are 20 partitions all being consumed by one consumer, and another consumer joins, there'll be a rebalance.
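The assignment behaviour described above can be modelled in a few lines. This is a simplified round-robin sketch, not Kafka's actual assignor; the function `assign` and the consumer names are hypothetical, and it assumes at least one consumer is present.

```python
from typing import Dict, List


def assign(partitions: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """Spread partitions over consumers round-robin (a simplified model)."""
    assignment: Dict[str, List[int]] = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment


partitions = list(range(20))

# One consumer owns all 20 partitions; after a second joins, each owns 10.
one = assign(partitions, ["c1"])
two = assign(partitions, ["c1", "c2"])
print(len(one["c1"]))                  # 20
print(len(two["c1"]), len(two["c2"]))  # 10 10

# More consumers than partitions: the extra consumer gets nothing and sits idle.
crowded = assign([0, 1], ["c1", "c2", "c3"])
print(crowded)  # {'c1': [0], 'c2': [1], 'c3': []}
```

Recomputing the whole mapping whenever group membership changes is, in spirit, what a rebalance does.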

During a rebalance, the consumer group "pauses".