7
votes

Looking at the latest (v0.10) Kafka Consumer documentation:

"The position of the consumer gives the offset of the next record that will be given out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances every time the consumer receives data calls poll(long) and receives messages."

Is there a way to query for the largest offset available for the partition on the server side, without retrieving all the messages?

The logic I am trying to implement is as follows:

  1. query every second for the amount (A) of pending messages in a topic
  2. if A > threshold, wake up a processor that would go ahead retrieving all the messages, and processing them
  3. otherwise do nothing (sleep 1)

The motivation is that I need to do some batch processing, but I want the processor to wake up only when there is enough data (and I don't want to retrieve all the data twice).

3

3 Answers

7
votes

You can use the Consumer.seekToEnd() method, run Consumer.poll(0) to make that take effect but return immediately, then Consumer.position() to find the positions for all subscribed (or assigned) topic partitions. These will be the current final offsets for all partitions. This will also start fetching some data from the brokers for those offsets, but any returned data will be ignored if you subsequently seek back to a different position.

Currently the alternative, as mentioned by serejja, is to use the old simple consumer, although the process is quite a bit more complicated as you need to manually find the leader for each partition.

1
votes

Sadly, I don't see how this is possible with 0.10 consumer.

However, this is doable if you have any lower level Kafka client (sorry but I'm not sure if one exists for JVM, but there are plenty of them for other languages).

So if you have some time and inspiration to implement this, here's the way to go - every FetchResponse (which is the response for each "give me messages" request) contains a field called HighwaterMarkOffset, which essentially is an offset at the end of partition (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse). The trick here is to send a FetchRequest that will immediately return (e.g. won't block waiting) nothing but HighwaterMarkOffset.

To do so your FetchRequest should have:

  1. MaxWaitTime set to 0, which would mean "return immediately if cannot fetch at least MinBytes bytes".
  2. MinBytes set to 0, which means "I'm OK if you return me an empty response".
  3. FetchOffset doesn't matter in this case, and if I'm not wrong it might even be an invalid offset, but probably better to be a valid one.
  4. MaxBytes set to 0, which means "give me no more than 0 bytes of data", e.g. nothing.

This way this request will return immediately with no data, but still with the highwatermark offset set to a proper value. Once you have the highwatermark offset, you can compare it to your current offset and figure out how much behind you are.

Hope this helps.

0
votes

you can use this method public OffsetAndMetadata committed(TopicPartition partition) from the API below to get the last committed offset

https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html