0
votes

I am using the following code to read data from my topic, "sha-test2", but it reads only every other message, i.e. 10 out of 20. When I run the console consumer, it shows all 20 messages: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic sha-test2 --from-beginning

What am I doing wrong? Your help is highly appreciated.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaTestConsumer extends Thread {
    //final static String clientId = "SimpleConsumerDemoClient";
    final static String TOPIC = "sha-test2";
    ConsumerConnector consumerConnector;

    public static void main(String[] argv) {
        KafkaTestConsumer helloKafkaConsumer = new KafkaTestConsumer();
        helloKafkaConsumer.start();
    }

    public KafkaTestConsumer() {
        // Connect through ZooKeeper as a member of the "test-group" consumer group.
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "172.23.32.35:2181");
        properties.put("group.id", "test-group");
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    }

    @Override
    public void run() {
        // Request a single stream (one consumer thread) for the topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumerConnector.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC).get(0);
        System.out.println("consumerMap : \n " + consumerMap.toString());
        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        System.out.println("run started");
        // Print each message as it arrives.
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}

Thank you.
~Shyam

2 Answers

2
votes

The problem is in this line:

topicCountMap.put(TOPIC, new Integer(1));

You tell the consumerConnector to create a single consumer thread for your topic, but the topic (evidently) has two partitions. The number of consumer threads in the "test-group" group should be equal to or greater than the number of partitions; otherwise some partitions won't be read by the group, which is precisely your case.

Please take a look at this example, where the number of threads is set via a command-line argument.
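
The pattern in that kind of example is one worker thread per stream. Here is a minimal sketch of the pattern only, with plain `Iterator<String>` objects standing in for Kafka streams; the class `StreamWorkers` and the method `drainAll` are hypothetical names, not part of Kafka. With the real connector you would request N streams in the topicCountMap and hand each stream's iterator to a worker in the same way.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: drains each stream on its own worker thread.
class StreamWorkers {
    public static List<String> drainAll(List<Iterator<String>> streams) {
        // Thread-safe list, because several workers append to it concurrently.
        List<String> seen = Collections.synchronizedList(new ArrayList<String>());
        ExecutorService pool = Executors.newFixedThreadPool(streams.size());
        for (Iterator<String> it : streams) {
            // One task per stream, so no stream is left without a reader.
            pool.submit(() -> {
                while (it.hasNext()) {
                    seen.add(it.next());
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        // Two stand-in "streams"; the thread count would normally come from args.
        List<Iterator<String>> streams = new ArrayList<>();
        streams.add(Arrays.asList("a0", "a1").iterator());
        streams.add(Arrays.asList("b0", "b1").iterator());
        System.out.println(drainAll(streams).size()); // prints 4
    }
}
```

Unlike these finite stand-ins, a Kafka stream iterator blocks waiting for new messages by default, so real workers keep running until the connector is shut down.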

Alternatively, you can read the exact number of partitions from ZooKeeper, where the topic metadata is stored under the /brokers/topics/your_topic_name/partitions node.

0
votes

Your code looks perfectly fine. This looks like an offset issue. The high-level consumer stores its offsets in ZooKeeper.

In your case, this is what might have happened:

1. You put 10 messages into Kafka.
2. You ran the consumer code, and it read all 10 messages successfully. The consumer also updated the consumed offset to 10 in ZooKeeper.
3. You stopped your consumer.
4. You put another 10 messages into Kafka.
5. You started the consumer code again. It read only the last 10 messages, not the 10 that were pushed earlier, because a restarted consumer checks ZooKeeper to find the offset from which to resume consuming.

Try rerunning your consumer with a different group id, or try again after deleting the offset from ZooKeeper. It should work fine.

 properties.put("group.id","test-group420");
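
To make the scenario above concrete, here is a toy in-memory model of a committed offset per group id. It is not Kafka's API: `OffsetDemo`, `produce`, and `poll` are hypothetical names, the log has a single partition, and the model assumes that a group with no committed offset starts at offset 0. In real Kafka, where a brand-new group starts reading is governed by the `auto.offset.reset` setting, so a new group id alone may not replay old messages.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: one log plus one committed offset per group id.
class OffsetDemo {
    private final List<String> log = new ArrayList<>();
    private final Map<String, Integer> committed = new HashMap<>();

    public void produce(String message) {
        log.add(message);
    }

    // Returns everything after the group's committed offset, then commits the new position.
    public List<String> poll(String groupId) {
        int from = committed.getOrDefault(groupId, 0); // assumption: unknown groups start at 0
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        committed.put(groupId, log.size());
        return batch;
    }

    public static void main(String[] args) {
        OffsetDemo topic = new OffsetDemo();
        for (int i = 0; i < 10; i++) topic.produce("m" + i);
        System.out.println(topic.poll("test-group").size());    // first run: prints 10
        for (int i = 10; i < 20; i++) topic.produce("m" + i);
        System.out.println(topic.poll("test-group").size());    // same group after restart: prints 10
        System.out.println(topic.poll("test-group420").size()); // fresh group in this model: prints 20
    }
}
```

The second poll returns only the newer 10 messages because "test-group" already has offset 10 committed, which is the behaviour described in step 5.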