
I'm getting started with Apache Kafka, writing a simple Producer/Consumer app in Java. I'm using kafka-clients version 0.10.0.1 and running it on a Mac.

I created a topic named replicated_topic_partitioned with 3 partitions and a replication factor of 3.

I started ZooKeeper on port 2181. I started three brokers with ids 1, 2 and 3 on ports 9092, 9093 and 9094 respectively.

Here's the output of the describe command

kafka_2.12-2.3.0/bin/kafka-topics.sh --describe --topic replicated_topic_partitioned --bootstrap-server localhost:9092    
Topic:replicated_topic_partitioned    PartitionCount:3    ReplicationFactor:3    Configs:segment.bytes=1073741824
     Topic: replicated_topic_partitioned    Partition: 0    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
     Topic: replicated_topic_partitioned    Partition: 1    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3
     Topic: replicated_topic_partitioned    Partition: 2    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1

I wrote a simple producer and a simple consumer. The producer ran successfully and published the messages. But when I start the consumer, the poll call just waits indefinitely. On debugging, I found that it keeps looping in the awaitMetadataUpdate method of ConsumerNetworkClient.

Here is the code for the Producer and the Consumer.

Producer.java

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> myProducer = new KafkaProducer<>(properties);
DateFormat dtFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss:SSS");
String topic = "replicated_topic_partitioned";

int numberOfRecords = 10;
try {
    for (int i = 0; i < numberOfRecords; i++) {
        String message = String.format("Message: %s  sent at %s", Integer.toString(i), dtFormat.format(new Date()));
        System.out.println("Sending " + message);
        myProducer.send(new ProducerRecord<String, String>(topic, message));
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    myProducer.close();
}

Consumer.java

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
properties.put("group.id", UUID.randomUUID().toString());
properties.put("auto.offset.reset", "earliest");

KafkaConsumer<String, String> myConsumer = new KafkaConsumer<>(properties);

String topic = "replicated_topic_partitioned";
myConsumer.subscribe(Collections.singletonList(topic));

try {
    while (true) {
        ConsumerRecords<String, String> records = myConsumer.poll(1000);
        printRecords(records);
    }
} finally {
    myConsumer.close();
}

Adding some key-fields from server.properties

broker.id=1 
host.name=localhost
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

log.dirs=/tmp/kafka-logs-1
num.partitions=1
num.recovery.threads.per.data.dir=1

transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

zookeeper.connection.timeout.ms=6000

group.initial.rebalance.delay.ms=0

The server.properties for the other two brokers was a replica of the above, with broker.id, the port, and log.dirs changed.
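For reference, the per-broker differences might look something like this (the file names, listener ports and log directories are assumed from the setup described above; older setups may use the port key instead of listeners):

```
# server-2.properties (assumed name) - only the keys that differ
broker.id=2
listeners=PLAINTEXT://localhost:9093
log.dirs=/tmp/kafka-logs-2

# server-3.properties
broker.id=3
listeners=PLAINTEXT://localhost:9094
log.dirs=/tmp/kafka-logs-3
```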

This did not work for me: Kafka 0.9.0.1 Java Consumer stuck in awaitMetadataUpdate()


However, if I start the console consumer from the command line and pass a partition, it successfully reads the messages for that partition. It does not receive any messages when just the topic is specified.

Works:

kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --topic replicated_topic_partitioned --bootstrap-server localhost:9092 \
     --from-beginning --partition 1

Does not work:

kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --topic replicated_topic_partitioned --bootstrap-server localhost:9092 \
    --from-beginning

NOTE: The above consumer works perfectly for a topic with a replication factor of 1.

Questions:

  1. Why does the Java consumer not read any messages for a topic with a replication factor greater than one, even when assigned to a specific partition (e.g. myConsumer.assign(Collections.singletonList(new TopicPartition(topic, 2))))?

  2. Why does the console consumer read messages only when passed a partition (again, it works for a topic with a replication factor of one)?

"But it does not work when just a topic is specified." - By "does not work", what do you mean? Does it fail? - Giorgos Myrianthous

@GiorgosMyrianthous It does not receive any messages. Updated my question clarifying this. - user7

Do you know how many messages each of the partitions has? - Giorgos Myrianthous

@GiorgosMyrianthous I sent a total of 10 messages. They got distributed to the partitions - I'm saying this based on the console consumer output when passed --partition 0, 1 and 2. Why would the number of messages per partition be of any interest here? - user7

How do you know the produce succeeds? Do you examine the returned future? Do you provide a callback? Do you call flush() on the producer? - radai

2 Answers

1 vote

So, you're sending 10 records, but all 10 records have the SAME key:

for (int i = 0; i < numberOfRecords; i++) {
   String message = String.format("Message: %s  sent at %s", Integer.toString(i), dtFormat.format(new Date()));
   System.out.println("Sending " + message);
   myProducer.send(new ProducerRecord<String, String>(topic, message)); <--- KEY=topic
}

Unless told otherwise (by setting a partition directly on the ProducerRecord), the partition into which a record is delivered is determined by something like:

partition = murmur2(serialize(key)) % numPartitions

So the same key means the same partition.

Have you tried searching for your 10 records on partitions 0 and 2, maybe?

If you want a better "spread" of records amongst partitions, either use a null key (you'd get round-robin) or a variable key.
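For illustration, the keyed vs. null-key routing can be sketched with a toy partitioner. This is a simplification with hypothetical names: Kafka's default partitioner hashes the serialized key with murmur2, not the simple hash used below, but the shape of the logic is the same.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for Kafka's default partitioning logic:
// keyed records hash to a fixed partition, null-key records round-robin.
public class ToyPartitioner {
    private final int numPartitions;
    private final AtomicInteger counter = new AtomicInteger(0);

    public ToyPartitioner(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    public int partition(String key) {
        if (key == null) {
            // Null key: rotate through the partitions.
            return counter.getAndIncrement() % numPartitions;
        }
        // Non-null key: deterministic hash (Kafka uses murmur2, not this).
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = 0;
        for (byte b : bytes) {
            hash = 31 * hash + b;
        }
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        ToyPartitioner p = new ToyPartitioner(3);
        // Same key always lands on the same partition...
        System.out.println(p.partition("order-42") == p.partition("order-42")); // true
        // ...while null keys rotate across all three partitions.
        System.out.println(p.partition(null)); // 0
        System.out.println(p.partition(null)); // 1
        System.out.println(p.partition(null)); // 2
    }
}
```

With 3 partitions, null-key records land on 0, 1, 2 in turn, while any fixed key always maps to the same single partition, which is why a constant key leaves two partitions empty.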

0 votes

Disclaimer: This is not an answer.

The Java consumer is now working as expected. I did not change the code or the configuration. The only thing I did was restart my Mac, which caused the kafka-logs folder (and, I guess, the zookeeper folder too) to be deleted.

I re-created the topic (with the same command: 3 partitions, replication factor of 3), then restarted the brokers with the same configuration - no advertised.host.name or advertised.port config.

So, recreating the kafka-logs directories and the topic fixed whatever was causing the issue earlier.

My only suspect is a consumer that was not properly closed. Initially, I ran the consumer code without the close call on the consumer in the finally block, and with the same group.id. Maybe all 3 partitions were assigned to consumers that were never properly closed. This is just a guess.

But even calling myConsumer.position(new TopicPartition(topic, 2)) did not return a response earlier, when I had assigned the consumer to a partition; it kept looping in the same awaitMetadataUpdate method.