I am using following code to read my data for topic i.e. "sha-test2", but it is reading exactly alternative lines of code i.e. 10 out of 20 lines. But when I run console it is showing all 20 lines. i.e . bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic sha-test2 --from-beginning
What am I dong wrong ? your help is highly appreciated.
public class KafkaTestConsumer extends Thread {
//final static String clientId = "SimpleConsumerDemoClient";
final static String TOPIC = "sha-test2";
ConsumerConnector consumerConnector;
public static void main(String[] argv) throws
UnsupportedEncodingException {
KafkaTestConsumer helloKafkaConsumer = new KafkaTestConsumer();
helloKafkaConsumer.start();
}
public KafkaTestConsumer(){
Properties properties = new Properties();
properties.put("zookeeper.connect","172.23.32.35:2181");
properties.put("group.id","test-group");
ConsumerConfig consumerConfig = new ConsumerConfig(properties);
consumerConnector =
Consumer.createJavaConsumerConnector(consumerConfig);
}
@Override
public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(TOPIC, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumerConnector.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC).get(0);
System.out.println("consumerMap : \n " + consumerMap.toString() );
ConsumerIterator<byte[], byte[]> it = stream.iterator();
System.out.println("run started");
while(it.hasNext()){
System.out.println(new String(it.next().message()));
}
}
Thank you.
~Shyam