I have a batch job which will be triggered once a day. The requirement is to
- consume all the messages available on the Kafka topic at that point in time
- process the messages
- if processing completes successfully, commit the offsets.
Currently I poll() the messages in a while loop until ConsumerRecords.isEmpty() is true. When ConsumerRecords.isEmpty() is true, I assume all the records available on the topic at that point in time have been consumed. The application maintains the offsets and closes the Kafka consumer.
When the processing of the messages has completed successfully, I create a new KafkaConsumer and commit the offsets maintained by the application.
Note that I close the KafkaConsumer initially used to read the messages and use another KafkaConsumer instance to commit the offsets, to avoid the consumer-rebalance exception.
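One detail worth checking in this pattern: Kafka's commit convention is that you commit the offset of the *next* record to read, i.e. last processed offset + 1; committing the last processed offset itself re-delivers one record on the next run. A minimal sketch of that bookkeeping with plain maps (the class and method names are hypothetical, and the actual `commitSync` wiring is only indicated in a comment):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the commit step, assuming the application stores the last
// *processed* offset per partition (keyed here by partition number).
public class OffsetCommitSketch {

    // Map partition -> offset of the NEXT record to consume,
    // which is what Kafka expects you to commit.
    static Map<Integer, Long> nextOffsets(Map<Integer, Long> lastProcessed) {
        Map<Integer, Long> next = new HashMap<>();
        for (Map.Entry<Integer, Long> e : lastProcessed.entrySet()) {
            next.put(e.getKey(), e.getValue() + 1);
        }
        return next;
    }

    // With a real consumer, this map would be converted to
    // Map<TopicPartition, OffsetAndMetadata> and passed to
    // consumer.commitSync(offsets) on the second consumer instance.
}
```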
I am expecting a maximum of 5k messages on the topic. The topic is partitioned and replicated.
Is there a better way to consume all the messages on a topic at a specific point in time? Is there anything I am missing or need to take care of? I don't think I need to handle consumer rebalancing, since I poll() for the messages in a loop and process them after the polling is done.
I am using the Java Kafka client v0.9 and can change to v0.10 if it helps in the above scenario.
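One caveat with the empty-poll heuristic: poll() can legitimately return an empty batch while records remain (e.g. while a fetch is still in flight), so a single empty poll is not proof the topic is drained. From client 0.10.1 onward, KafkaConsumer exposes endOffsets(...), so you can snapshot the end offsets once at startup and keep polling until consumer.position(tp) has reached the snapshot for every assigned partition. A sketch of just the drain check, with partitions as plain ints (the real wiring via endOffsets/position is only indicated in comments):

```java
import java.util.Map;

// Drain check for a batch run: snapshot the end offsets once, then after each
// poll() compare the consumer's per-partition position against the snapshot.
// With a real consumer: endOffsets = consumer.endOffsets(consumer.assignment());
// positions come from consumer.position(tp) for each assigned partition.
public class DrainCheck {

    static boolean isDrained(Map<Integer, Long> positions, Map<Integer, Long> endOffsets) {
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            Long pos = positions.get(e.getKey());
            if (pos == null || pos < e.getValue()) {
                return false; // records still left on this partition
            }
        }
        return true;
    }
}
```

Because the snapshot is taken at startup, messages arriving mid-run are deliberately left for the next day's batch, which matches the "at that point in time" requirement.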
Thanks
Updated:
AtomicBoolean flag = new AtomicBoolean();
flag.set(true);

while (flag.get()) {
    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(timeout);
    if (consumerRecords.isEmpty()) {
        flag.set(false);
        continue;
    }
    // if the ConsumerRecords is not empty, process the messages and continue to poll()
}
kafkaConsumer.close();
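Inside that loop, the application can also track the highest offset it has successfully processed on each partition, so the final commit on the second consumer has exact values rather than relying on the consumer's internal position. A sketch of that bookkeeping (class and method names are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

// Bookkeeping for the poll loop: remember the highest offset processed on
// each partition, so the commit step knows exactly what to acknowledge.
public class ProcessedOffsets {
    private final Map<Integer, Long> highest = new HashMap<>();

    // Call once per successfully processed record (partition, offset);
    // merge keeps the maximum offset seen so far for that partition.
    void record(int partition, long offset) {
        highest.merge(partition, offset, Math::max);
    }

    // Defensive copy handed to the commit step.
    Map<Integer, Long> snapshot() {
        return new HashMap<>(highest);
    }
}
```

If processing fails partway through, the snapshot is simply discarded and nothing is committed, so the next run re-reads from the previously committed offsets.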