4
votes

Kafka 0.8.2.2.3 and zookeper both are running inside VM. I was able to run both producer and consumer within VM successfully using kafka-console-producer.sh and kafka-console-consumer.sh respectively. Even I was able to consume Kafka messages from host machine using kafka-console-consumer.sh. But when I tried to run the consumer using java from eclipse then zookeeper logs following error

2015-06-26 03:06:26,323 - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1007] - Closed socket connection for client /192.168.1.12:59549 (no session established for client)
2015-06-26 03:07:26,225 - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted socket connection from /192.168.1.12:59617
2015-06-26 03:07:26,226 - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@357] - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x0, likely client has closed socket
    at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228)
    at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
    at java.lang.Thread.run(Thread.java:745)

Below is my Kafka consume code

package com.truckevent.producer;


import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;


public class KafkaConsumer {

    public static void main(String[] args) throws Exception {

        String group = "hello" ;


        Properties props = new Properties();
        props.put("zookeeper.connect", "192.168.1.12:2181");
        props.put("group.id", group);
        props.put("zookeeper.session.timeout.ms", "20000");
        props.put("zookeeper.sync.time.ms", "2030");
        props.put("auto.commit.interval.ms", "10000");
        props.put("auto.offset.reset", "smallest");

        ConsumerConfig cf = new ConsumerConfig(props) ;

        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(cf) ;

        String topic = "event" ;

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);


        KafkaStream<byte[],byte[]> stream = streams.get(0) ;

        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        int i = 1 ;
        while (it.hasNext()) {

            System.out.println(i + ": " + new String(it.next().message()));
            ++i;
        }
        consumer.shutdown(); 
    }
}

I am not sure why I am not able to consume messages from java code. Kafka is running on port 6667 and zookeeper on 2181.

1
Can you telnet to this IP address on this port?kisna
Did you figure out why ? I am having the same issue.chnet
Check firewall using PortQry.Pratiyush Kumar Singh

1 Answers

0
votes
  • Please check if zookeeper bound(netstat -lntp) to 0.0.0.0 or only to localhost(and then it will accept connections only from vm itself), this is rather new option(clientPortBindAddress, check if you've specified it, by default if you don't specify it it binds to all addresses)
  • It may be connected to your vm configuration. Eg. in my case where I'm using vagrant+virtual box I create private network between host and guest machines with config.vm.network :private_network, ip: 192.168.1.12
  • In addition - check if your consumer main doesn't throws some exception it might be connected and will provide some additional information(you are throwing exception to the highest level currently)