27
votes

I did a fresh installation of Apache Kafka 0.10.1.0.

I was able to send and receive messages from the command prompt.

While using the Producer / Consumer Java example, I cannot figure out what to set for the group.id parameter in the consumer example.

Please let me know how to fix this issue.

Below is the consumer example I used:

public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "my-topic");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    try {
        consumer.subscribe(Arrays.asList("my-topic"));

        ConsumerRecords<String, String> records = consumer.poll(100);
        System.err.println("records size=>"+records.count());
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    } catch (Exception ex) {
        ex.printStackTrace();
    } finally {
        consumer.close();
    }
}

When I run the console consumer with the command below, I can see the messages posted by the producer on the console, but I am unable to see them from the Java program:

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my-topic --from-beginning

6
If you run your Java consumer and produce some messages AFTER launching it, do you still not see any messages being consumed? - Luciano Afranllie
Yes, I am getting the message "records size=> 0" on the console. - Ankit
You can get the value of group.id for your Kafka cluster by looking in $KAFKA_HOME/config/consumer.properties. There you can see the line "#consumer group id". Use this value and your code will work. You can put multiple consumers in the same group by giving them the same group.id value in this file. - vindev
I am not sure whether you have got the answer yet. My assumption is that you are running the code in Eclipse, with the producer running in one window, and that you start the consumer without stopping the producer, so you do not see the records in the IDE even though you can see them on the console. This is a trivial point, but it is still worth discussing. Please correct me if I am wrong. - anshuman sharma
I am getting the error: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.Consumer.subscribe(Ljava/util/Collection;)V - ShrJoshi

6 Answers

39
votes

Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.

If all the consumer instances have the same consumer group, then the records will effectively be load balanced over the consumer instances.

If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes.

The group.id is a string that uniquely identifies the group of consumer processes to which this consumer belongs.

(Kafka intro)
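
To make the two cases concrete, here is a toy in-memory model in plain Java. It is only a sketch and not the Kafka client API: the class `ConsumerGroupSketch` and its methods are made up for illustration, and it hands out records one by one in round-robin order, whereas real Kafka balances a group by assigning partitions to its members.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of consumer groups (hypothetical, not the Kafka API).
class ConsumerGroupSketch {
    // group.id -> names of the consumers that joined with that id
    private final Map<String, List<String>> groups = new LinkedHashMap<>();
    // group.id -> number of records delivered to that group so far
    private final Map<String, Integer> delivered = new LinkedHashMap<>();

    void join(String groupId, String consumerName) {
        groups.computeIfAbsent(groupId, g -> new ArrayList<>()).add(consumerName);
    }

    // Returns the consumers that receive this record: exactly one per group.
    List<String> deliver(String record) {
        List<String> receivers = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : groups.entrySet()) {
            int index = delivered.merge(entry.getKey(), 1, Integer::sum) - 1;
            List<String> members = entry.getValue();
            // Simplification: rotate through the members of the group.
            receivers.add(members.get(index % members.size()));
        }
        return receivers;
    }

    public static void main(String[] args) {
        ConsumerGroupSketch sameGroup = new ConsumerGroupSketch();
        sameGroup.join("g1", "A");
        sameGroup.join("g1", "B");
        System.out.println(sameGroup.deliver("r1")); // [A]
        System.out.println(sameGroup.deliver("r2")); // [B]

        ConsumerGroupSketch differentGroups = new ConsumerGroupSketch();
        differentGroups.join("g1", "A");
        differentGroups.join("g2", "B");
        System.out.println(differentGroups.deliver("r1")); // [A, B]
    }
}
```

With one shared group the two consumers split the records; with two groups both consumers see every record.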

1
votes

Here are some test results on how the partition count and the consumer property group.id interact:

Properties props = new Properties();
// set all other properties as required
props.put("group.id", "ConsumerGroup1");
props.put("max.poll.records", "1");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

The consumer's group.id is used to load balance the produced data across the consumers in the group (if group.id is different for each consumer, each consumer gets its own copy of the data).

If partitions = 1 and the total consumer count = 2, only one of the two active consumers gets data.

If partitions = 2 and the total consumer count = 2, the two active consumers share the data evenly.

If partitions = 3 and the total consumer count = 2, both active consumers get data: one consumer reads from 2 partitions and the other reads from 1 partition.

If partitions = 3 and the total consumer count = 3, the three active consumers share the data evenly.
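
The counts above follow from spreading the partitions of one topic as evenly as possible over the members of a single group. The plain-Java sketch below reproduces only that arithmetic; `PartitionSpreadSketch` and `partitionsPerConsumer` are hypothetical names, and which consumer actually receives the extra partition depends on the assignor in use.

```java
import java.util.ArrayList;
import java.util.List;

// Arithmetic sketch only; it does not call Kafka.
class PartitionSpreadSketch {
    // Number of partitions each consumer in one group ends up with
    // when the partitions of a single topic are spread as evenly as possible.
    static List<Integer> partitionsPerConsumer(int partitions, int consumers) {
        List<Integer> counts = new ArrayList<>();
        int base = partitions / consumers;   // every consumer gets at least this many
        int extra = partitions % consumers;  // this many consumers get one more
        for (int i = 0; i < consumers; i++) {
            counts.add(base + (i < extra ? 1 : 0));
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(partitionsPerConsumer(1, 2)); // [1, 0]
        System.out.println(partitionsPerConsumer(2, 2)); // [1, 1]
        System.out.println(partitionsPerConsumer(3, 2)); // [2, 1]
        System.out.println(partitionsPerConsumer(3, 3)); // [1, 1, 1]
    }
}
```

A count of 0 corresponds to the idle consumer in the first test case.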

0
votes

In the code you provided, you wait for data only once, for 100 ms. You should receive the data in a loop, or wait for a longer period of time (in which case you will only get one batch of data). As for 'group.id': when you run the consumer from the console, it gets a random 'group.id'.
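
The difference between polling once and polling in a loop can be sketched without a broker. In the plain-Java sketch below, the `Supplier` stands in for `consumer.poll(100)`, and `PollLoopSketch`, `pollRepeatedly` and `fakePoll` are hypothetical names. The scripted first batch is empty on the assumption that the first real poll may return nothing while the consumer is still joining its group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Sketch of the polling pattern; the Supplier replaces consumer.poll(100).
class PollLoopSketch {
    // Calls poll up to maxPolls times and collects everything it returns.
    static <T> List<T> pollRepeatedly(Supplier<List<T>> poll, int maxPolls) {
        List<T> received = new ArrayList<>();
        for (int i = 0; i < maxPolls; i++) {
            received.addAll(poll.get());
        }
        return received;
    }

    // Hypothetical fake: returns the scripted batches in order, then empty batches.
    static Supplier<List<String>> fakePoll(List<List<String>> batches) {
        Iterator<List<String>> it = batches.iterator();
        return () -> it.hasNext() ? it.next() : Collections.<String>emptyList();
    }

    public static void main(String[] args) {
        List<List<String>> script = Arrays.asList(
                Collections.<String>emptyList(), // nothing ready on the first poll
                Arrays.asList("m1", "m2"),
                Arrays.asList("m3"));
        System.out.println(pollRepeatedly(fakePoll(script), 1)); // []
        System.out.println(pollRepeatedly(fakePoll(script), 5)); // [m1, m2, m3]
    }
}
```

In the real consumer the same idea is usually written as a `while` loop around `consumer.poll(...)` inside the `try` block, with `consumer.close()` left in `finally`.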

0
votes

Since no offset was provided, the Java client waits for new messages and does not show existing ones; this is expected. To read all the messages already in the topic, one can use this piece of code:

if (READ_FROM_BEGINNING) {
    // Consume all the messages in the topic from the beginning.
    // This doesn't work reliably unless consumer.poll(..) is called first,
    // probably because of lazy-loading issues.
    consumer.poll(10);
    consumer.seekToBeginning(consumer.assignment());
    // To read from a predefined offset instead, call:
    // consumer.seek(consumer.assignment().iterator().next(), READ_FROM_OFFSET);
}
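
An alternative to seeking explicitly is the `auto.offset.reset` consumer property. Its default is `latest`; setting it to `earliest` makes a consumer start from the beginning of the topic, but only when its group has no committed offsets yet. The sketch below only builds the `Properties` object; the class name, the helper `consumerProps` and the group name `my-fresh-group` are assumptions for illustration, and the result would be passed to `new KafkaConsumer<>(props)`.

```java
import java.util.Properties;

// Builds consumer settings only; no Kafka classes are needed to run this.
class EarliestOffsetSketch {
    static Properties consumerProps(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", groupId);
        // Takes effect only if this group has no committed offset for the partition.
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // "my-fresh-group" is a hypothetical group name that has not been used before.
        Properties props = consumerProps("my-fresh-group");
        System.out.println(props.getProperty("auto.offset.reset")); // earliest
    }
}
```

If the group already has committed offsets, the consumer resumes from them and this setting is ignored.
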
0
votes

The consumer group id identifies the consumer group, which should be defined in the Kafka consumer.properties file.

Add a consumer group for "my-topic" there and it should work, as below:

# consumer group id
group.id=my-topic-consumer-group
-4
votes

Give any random value to group id. It doesn't matter.

props.put("group.id", "Any Random Value");