Question: How can I query a Kafka topic for the current offset of a specific consumer group from inside Flink code? (And a side question, which I'll post separately here if I need to: how, if possible, can I get the timestamp of that offset?)
(I have found there are CLI tools that can query this, but that isn't what I want, since it isn't done programmatically inside the Flink job.)
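To show what I mean, here is a rough sketch of how I imagine the lookup could work with the plain Kafka `AdminClient` plus a throwaway consumer (untested; the broker address and group id are placeholders, and reading the record just before the committed offset to get its timestamp is my own guess, since I haven't found a direct "timestamp of an offset" API):

```java
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OffsetLookup {
    public static void main(String[] args) throws Exception {
        String bootstrap = "localhost:9092";    // placeholder
        String groupId = "p1-consumer-group";   // placeholder: P1's group, from user args

        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);

        Map<TopicPartition, OffsetAndMetadata> committed;
        try (AdminClient admin = AdminClient.create(adminProps)) {
            // Committed offset per partition for the consumer group (OS1).
            committed = admin.listConsumerGroupOffsets(groupId)
                             .partitionsToOffsetAndMetadata()
                             .get();
        }

        // Kafka's committed offset is the NEXT message to read, so read the
        // record at (offset - 1) and use its embedded timestamp.
        Properties consProps = new Properties();
        consProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        consProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        consProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consProps)) {
            for (Map.Entry<TopicPartition, OffsetAndMetadata> e : committed.entrySet()) {
                TopicPartition tp = e.getKey();
                long nextOffset = e.getValue().offset();
                if (nextOffset == 0) continue;      // nothing consumed yet on this partition
                consumer.assign(List.of(tp));
                consumer.seek(tp, nextOffset - 1);  // last fully consumed message
                ConsumerRecords<byte[], byte[]> recs = consumer.poll(Duration.ofSeconds(5));
                recs.forEach(r -> System.out.printf("%s offset=%d timestamp=%d%n",
                        tp, r.offset(), r.timestamp()));
            }
        }
    }
}
```

Is something along these lines reasonable to run inside (or alongside) the Flink job, or is there a more idiomatic way?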
Here is some additional background on the full problem, though I don't want to make this too open-ended.
I have a use case where data flows from kafkaTopic1 into a program (let's call it P1), is processed, and then persisted to a database. P1 runs on a multi-node cluster, so each node handles a number of Kafka partitions (let's say there are 5 nodes and 50 Kafka partitions for the topic). If one of the nodes fails completely for whatever reason while data is being processed, that data would be lost.
For example, if there are 500 messages on kafkaTopic1 and node2 has pulled 10 messages (so according to the committed offset, the next message to be pulled is message 11), but only 8 of them have been fully processed and persisted to the database when the node fails, the 2 that were still being processed are lost. When the node is brought back up, it starts reading from message 11, skipping the two lost messages. (And technically that Kafka partition would be reassigned to another node, so the partition's offset would keep moving and we would not necessarily know which message was next to be processed when the node died.)
(Note: when the node dies, assume a user notices and shuts down P1 completely, so no more data is processed for the time being.)
So this is where Flink comes into play. I want to write a Flink job that is given P1's consumer group via a user argument, then queries the Kafka topic (also supplied by the user) for that group's current offset (OS1). With that, the Flink job sets its own starting offset for kafkaTopic1 to a point X amount of time prior to OS1 (X supplied by the user via args) and begins reading messages from the topic. It then compares each message it reads with what is in the database, and if it does not find the message there, it sends it to another Kafka topic (kafkaTopic2) to be processed by P1 when it is restarted.
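To make that flow concrete, here is a rough sketch of how I picture the Flink side, using the `KafkaSource` API and its timestamp-based `OffsetsInitializer` (again untested; the broker address, group id, argument layout, and the omitted database check / kafkaTopic2 sink are all placeholders or assumptions on my part):

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReplayJob {
    public static void main(String[] args) throws Exception {
        // Assumed args: epoch-millis timestamp of OS1's record (obtained e.g.
        // via the AdminClient lookup) and the rewind window X in minutes.
        String bootstrap = "localhost:9092";         // placeholder
        String topic = "kafkaTopic1";
        long os1Timestamp = Long.parseLong(args[0]);
        Duration x = Duration.ofMinutes(Long.parseLong(args[1]));
        long startTimestamp = os1Timestamp - x.toMillis();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(bootstrap)
                .setTopics(topic)
                .setGroupId("replay-checker")        // a separate group from P1's
                // Start from the first offset whose record timestamp >= startTimestamp.
                .setStartingOffsets(OffsetsInitializer.timestamp(startTimestamp))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> messages =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "replay-source");

        // For each message: check the database and forward misses to kafkaTopic2
        // (the DB lookup and the KafkaSink to kafkaTopic2 are omitted here).
        messages.print();

        env.execute("replay-missing-messages");
    }
}
```

The part I'm stuck on is wiring the first sketch's consumer-group query into this job, i.e. resolving OS1 and its timestamp programmatically before (or while) the source is built, rather than passing the timestamp in by hand.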