I am writing a Kafka consumer application in which I have one consumer for each partition. The code is shown below:
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
        consumer.commitAsync(new OffsetCommitCallback() {
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
                if (e != null) {
                    log.error("Commit failed for offsets {}", offsets, e);
                }
            }
        });
    }
Is there a way to programmatically access and print the consumer lag, that is, the difference between the offset of the last record read by the consumer and the offset of the last record written to that consumer's partition by a producer?
Which statements should I add to the code above to get the lag value, given that my final goal is to send this value to Prometheus for monitoring?
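To make the quantity concrete, here is a minimal sketch of the arithmetic only, not a tested solution. It assumes the per-partition end offsets and consumer positions are already available as plain maps keyed by partition number. In a real consumer these would presumably come from `consumer.endOffsets(consumer.assignment())` and `consumer.position(tp)`, keyed by `TopicPartition`. The class name `LagCalculator` and the method `computeLag` are hypothetical names used for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: computes per-partition lag from two offset maps.
// Assumption: keys are partition numbers; with the Kafka client the keys
// would be TopicPartition objects instead.
public final class LagCalculator {

    private LagCalculator() {
    }

    /**
     * @param endOffsets partition -> log end offset (offset the next produced record will get)
     * @param positions  partition -> consumer position (offset of the next record to fetch)
     * @return partition -> lag, for every partition present in both maps
     */
    public static Map<Integer, Long> computeLag(Map<Integer, Long> endOffsets,
                                                Map<Integer, Long> positions) {
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            Long position = positions.get(entry.getKey());
            if (position == null) {
                continue; // no known position for this partition, so skip it
            }
            // Clamp at zero in case the two values were read at slightly different times.
            lag.put(entry.getKey(), Math.max(0L, entry.getValue() - position));
        }
        return lag;
    }
}
```

With an end offset of 120 and a position of 100 for partition 0, this would report a lag of 20 for that partition. The resulting value per partition is what I would want to expose as a gauge to Prometheus.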