
I am testing the Kafka High Level Consumer using the ConsumerGroupExample code from the Kafka site. I would like to retrieve all the existing messages on the topic called "test" that I have in the Kafka server config. According to other blogs, auto.offset.reset should be set to "smallest" to get all messages:

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
    Properties props = new Properties();
    props.put("zookeeper.connect", a_zookeeper);
    props.put("group.id", a_groupId);
    // Consulted only when this group has no committed offset (or its offset is out of range)
    props.put("auto.offset.reset", "smallest");
    props.put("zookeeper.session.timeout.ms", "10000");

    return new ConsumerConfig(props);
}

My real question is: what is the Java API call for the High Level Consumer that is equivalent to:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning


4 Answers

7
votes

Basically, every time a consumer starts with a group ID that has no committed offsets, it begins at the position given by auto.offset.reset, and with "smallest" (as in your config) that is the beginning of the topic. So if you just want to consume from the beginning each time for testing purposes, initialise your consumer with a new group ID every time and it will read the messages from the beginning. Here's how I did it:

properties.put("group.id", UUID.randomUUID().toString());

That way the consumer reads the messages from the beginning each time!
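To see why the fresh group.id matters, here is a simplified, stdlib-only model of how the ZooKeeper-based High Level Consumer picks its starting offset for a partition. It is a sketch under stated assumptions, not Kafka code: the class and method names are hypothetical, and it captures only the rule that a group's committed offset wins, while auto.offset.reset is consulted only when the group has no committed offset (the real consumer also applies it when a committed offset is out of range, which this model ignores). Since the old consumer's default for auto.offset.reset is "largest", a new group alone is not enough; it also needs "smallest".

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical, simplified model of how the 0.8 High Level Consumer chooses where to start.
// Not Kafka code: it ignores the out-of-range case and looks at a single partition.
final class StartOffsetModel {

    // committedOffset is null when the group has never committed an offset for this partition.
    static long chooseStart(Long committedOffset, String autoOffsetReset,
                            long logStartOffset, long logEndOffset) {
        if (committedOffset != null) {
            // An existing group resumes where it left off; auto.offset.reset is not consulted.
            return committedOffset;
        }
        switch (autoOffsetReset) {
            case "smallest": return logStartOffset; // oldest message still retained
            case "largest":  return logEndOffset;   // only messages produced from now on
            default: throw new IllegalArgumentException("unknown auto.offset.reset: " + autoOffsetReset);
        }
    }

    // Settings for a throwaway group that reads a topic from the beginning.
    static Properties freshGroupFromBeginning(String zookeeper) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", UUID.randomUUID().toString()); // new group => no committed offsets
        props.put("auto.offset.reset", "smallest");          // so start at the earliest offset
        return props;
    }

    public static void main(String[] args) {
        // Existing group that already consumed up to offset 40: "smallest" has no effect.
        System.out.println(chooseStart(40L, "smallest", 0L, 42L)); // prints 40
        // Brand-new group: "smallest" starts from the beginning.
        System.out.println(chooseStart(null, "smallest", 0L, 42L)); // prints 0
    }
}
```

In other words, re-running the question's code with the same group ID resumes from the committed offset; only a group without offsets falls back to "smallest".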

5
votes

It looks like you need to use the low-level SimpleConsumer API:

For most applications, the high level consumer API is good enough. Some applications want features not exposed to the high level consumer yet (e.g., set initial offset when restarting the consumer). They can instead use our low level SimpleConsumer API. The logic will be a bit more complicated and you can follow the example in here.

This example worked for getting all messages from a topic with the following arguments (note that the port is the Kafka broker port, not the ZooKeeper port; the topic was set up following this example):

10 my-replicated-topic 0 localhost 9092

Specifically, the example computes readOffset with its getLastOffset helper method, passing kafka.api.OffsetRequest.EarliestTime():

long readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);

Here is another post that may provide some alternative ideas on how to sort this out: How to get data from old offset point in Kafka?

2
votes

To fetch messages from the beginning, you can do this:

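import kafka.utils.ZkUtils;

// Delete the group's node in ZooKeeper (and with it the group's committed offsets).
// Replace zkhost:zkport with your ZooKeeper address and groupId with your consumer's group.id.
ZkUtils.maybeDeletePath("zkhost:zkport", "/consumers/" + groupId);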

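Then start the consumer as usual. Because the group no longer has committed offsets, it starts from the position given by auto.offset.reset, so keep that set to "smallest".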
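The path passed to maybeDeletePath must be /consumers/ followed by your actual group ID, not the literal text "group.id"; the ZooKeeper-based consumer keeps a group's committed offsets under /consumers/<group>/offsets/<topic>/<partition>. The helper below is a small hypothetical sketch (stdlib only, illustrative names) that builds those paths so a placeholder is not passed by mistake. It is safest to delete the node only while no consumer in that group is running.

```java
// Hypothetical helper: builds the ZooKeeper paths used by the ZooKeeper-based (0.8) consumer.
final class ConsumerZkPaths {

    // Root node of a consumer group; deleting it removes the group's committed offsets.
    static String groupPath(String groupId) {
        if (groupId == null || groupId.isEmpty() || groupId.contains("/")) {
            throw new IllegalArgumentException("invalid group id: " + groupId);
        }
        return "/consumers/" + groupId;
    }

    // Node holding the committed offset of one partition for that group.
    static String offsetPath(String groupId, String topic, int partition) {
        return groupPath(groupId) + "/offsets/" + topic + "/" + partition;
    }

    public static void main(String[] args) {
        System.out.println(groupPath("my-group"));             // prints /consumers/my-group
        System.out.println(offsetPath("my-group", "test", 0)); // prints /consumers/my-group/offsets/test/0
    }
}
```

With it, the call from the answer would read ZkUtils.maybeDeletePath("zkhost:zkport", ConsumerZkPaths.groupPath("my-group")), where "my-group" stands for your own group ID.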

0
votes
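Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("auto.offset.reset", "earliest");
props.put("group.id", UUID.randomUUID().toString());
// The new consumer also requires deserializers; String is used here as an example
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");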

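These properties will help you. Note that they are for the newer KafkaConsumer API (bootstrap.servers and "earliest"), not the ZooKeeper-based High Level Consumer from the question, which uses zookeeper.connect and "smallest".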
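The two consumer generations spell the reset policy differently: the ZooKeeper-based High Level Consumer accepts "smallest" and "largest", while the newer KafkaConsumer configured with bootstrap.servers accepts "earliest", "latest" and "none". The helper below is a hypothetical sketch, not part of any Kafka API, for translating an old setting when moving to the new consumer.

```java
// Hypothetical helper, not part of Kafka: maps an old-consumer auto.offset.reset value
// ("smallest"/"largest") to the value the newer KafkaConsumer expects ("earliest"/"latest").
final class OffsetResetTranslation {

    static String toNewConsumerValue(String oldValue) {
        switch (oldValue) {
            case "smallest": return "earliest"; // start from the oldest retained message
            case "largest":  return "latest";   // start from new messages only
            default: throw new IllegalArgumentException("not an old-consumer value: " + oldValue);
        }
    }

    public static void main(String[] args) {
        System.out.println(toNewConsumerValue("smallest")); // prints earliest
    }
}
```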