I have an Akka application (in Java) that uses committablePartitionedSource to consume messages from Kafka topics. I have a few consumer groups, each of which spins up consumers for multiple topics. This is driven by a dynamic configuration that lets me shut down consumers temporarily and start them again later.
When one of these consumers restarts, I want it to read only new messages rather than resume from where it left off.
Is there a way to get the underlying KafkaConsumer object from the Alpakka Kafka consumer, so that I can call seekToEnd() before processing? Or is there another way to achieve this, perhaps through Akka config or a different type of consumer? I would prefer not to maintain my own offsets (hopefully that is not the only option).
My config is set up to use the latest offset when a consumer group starts, but because I am shutting down and restarting individual consumers, each one always resumes consuming from where it left off.
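To make the behaviour concrete, here is a minimal sketch of the rule I believe I am running into, assuming my "latest" setting corresponds to Kafka's auto.offset.reset=latest: as far as I understand, that setting is only consulted when the group has no committed offset for a partition. This is plain Java with no Kafka dependency; the class name, resolveStartOffset and its parameters are hypothetical names for illustration, not part of Kafka or Alpakka.

```java
import java.util.Optional;

public class OffsetResetSketch {

    /**
     * Hypothetical model of how a consumer picks its starting position.
     * Simplification: out-of-range committed offsets are ignored here.
     *
     * @param committed       offset previously committed by the group, if any
     * @param logStart        first offset available in the partition
     * @param logEnd          offset the next produced message will receive
     * @param autoOffsetReset assumed to be "latest" or "earliest"
     */
    static long resolveStartOffset(Optional<Long> committed,
                                   long logStart,
                                   long logEnd,
                                   String autoOffsetReset) {
        // A committed offset always wins; the reset policy is never consulted.
        if (committed.isPresent()) {
            return committed.get();
        }
        // Only a group with no committed offset falls back to the policy.
        return "latest".equals(autoOffsetReset) ? logEnd : logStart;
    }

    public static void main(String[] args) {
        // Restarted consumer: the group already committed offset 120.
        System.out.println(resolveStartOffset(Optional.of(120L), 0L, 500L, "latest"));
        // Brand-new group: nothing committed, so "latest" applies.
        System.out.println(resolveStartOffset(Optional.empty(), 0L, 500L, "latest"));
    }
}
```

In this model the first call is my situation: the restarted consumer resumes at the committed offset even though the policy is "latest", which is why I am looking for a way to seek to the end explicitly.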
I tried creating a consumer group for each topic, but I have many topics and that turned out to be pretty resource-intensive. I also looked, unsuccessfully, for a way to clear the offsets stored in Kafka for a topic.