
Question: How can I query a Kafka topic for the offset of a specific consumer group from inside Flink code? (And a side question, which I'll post separately here if I need to: how, if it's possible, can I get a timestamp for that offset?)

(I have found there are CLI tools that can just query it, but that isn't what I want, since it isn't done programmatically inside the Flink job.)

Some additional background on the full problem follows, but I don't want to make this too open-ended.

I have a use case where data flows from kafkaTopic1 into a program (let's call it P1), is processed, and is then persisted to a database. P1 runs on a multi-node cluster, so each node handles a number of Kafka partitions (let's say there are 5 nodes and 50 Kafka partitions for the topic). If one of the nodes completely fails for whatever reason while data is being processed, that data would be lost.

For example, if there are 500 messages on kafkaTopic1 and node2 has pulled 10 messages (so the next message to be pulled, according to the offset, is message 11), but only 8 of them have been fully processed and persisted to the database when the node fails, then the 2 that were still being processed are lost. When the node is brought back up it starts reading from message 11, skipping the two lost messages. (Technically, that Kafka partition would also start sending its messages to another node, so the partition's offset would move and we would not necessarily know exactly which message was next to be processed when the node died.)

(Note: when the node dies, assume a user notices and shuts P1 down completely, so no more data is processed for the time being.)

So this is where Flink comes into play. I want to write a Flink job that is given P1's consumer group via a user-supplied argument, then queries the Kafka topic (also supplied by the user) for the current offset (OS1). With that, the Flink job sets its own offset for kafkaTopic1 to a point X amount of time prior to OS1 (X supplied by the user via args) and begins reading messages from the topic. It then compares each message it reads against what is in the database, and any message it does not find in the database gets sent to another Kafka topic (kafkaTopic2) to be processed by P1 once it is restarted.


1 Answer


If checkpointing is enabled in a Flink job, you shouldn't lose messages: Flink stores the Kafka offsets in its checkpoint state as well, and after recovering from a failure it resumes reading from the offsets in the last completed checkpoint.
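For reference, checkpointing is turned on via the execution environment. A minimal sketch (the interval and mode below are arbitrary example values, not anything from the question):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 10 seconds; the Kafka source snapshots its offsets into this
// checkpoint state, so recovery resumes from the last completed checkpoint.
env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);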

Now, if you still want to find the offsets yourself and restart reading from them, it gets tricky, because you need to find the offsets of all the partitions of the given topic for the given consumer group.

I am not aware of a way to do that out of the box with the Flink Kafka consumer API, but you can add the Kafka client dependency to your project and create a KafkaConsumer directly from the Kafka API. Once you have the consumer, you can call

consumer.position(partition) 

or

consumer.committed(partition)

Mind that you still need to loop over all the partitions to get all the current offsets.

Read about the differences between position() and committed() here: Kafka Javadoc
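Putting that together, here is a rough sketch of that loop, assuming the broker address, topic, and P1's consumer group are passed in by the user (the literal property values below are placeholders):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "broker:9092");   // placeholder: supplied by the user
props.put("group.id", "p1-consumer-group");      // placeholder: P1's consumer group
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Map<TopicPartition, Long> committedOffsets = new HashMap<>();
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    // One committed-offset lookup per partition of the topic
    for (PartitionInfo info : consumer.partitionsFor("kafkaTopic1")) {
        TopicPartition tp = new TopicPartition(info.topic(), info.partition());
        OffsetAndMetadata committed = consumer.committed(tp); // null if the group never committed
        if (committed != null) {
            committedOffsets.put(tp, committed.offset());
        }
    }
}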

Once you have the offsets you want to read from, you can specify the consumer's start offsets in a Flink job manually, using something like the following:

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

// One start offset per (topic, partition); Flink seeks each partition to the given offset
Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);

myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
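As for the "X amount of time prior to OS1" part (and, indirectly, the timestamp side question): rather than computing those offsets yourself, the plain Kafka consumer can translate a timestamp into per-partition offsets via offsetsForTimes. A sketch along those lines, reusing the consumer from the earlier snippet (so it would live inside the same try block) and feeding the result into specificStartOffsets; the one-hour lookback is a placeholder for the user-supplied X:

import org.apache.kafka.clients.consumer.OffsetAndTimestamp;

long lookbackMillis = 60 * 60 * 1000L;                         // placeholder for X
long targetTime = System.currentTimeMillis() - lookbackMillis;

Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
for (PartitionInfo info : consumer.partitionsFor("kafkaTopic1")) {
    timestampsToSearch.put(new TopicPartition(info.topic(), info.partition()), targetTime);
}

// For each partition: the earliest offset whose record timestamp is >= targetTime.
// OffsetAndTimestamp also carries that record's timestamp, which touches on the
// side question of mapping between offsets and timestamps.
Map<TopicPartition, OffsetAndTimestamp> byTime = consumer.offsetsForTimes(timestampsToSearch);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> e : byTime.entrySet()) {
    if (e.getValue() != null) {
        specificStartOffsets.put(
                new KafkaTopicPartition(e.getKey().topic(), e.getKey().partition()),
                e.getValue().offset());
    }
}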

For more info on the Flink Kafka consumer, check out the Flink Kafka Connector docs.