
I have a batch job that is triggered once a day. The requirement is to:

  1. Consume all the messages available on the Kafka topic at that point in time
  2. Process the messages
  3. If processing completes successfully, commit the offsets

Currently I poll() the messages in a while loop until ConsumerRecords.isEmpty() is true. When it is, I assume all the records available on the topic at that point in time have been consumed. The application maintains the offsets and closes the KafkaConsumer.

Once the messages have been processed successfully, I create a new KafkaConsumer and commit the offsets maintained by the application.

Note that I close the KafkaConsumer initially used to read the messages and use another KafkaConsumer instance to commit the offsets, in order to avoid the consumer rebalance exception.
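A simplified sketch of what that commit step could look like (lastProcessedOffsets, the per-partition offsets maintained by the application, and buildConsumerProps() are illustrative placeholders, not the actual code):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// lastProcessedOffsets: last processed offset per partition, maintained by the application
Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
for (Map.Entry<TopicPartition, Long> entry : lastProcessedOffsets.entrySet()) {
    // commit the offset of the next message to read, i.e. the last processed offset + 1
    toCommit.put(entry.getKey(), new OffsetAndMetadata(entry.getValue() + 1));
}

// a fresh consumer with the same group.id, used only for the commit
try (KafkaConsumer<String, String> committer = new KafkaConsumer<>(buildConsumerProps())) {
    committer.commitSync(toCommit);
}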

I am expecting a maximum of 5k messages on the topic. The topic is partitioned and replicated.

Is there a better way to consume all the messages on the topic at a specific point in time? Is there anything I am missing or need to take care of? I don't think I need to take care of consumer rebalancing, since I poll() for the messages in a loop and process them after the polling is done.

I am using the Java Kafka client v0.9 and can change to v0.10 if it helps in the above scenario.

Thanks

Updated:

AtomicBoolean flag = new AtomicBoolean(true);

while (flag.get()) {

    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(timeout);

    if (consumerRecords.isEmpty()) {
        flag.set(false);
        continue;
    }
    // if the ConsumerRecords is not empty, process the messages and continue to poll()
}
kafkaConsumer.close();
Could you clarify what the result looks like? Specifically, is the result appended to a list of daily results (like one daily report, independent from the other days' reports), or is it combined with previous results (like updating a stock level by subtracting the total items sold today)? Or something else? - Svend
Also, what is the nature of the 5K daily input events and how are they assigned to each day? For example, if due to some issue the batch did not execute successfully for the last 3 days, should the next successful execution split the 15K messages and process each day separately, or does the computation accept receiving the whole set of not-yet-processed messages? - Svend
@Svend The application generates a daily report. The daily input events come from a third-party system. If the batch did not execute successfully for the last 3 days, it does not have to split the messages; it receives all the not-yet-processed messages. - Abdullah Shaikh
Thanks for the clarification. Is Kafka Streams a possible alternative for you? In my experience, after realising that a daily batch is often equivalent to a 24h tumbling window in stream processing, I found that Kafka Streams helps express a daily batch as just one particular case of stream processing. E.g., stream.groupby(getDay).aggregate(updateReport).to(outputStream) updates the appropriate report based on what information is available. It simply does nothing if no new event is received, though you may shut it down during the day if other constraints require you to. - Svend
Kafka Streams looks like a possible alternative. I will explore it and see how it works. Thanks for the suggestion. - Abdullah Shaikh
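For reference, a rough, untested sketch of the Kafka Streams approach suggested above, assuming a recent Streams API (1.0 or later), String events that begin with an ISO date such as 2017-06-01, and made-up topic names (daily-events, daily-reports); in a real application the report would be a proper type rather than a String:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class DailyReportApp {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> events = builder.stream("daily-events");

        events
            .groupBy((key, value) -> value.substring(0, 10))           // re-key each event by its day, e.g. "2017-06-01"
            .aggregate(() -> "",                                       // empty report the first time a day is seen
                       (day, event, report) -> report + event + "\n")  // fold each event into that day's report
            .toStream()
            .to("daily-reports");                                      // an updated report is emitted on every new event

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "daily-report-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        new KafkaStreams(builder.build(), props).start();
    }
}

Because the per-day report is kept in a state store and updated incrementally, a day that was missed is simply caught up whenever its events arrive.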

1 Answer


You can't assume that after a single call to poll() you have read all the messages available in the topic at that moment, because of the max.poll.records configuration parameter on the consumer. It is the maximum number of records returned by a single poll(), and its default value is 500. This means that if, for example, there are 600 messages in the topic at that moment, you need two calls to poll() to read all of them (and in the meantime other messages could arrive).

The other thing I don't understand is why you are using a different consumer for committing the offsets. What is the consumer rebalance exception you are talking about?
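As a rough, untested sketch, one way to define "all the messages available now" is to snapshot the end offsets when the job starts and keep polling until every partition has reached that snapshot. This relies on endOffsets(), available from client 0.10.1 onwards; the topic name, group id, connection settings and the process() call are placeholders, and manual assign() is used instead of subscribe() so there is no group rebalancing to worry about:

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "daily-batch");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");   // first run starts from the beginning of the topic

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    // assign all partitions manually; a once-a-day batch job does not need group rebalancing
    List<TopicPartition> partitions = consumer.partitionsFor("my-topic").stream()
            .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
            .collect(Collectors.toList());
    consumer.assign(partitions);

    // snapshot of the log end offsets at the moment the job starts
    Map<TopicPartition, Long> end = consumer.endOffsets(partitions);

    // keep polling until every partition has reached the snapshot; a single poll()
    // is not enough because of max.poll.records
    while (partitions.stream().anyMatch(tp -> consumer.position(tp) < end.get(tp))) {
        for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
            process(record);   // placeholder for your processing step
        }
    }

    consumer.commitSync();     // commit only after everything up to the snapshot has been processed
}

Since the same consumer stays open until the commit, there is no need for a second consumer instance just to commit the offsets.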