If a valid partition number is specified, that partition will be used when sending the record.
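If no partition is specified but a key is present, a partition will be chosen using a hash of the key (DefaultPartitioner; see below for more details).
If neither key nor partition is present, a partition will be assigned in a round-robin fashion. Note that the code below, which comes from client versions 2.4 and later, delegates this case to a sticky partition cache rather than strict round-robin.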
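The three rules above can be sketched as a single decision function. This is only an illustration, not Kafka's code: the class name PartitionChoiceSketch and the method choose are hypothetical, Arrays.hashCode stands in for the real key hash, and the keyless case uses a plain round-robin counter as described in the text.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, dependency-free sketch of the partition selection order.
public class PartitionChoiceSketch {
    // Counter backing the round-robin rule (an assumption of this sketch).
    private final AtomicInteger counter = new AtomicInteger(0);

    // explicitPartition and keyBytes may each be null, mirroring an unset record field.
    int choose(Integer explicitPartition, byte[] keyBytes, int numPartitions) {
        if (explicitPartition != null) {
            // Rule 1: a valid explicit partition always wins.
            if (explicitPartition < 0 || explicitPartition >= numPartitions) {
                throw new IllegalArgumentException("Invalid partition: " + explicitPartition);
            }
            return explicitPartition;
        }
        if (keyBytes != null) {
            // Rule 2: hash the key. Arrays.hashCode is a stand-in, NOT Kafka's murmur2.
            return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
        }
        // Rule 3: no key and no partition, so rotate through the partitions.
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }
}
```

With this sketch, records sharing a key always land on the same partition as long as the partition count does not change, while keyless records are spread evenly.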
Kafka makes use of the DefaultPartitioner (org.apache.kafka.clients.producer.internals.DefaultPartitioner) in order to distribute messages across topic partitions:
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        }
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
Essentially, the DefaultPartitioner makes use of MurmurHash (murmur2), a non-cryptographic hash function commonly used for hash-based lookups. Utils.toPositive clears the sign bit so that the hash is non-negative, and the result is then used in a modulo operation (% numPartitions) to ensure that the returned partition is within the range [0, N-1], where N is the number of partitions of the topic.
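To make the range argument concrete, here is a minimal sketch of the "make positive, then take the modulo" step. It assumes a stand-in hash (Arrays.hashCode) instead of murmur2 so that it runs without the Kafka client on the classpath. The class PartitionSketch and its methods are hypothetical names. The toPositive helper mirrors the bit mask used by Kafka's Utils.toPositive.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical, dependency-free illustration of hash -> non-negative -> modulo.
public class PartitionSketch {

    // Clears the sign bit. Unlike Math.abs, this is also non-negative
    // for Integer.MIN_VALUE, where Math.abs returns a negative number.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // Stand-in hash, NOT murmur2; used only to keep the sketch self-contained.
    static int standInHash(byte[] keyBytes) {
        return Arrays.hashCode(keyBytes);
    }

    // Result is always in [0, numPartitions - 1].
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return toPositive(standInHash(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 6;
        for (String key : new String[] {"user-1", "user-2", "user-3"}) {
            int p = partitionFor(key.getBytes(StandardCharsets.UTF_8), numPartitions);
            System.out.println(key + " -> partition " + p);
        }
    }
}
```

Because the result depends on numPartitions, adding partitions to a topic changes the mapping from keys to partitions. The actual partition numbers printed here will differ from what Kafka assigns, since the hash function is not murmur2.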