3
votes

I am new to Kafka and am going through the official documentation.

On my local system I have started a single Kafka instance along with ZooKeeper; both are running on their default ports.

I have created a topic "test" with a replication factor of 1, since I only have a single Kafka instance running, and with two partitions.

I have two consumers subscribed to this topic within the same consumer group, both started from the command prompt on a Windows machine.

When I start the producer from the command prompt and publish messages to the topic, everything works fine: Kafka distributes the messages round-robin to both partitions, and the two consumers receive messages alternately since each is listening to a separate partition.

But when I create a producer using the Java kafka-clients jar, the producer pushes all the messages to the same partition even though I use different keys, so all the messages are received by the same consumer.

The partition is not static either; it changes every time I run my producer.

I have tried the same scenario with a producer started from the command prompt, using exactly the same configuration I pass to the kafka-clients producer in Java. The command-prompt producer works fine, but the code producer pushes all the messages to the same partition.

I have tried changing the key of certain messages, hoping the broker would send them to a different partition, since the documentation says messages are routed using the message key.

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerParallel {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "parallelism-producer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);

        Producer<String, Long> parallelProducer = new KafkaProducer<>(properties);

        for (long i = 0; i < 100; i++) {
            ProducerRecord<String, Long> producerRecord;

            // First half of the records uses the key "Amoeba", second half "Bacteria"
            if (i < 50) {
                producerRecord = new ProducerRecord<>("second-topic", "Amoeba", i);
            } else {
                producerRecord = new ProducerRecord<>("second-topic", "Bacteria", i);
            }

            // Block on the future so the assigned partition is known before printing
            RecordMetadata recordMetadata = parallelProducer.send(producerRecord).get();

            System.out.printf("Sent record : with key %s and value %d to partition %s",
                    producerRecord.key(), producerRecord.value(), recordMetadata.partition());
            System.out.println();
        }

        parallelProducer.close();
    }
}

As per the documentation, the Kafka broker decides which partition to put a particular message in by hashing its key. I am changing the key of my records after an interval, but the messages still go to the same partition every time.

Sample console output of the code:

  Sent record : with key Amoeba and value 0 to partition 1
  Sent record : with key Amoeba and value 1 to partition 1
  Sent record : with key Amoeba and value 2 to partition 1
  Sent record : with key Amoeba and value 3 to partition 1
  Sent record : with key Amoeba and value 4 to partition 1
  Sent record : with key Amoeba and value 5 to partition 1
  Sent record : with key Amoeba and value 6 to partition 1
  Sent record : with key Amoeba and value 7 to partition 1
  Sent record : with key Amoeba and value 8 to partition 1
  Sent record : with key Amoeba and value 9 to partition 1
  Sent record : with key Amoeba and value 10 to partition 1
  Sent record : with key Amoeba and value 11 to partition 1
  Sent record : with key Amoeba and value 12 to partition 1
  Sent record : with key Amoeba and value 13 to partition 1

  Sent record : with key Bacteria and value 87 to partition 1
  Sent record : with key Bacteria and value 88 to partition 1
  Sent record : with key Bacteria and value 89 to partition 1
  Sent record : with key Bacteria and value 90 to partition 1
  Sent record : with key Bacteria and value 91 to partition 1
  Sent record : with key Bacteria and value 92 to partition 1
  Sent record : with key Bacteria and value 93 to partition 1
  Sent record : with key Bacteria and value 94 to partition 1
  Sent record : with key Bacteria and value 95 to partition 1
  Sent record : with key Bacteria and value 96 to partition 1
  Sent record : with key Bacteria and value 97 to partition 1
  Sent record : with key Bacteria and value 98 to partition 1
  Sent record : with key Bacteria and value 99 to partition 1
harkesh kumar: Can you also share the error message?
ANKIT SRIVASTAVA: @harkeshkumar There is no error in the console. I am just curious why it always sends the records to the same partition.
ANKIT SRIVASTAVA: @harkeshkumar I have updated the question with sample console output of the code.
Thilo: Do you have only these two keys? The chances of them going to the same partition are 50%. Try 50 different keys, and they should scatter randomly.
Thilo: If you don't have any key, it will randomly distribute between partitions. If you have a key, it will use the key to assign partitions.

3 Answers

2
votes

Everything works as expected.

In your particular case, the Partitioner used by KafkaProducer (to determine the partition) calculates the same partition for both keys: Amoeba and Bacteria. By default, KafkaProducer uses org.apache.kafka.clients.producer.internals.DefaultPartitioner.

Suggestion: Change the keys or increase the number of partitions.

Notice: it is the Producer that decides which partition a message goes to, not the Broker.
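This collision can be reproduced offline: for a non-null key, DefaultPartitioner hashes the serialized key with Kafka's murmur2 and takes the positive result modulo the partition count. Below is a minimal sketch; the murmur2 constants are transcribed from Kafka's org.apache.kafka.common.utils.Utils.murmur2, so treat this as an illustration of the idea rather than the library itself:

```java
import java.nio.charset.StandardCharsets;

public class PartitionSketch {

    // Re-implementation of Kafka's murmur2 hash (constants as found in
    // org.apache.kafka.common.utils.Utils.murmur2) -- illustrative, not the real class.
    static int murmur2(final byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1-3 bytes (switch fall-through is intentional)
        switch (length % 4) {
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // DefaultPartitioner's choice for a non-null key:
    // positive hash of the key bytes, modulo the partition count
    static int partitionFor(String key, int numPartitions) {
        return (murmur2(key.getBytes(StandardCharsets.UTF_8)) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 2; // the topic in the question has two partitions
        System.out.println("Amoeba   -> partition " + partitionFor("Amoeba", numPartitions));
        System.out.println("Bacteria -> partition " + partitionFor("Bacteria", numPartitions));
    }
}
```

With only two partitions there is roughly a 50% chance that any two keys land on the same one, which matches the observation in the comments; adding partitions or keys spreads the hashes out.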

1
votes

Change the code from Producer<String, String> producer = new KafkaProducer<String, String> to:

KafkaProducer<String, String> producer = new KafkaProducer<String, String>

By default the interface implementation places data into the same partition, so use KafkaProducer instead of the plain Producer interface.

0
votes

From version 2.4 of Apache Kafka onward, the default partitioning strategy for records with a null key has changed: sticky partitioning is now the default behavior.

Under the previous round-robin strategy, records with a null key were spread across partitions; the new sticky partitioning strategy sends records to the same partition until a partition's batch is "complete" (as determined by batch.size or linger.ms).

Check out this article for more info: Improvements with Sticky Partitioner
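For reference, the two settings mentioned above that bound how long the sticky partitioner reuses one partition can be set explicitly on the producer. A minimal configuration sketch (the values here are illustrative defaults, not tuning recommendations):

```java
import java.util.Properties;

public class StickyBatchingConfig {

    // Builds producer settings that bound how long the >= 2.4 sticky partitioner
    // keeps null-key records on one partition (values are illustrative).
    static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("batch.size", "16384"); // batch closes when this many bytes accumulate...
        props.put("linger.ms", "10");     // ...or after this many milliseconds of waiting
        return props;
    }

    public static void main(String[] args) {
        Properties props = build();
        System.out.println("batch.size = " + props.getProperty("batch.size"));
        System.out.println("linger.ms  = " + props.getProperty("linger.ms"));
    }
}
```

Whichever limit is hit first closes the batch, and the sticky partitioner then picks a new partition for the next batch; records with a non-null key are still placed by key hash as before.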