1
votes

I am using Kafka. This is my code, with which I want to send messages to a Kafka server. The topic name is "west" and the message is "message1". I'm not getting any error, but I don't see my sent messages in the topic. Is there anything wrong here?

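import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

class SimpleProducer {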

  public static void main(String[] args) throws Exception{       
    Properties props = new Properties();
    props.put("bootstrap.servers","172.xxxxxxxxx:9092");
    props.put("serializer.class", "kafka.serializer.DefaultEncoder");
    props.put("acks", "1");
    props.put("retries", 1);
    props.put("batch.size", 16384);
    props.put("linger.ms", 0);
    props.put("client.id", "foo");
    props.put("buffer.memory", 33554432);
    props.put("timeout.ms", "500");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "500"); 
    props.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");

    System.out.println("ready to send msg");

    try {
        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        producer.send(new ProducerRecord<String, String>("west","message1"));

        System.out.println("Message sent successfully");
        producer.close();
    }
    catch(Exception e)
    {
        System.out.println("Message was not sent successfully");
        e.printStackTrace();

    }
  }
}
How are you checking for messages written and committed to the broker? - Chris Gerken
For now I send the message to a particular topic and check it with bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic-name --from-beginning. - Sagar
I'm writing a custom producer to get my messages from Java to the topic. Calling the main method shown does not result in a message on the Kafka topic, nor does it print any error message. Does anybody have an idea why the message does not arrive in my topic? - Sagar

2 Answers

0
votes

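The send() method you used is asynchronous: it returns once the record has been queued for sending, so a delivery failure is generally not thrown at the call site. Use the overload of send() that takes two arguments. The second argument is a Callback, which you can use to see whether the send really worked or whether there was an error somewhere.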
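
To illustrate why the code in the question can print "Message sent successfully" without any error, here is a minimal sketch that uses only the Java standard library. It is not Kafka code: fakeSend and the brokerReachable flag are hypothetical stand-ins that model an asynchronous send whose failure is reported only through the returned future. With the real producer, the analogous check would be to call get() on the Future returned by producer.send(record), or to pass a Callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AsyncSendSketch {

    // Hypothetical stand-in for an asynchronous send (NOT the Kafka API):
    // it returns at once and reports failure only through the future.
    static CompletableFuture<Long> fakeSend(String topic, String value, boolean brokerReachable) {
        return CompletableFuture.supplyAsync(() -> {
            if (!brokerReachable) {
                throw new IllegalStateException("could not reach broker for topic " + topic);
            }
            return 0L; // pretend offset of the stored record
        });
    }

    // Fire-and-forget, as in the question: the returned future is ignored,
    // so the catch block never sees the asynchronous failure.
    static String fireAndForget(boolean brokerReachable) {
        try {
            fakeSend("west", "message1", brokerReachable);
            return "Message sent successfully";
        } catch (Exception e) {
            return "failed: " + e.getMessage();
        }
    }

    // Waiting on the future turns the hidden failure into a visible exception.
    static String sendAndWait(boolean brokerReachable) {
        try {
            long offset = fakeSend("west", "message1", brokerReachable).get(5, TimeUnit.SECONDS);
            return "stored at offset " + offset;
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException | TimeoutException e) {
            return "no result: " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println(fireAndForget(false)); // Message sent successfully
        System.out.println(sendAndWait(false));   // failed: could not reach broker for topic west
        System.out.println(sendAndWait(true));    // stored at offset 0
    }
}
```

The first line of output shows the trap: the success message is printed even though the simulated send failed, because nothing ever inspected the result. Blocking on the future is the simplest way to surface the error while debugging; a callback avoids blocking the sending thread.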

-1
votes
   producer.send(yourRecord,
                 new Callback() {
                     public void onCompletion(RecordMetadata metadata, Exception e) {
                         if(e != null) {
                            e.printStackTrace();
                         } else {
                            System.out.println("The offset of the record we just sent is: " + metadata.offset());
                         }
                     }
                 });