2 votes

I am writing a Kafka producer client as follows:

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class BasicProducerExample {
   public static void main(String[] args){
       Properties props = new Properties();
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
       props.put(ProducerConfig.ACKS_CONFIG, "all");
       props.put(ProducerConfig.RETRIES_CONFIG, 0);
       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
       props.put("batch.size", "16384"); // batch size in bytes, not the maximum message size

       Producer<String, String> producer = new KafkaProducer<String, String>(props);
       TestCallback callback = new TestCallback();
       for (long i = 0; i < 2; i++) {
           // topic and message
           ProducerRecord<String, String> data = new ProducerRecord<String, String>("dke", "" + i);
           producer.send(data, callback);
       }

       producer.close();
   }

   private static class TestCallback implements Callback {
       @Override
       public void onCompletion(RecordMetadata recordMetadata, Exception e) {
           if (e != null) {
               // recordMetadata may be null when the send fails
               System.out.println("Error while producing message to topic: " + recordMetadata);
               e.printStackTrace();
           } else {
               String message = String.format("sent message to topic:%s partition:%s offset:%s",
                       recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
               System.out.println(message);
           }
       }
   }
}

OUTPUT: Error while producing message to topic :null org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

NOTE: Broker port: localhost:6667 is working.

How did you fix this problem? - Mikhail

2 Answers

1 vote

In your property for BOOTSTRAP_SERVERS_CONFIG, try changing the port number to 6667.
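In code, that amounts to changing one line of the producer properties. A minimal sketch (using the plain string key "bootstrap.servers" rather than ProducerConfig, so the snippet stands alone without kafka-clients on the classpath):

```java
import java.util.Properties;

public class FixedPortConfig {
    /** Builds producer properties pointing at the HDP default broker port 6667. */
    static Properties producerProps() {
        Properties props = new Properties();
        // 6667 instead of the stock Apache Kafka default 9092
        props.put("bootstrap.servers", "127.0.0.1:6667");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("bootstrap.servers"));
    }
}
```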

Thanks.

-- Hiren

1 vote

I use Apache Kafka on a Hortonworks (HDP 2.x) installation. The error message means that the Kafka producer was not able to push the data to the segment log file. From a command-line console, that usually means one of two things:

  1. You are using incorrect port for the brokers
  2. Your listener config in server.properties is not working

If you encounter the error message while writing via the Scala API, additionally check the connection to the Kafka cluster using telnet <cluster-host> <broker-port>
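The same connectivity check can be done from code. A minimal sketch using a plain TCP connect (the host and port below are placeholders for your cluster, not values from the original answer):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerReachability {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical broker address; substitute your own cluster host and port
        System.out.println(isReachable("192.30.1.5", 6667, 3000));
    }
}
```

This only proves the port is open; it does not validate the Kafka protocol, but it quickly rules out the two causes listed above.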

NOTE: If you are using the Scala API to create a topic, it takes some time for the brokers to learn about the newly created topic. So, immediately after topic creation, the producers might fail with the error Failed to update metadata after 60000 ms.
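One way to ride out that warm-up window (my own sketch, not part of the original answer) is a small retry wrapper around the first send; in practice you would catch org.apache.kafka.common.errors.TimeoutException rather than a generic Exception:

```java
import java.util.concurrent.Callable;

public class RetryOnMetadataTimeout {
    /** Retries the task up to maxAttempts times, sleeping sleepMs between failures. */
    static <T> T retry(Callable<T> task, int maxAttempts, long sleepMs) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) { // narrow this to TimeoutException for a real producer
                last = e;
                Thread.sleep(sleepMs);
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        // Toy task that fails twice before succeeding, standing in for producer.send(...).get()
        final int[] calls = {0};
        String result = retry(() -> {
            if (++calls[0] < 3) throw new RuntimeException("Failed to update metadata");
            return "sent";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```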

I did the following checks in order to resolve this issue:

The first difference, checked via Ambari, is that Kafka brokers listen on port 6667 on HDP 2.x (stock Apache Kafka uses 9092).

listeners=PLAINTEXT://localhost:6667

Next, use the IP address instead of localhost. I executed netstat -na | grep 6667:

tcp        0      0 192.30.1.5:6667        0.0.0.0:*               LISTEN     
tcp        1      0 192.30.1.5:52242       192.30.1.5:6667        CLOSE_WAIT 
tcp        0      0 192.30.1.5:54454       192.30.1.5:6667        TIME_WAIT

So, I modified the producer call to use the IP address and not localhost:

./kafka-console-producer.sh --broker-list 192.30.1.5:6667 --topic rdl_test_2

To monitor if you have new records being written, monitor the /kafka-logs folder.

cd /kafka-logs/<topic name>/
ls -lart
-rw-r--r--.  1 kafka hadoop        0 Feb 10 07:24 00000000000000000000.log
-rw-r--r--.  1 kafka hadoop 10485756 Feb 10 07:24 00000000000000000000.timeindex
-rw-r--r--.  1 kafka hadoop 10485760 Feb 10 07:24 00000000000000000000.index

Once the producer successfully writes, the segment log file 00000000000000000000.log will grow in size.

See the size below:

-rw-r--r--. 1 kafka hadoop 10485760 Feb 10 07:24 00000000000000000000.index
-rw-r--r--. 1 kafka hadoop       45 Feb 10 09:16 00000000000000000000.log
-rw-r--r--. 1 kafka hadoop 10485756 Feb 10 07:24 00000000000000000000.timeindex

At this point, you can run kafka-console-consumer.sh:

./kafka-console-consumer.sh --bootstrap-server 192.30.1.5:6667 --topic rdl_test_2 --from-beginning
The response is hello world.

After this step, if you want to produce messages via the Scala API, change the listeners value (from localhost to a public IP) and restart the Kafka brokers via Ambari:

listeners=PLAINTEXT://192.30.1.5:6667 

A sample producer is as follows:

package com.scalakafka.sample

import java.util.Properties
import java.util.concurrent.TimeUnit

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

class SampleKafkaProducer {
  case class KafkaProducerConfigs(brokerList: String = "192.30.1.5:6667") {
    val properties = new Properties()
    val batchSize: java.lang.Integer = 1

    properties.put("bootstrap.servers", brokerList)
    properties.put("key.serializer", classOf[StringSerializer])
    properties.put("value.serializer", classOf[StringSerializer])
    properties.put("batch.size", batchSize)
  }

  val producer = new KafkaProducer[String, String](KafkaProducerConfigs().properties)

  def produce(topic: String, messages: Iterable[String]): Unit = {
    messages.foreach { m =>
      println(s"Sending $topic and message is $m")
      val result = producer.send(new ProducerRecord(topic, m)).get()
      println(s"the write status is $result")
    }
    producer.flush()
    // note: closing here means the producer cannot be reused after the first produce() call
    producer.close(10L, TimeUnit.MILLISECONDS)
  }
}

Hope this helps someone.