Recently, while working with Kafka, my application needed to read all messages in a topic from the beginning. My Kafka consumer (written with the Java API) does read messages from the beginning, but it only returns the first 500 messages in the topic. I tried increasing these limits:

```java
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.MAX_VALUE);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, Integer.MAX_VALUE);
```
but it still doesn't return all the messages, whereas the CLI command

```
kafka-console-consumer --bootstrap-server localhost:9092 --topic --from-beginning
```

returns all 5000 of my records.
Is there a config I am missing? Any help would be appreciated.
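For reference, 500 is the default value of `max.poll.records`, which matches the cut-off seen here. Below is a minimal sketch of the consumer settings using plain `java.util.Properties` and the string names behind the `ConsumerConfig` constants, so it runs without the Kafka jar; the `group.id` value `example-group` is a hypothetical placeholder. One detail worth checking: `fetch.max.bytes` is an int-typed setting, so it should be given an `Integer`, not a `Long`. Raising these limits only enlarges each batch; a single `poll()` returns only the records fetched so far (per-partition fetches are also bounded by `max.partition.fetch.bytes`), so reading a whole topic generally still needs a loop.

```java
import java.util.Properties;

public class ConsumerProps {
    // Builds consumer settings for reading a topic from the start.
    // "example-group" is a hypothetical group id; the broker address is the one from the question.
    static Properties buildProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Start from the earliest offset when the group has no committed offset yet.
        props.put("auto.offset.reset", "earliest");
        // The default is 500 records per poll; this only raises the per-poll cap.
        props.put("max.poll.records", Integer.MAX_VALUE);
        // fetch.max.bytes is an int-typed config, so pass an Integer rather than a Long.
        props.put("fetch.max.bytes", Integer.MAX_VALUE);
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps();
        System.out.println(props.get("max.poll.records"));
    }
}
```

With a fresh `group.id`, `auto.offset.reset=earliest` is an alternative to calling `seekToBeginning`, although it has no effect once the group has committed offsets.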
[EDIT 1]
Code for the consumer:
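```java
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public ConsumerRecords<byte[], byte[]> pullFromKafka(String topicname, Map<String, Object> props) {
    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList(topicname));
    // Poll once so the consumer joins the group and receives its partition assignment
    consumer.poll(0);
    // Reading topic offset from beginning
    consumer.seekToBeginning(consumer.assignment());
    // Poll a single time, timing out after 1000 ms if no records arrive
    ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
    consumer.close();
    return records;
}
```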
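This version returns after exactly one `poll(1000)`, so its result can never exceed `max.poll.records` (500 by default) and may be smaller if fewer records had been fetched within the timeout. The stdlib-only simulation below models the batching arithmetic; the `LogSimulation` class and its in-memory list are hypothetical stand-ins for a single partition, not Kafka APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class LogSimulation {
    // Hypothetical stand-in for one partition: the record at index i has offset i.
    static List<Integer> buildLog(int size) {
        List<Integer> log = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            log.add(i);
        }
        return log;
    }

    // Mimics one poll: returns at most maxPollRecords records starting at position.
    static List<Integer> poll(List<Integer> log, int position, int maxPollRecords) {
        int end = Math.min(position + maxPollRecords, log.size());
        return new ArrayList<>(log.subList(position, end));
    }

    // Polls until a batch comes back empty and returns the number of non-empty polls.
    static int countPolls(List<Integer> log, int maxPollRecords) {
        int position = 0;
        int polls = 0;
        while (true) {
            List<Integer> batch = poll(log, position, maxPollRecords);
            if (batch.isEmpty()) {
                return polls;
            }
            position += batch.size();
            polls++;
        }
    }

    public static void main(String[] args) {
        List<Integer> log = buildLog(5000);
        System.out.println(poll(log, 0, 500).size()); // a single poll yields 500
        System.out.println(countPolls(log, 500));     // 10 non-empty polls cover 5000 records
    }
}
```

In other words, 5000 records at 500 per poll need 10 polls, which is why the loop in the revised consumer sees everything while the single-poll version does not.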
However, I have since changed the consumer:
```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public Map<String, byte[]> pullFromKafka(String topicname, Map<String, Object> props) {
    Map<String, byte[]> entityMap = new HashMap<>();
    // try-with-resources makes sure the consumer is closed when we are done
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Collections.singletonList(topicname));
        // Poll once so the consumer joins the group and receives its partition assignment
        consumer.poll(0);
        // Reading topic offset from beginning
        consumer.seekToBeginning(consumer.assignment());
        while (true) {
            // Request unread messages; stop when a 1000 ms poll returns nothing
            ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(1000);
            if (consumerRecords.isEmpty()) {
                break;
            }
            for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
                // Store each value under its (UTF-8 decoded) key; later records overwrite earlier ones
                entityMap.put(new String(record.key(), StandardCharsets.UTF_8), record.value());
            }
        }
    }
    return entityMap;
}
```
It now fetches all records, but I am wondering if there is a better way.
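One commonly suggested alternative to stopping on the first empty poll (which can end early if a fetch is merely slow) is to snapshot each partition's end offset right after seeking to the beginning, then keep polling until the consumer's position reaches it. In the real client the relevant calls are `KafkaConsumer.endOffsets(Collection<TopicPartition>)` and `KafkaConsumer.position(TopicPartition)`. The sketch below is only a stdlib simulation of that idea under simplifying assumptions: the hypothetical `FakePartition` class stands in for a single partition (a real topic may have several, each needing its own check), and every second poll returns nothing to mimic a fetch that has not completed yet.

```java
import java.util.ArrayList;
import java.util.List;

public class ReadToEndSketch {
    // Hypothetical stand-in for one partition; not a Kafka class.
    static class FakePartition {
        final List<String> log = new ArrayList<>();
        long position = 0; // next offset the consumer will read
        int pollCount = 0;

        // Mimics poll(): every second call returns nothing, as if the fetch were still in flight.
        List<String> poll(int maxPollRecords) {
            pollCount++;
            List<String> batch = new ArrayList<>();
            if (pollCount % 2 == 0) {
                return batch;
            }
            while (batch.size() < maxPollRecords && position < log.size()) {
                batch.add(log.get((int) position));
                position++;
            }
            return batch;
        }
    }

    static FakePartition partitionWith(int records) {
        FakePartition p = new FakePartition();
        for (int i = 0; i < records; i++) {
            p.log.add("record-" + i);
        }
        return p;
    }

    // Stops on the first empty poll, like the loop in the question.
    static int readUntilEmpty(FakePartition p, int maxPollRecords) {
        int count = 0;
        while (true) {
            List<String> batch = p.poll(maxPollRecords);
            if (batch.isEmpty()) {
                return count;
            }
            count += batch.size();
        }
    }

    // Snapshots the end offset first, then polls until the position reaches it.
    static int readToEndOffset(FakePartition p, int maxPollRecords) {
        long endOffset = p.log.size();   // plays the role of consumer.endOffsets(...)
        int count = 0;
        while (p.position < endOffset) { // plays the role of consumer.position(tp)
            count += p.poll(maxPollRecords).size();
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(readUntilEmpty(partitionWith(5000), 500));  // stops early: 500
        System.out.println(readToEndOffset(partitionWith(5000), 500)); // reads all: 5000
    }
}
```

Comparing the position against the end offset, rather than comparing the last record's offset, is deliberate: offsets in a real topic can have gaps (for example after compaction or around transaction markers), and the position advances past them. Records produced after the snapshot are not read, which is usually what "read everything up to now" means.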