0 votes

Recently, while working with Kafka, my application required access to all messages in a topic from the beginning. While writing a Kafka consumer (using the Java API), I am able to read messages from the beginning, but it only returns the first 500 messages in the topic. I tried increasing

props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.MAX_VALUE);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, Long.MAX_VALUE);

but it still does not return all the messages, whereas the CLI command

kafka-console-consumer --bootstrap-server localhost:9092 --topic --from-beginning

it returns all my 5000 records.

Is there any config I am missing? Any help would be appreciated.

[EDIT 1]

Code for the consumer:

public ConsumerRecords<byte[], byte[]> pullFromKafka(String topicname, Map<String, Object> props) {
    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
    consumer.subscribe(new ArrayList<String>(Collections.singletonList(topicname)));
    consumer.poll(0);
    // Reading topic offset from beginning
    consumer.seekToBeginning(consumer.assignment());
    // poll and time-out if no replies
    ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
    consumer.close();
    return records;
}

However, I have since changed the consumer:

public Map<String, byte[]> pullFromKafka(String topicname, Map<String, Object> props) {
    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
    Map<String, byte[]> entityMap = new HashMap<String, byte[]>();
    boolean stop = false;
    consumer.subscribe(new ArrayList<String>(Collections.singletonList(topicname)));
    consumer.poll(0);
    // Reading topic offset from beginning
    consumer.seekToBeginning(consumer.assignment());
    while (!stop) {
        // Request unread messages from the topic.
        ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(1000);
        Iterator<ConsumerRecord<byte[], byte[]>> iterator = consumerRecords.iterator();
        if (iterator.hasNext()) {
            while (iterator.hasNext()) {
                ConsumerRecord<byte[], byte[]> record = iterator.next();
                // Iterate through returned records, extract the value
                // of each message, and print the value to standard output.
                entityMap.put(new String(record.key()), record.value());
            }
        } else {
            stop = true;
        }
    }
    return entityMap;
}

Although it now fetches all records, I am wondering if there is a better way.

Comments:
- Can you post your consumer code example? – Thiago Baldim
- You can look at the Kafka Console consumer code on GitHub, by the way, to check out how it works / what properties are set. – OneCricketeer

1 Answer

1 vote

There is nothing wrong with using seekToBeginning() to consume all messages.

There is, however, a slightly more flexible way to achieve the same result: do it via configuration, which lets you keep the same code for consuming from either the start or the end. This is also the method the kafka-console-consumer tool uses:

  1. Set auto.offset.reset to earliest

  2. Set group.id to a new/random value. If you are not interested in keeping track of this consumer position but always want to start from the beginning, you can also set enable.auto.commit to false to avoid polluting the offsets topic.

  3. Remove seekToBeginning() from your logic
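The steps above can be sketched as a small helper that builds the consumer properties (a minimal sketch; the bootstrap server is a placeholder, and the class/method names are mine, not part of any Kafka API):

```java
import java.util.Properties;
import java.util.UUID;

public class FromBeginningConfig {

    /** Builds consumer properties that start reading from the earliest offset. */
    public static Properties buildProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        // With no committed offset for the group, start from the earliest offset
        props.put("auto.offset.reset", "earliest");
        // A fresh, random group.id guarantees there is no committed offset
        props.put("group.id", "reader-" + UUID.randomUUID());
        // Don't commit offsets for this throwaway group
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties p = buildProps();
        System.out.println(p.getProperty("auto.offset.reset"));
    }
}
```

Passing these properties to `new KafkaConsumer<>(props)` and then calling `subscribe()` and `poll()` as usual will read from the beginning without any `seekToBeginning()` call.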

Now, regarding your logic, there are a few things you should consider:

  1. There are cases when poll() can return an empty collection even though it has not reached the end. Also, a topic is an unbounded stream, so the end can move. Either way, you can use endOffsets() to find the current end offsets and compare them to the offsets of the messages returned

  2. You probably don't want to poll until you reach the end and keep everything in memory. A topic can be several GBs in size and contain millions of records; storing everything in a map can easily lead to an OutOfMemoryError.
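The end-offset comparison from point 1 can be sketched as a small pure function. This is a hypothetical helper of my own, not a Kafka API; it uses plain partition ids as keys, whereas real code would use `TopicPartition` instances keyed against the maps returned by `endOffsets()` and `position()`:

```java
import java.util.Map;

public class BoundedRead {

    /**
     * Returns true once every partition's current position has reached the
     * end offset captured before polling started. Keys are partition ids.
     */
    static boolean reachedEnd(Map<Integer, Long> endOffsets,
                              Map<Integer, Long> positions) {
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            Long pos = positions.get(e.getKey());
            if (pos == null || pos < e.getValue()) {
                return false; // at least one partition still has messages
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<Integer, Long> end = Map.of(0, 100L, 1, 50L);
        // Partition 1 is still behind its end offset
        System.out.println(reachedEnd(end, Map.of(0, 100L, 1, 40L)));
        // Both partitions caught up
        System.out.println(reachedEnd(end, Map.of(0, 100L, 1, 50L)));
    }
}
```

In the poll loop, you would process each batch of records as it arrives (rather than accumulating them in a map) and stop once this check returns true, which avoids both the empty-poll ambiguity and the memory issue.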