1
votes

I have an akka application (in JAVA) that uses commitablePartitionedSource for consuming messages from kafka topics. I have a few consumer groups that spin up consumers for multiple topics. This is driven by a dynamic configuration where I can shutdown consumers temporarily and maybe start them back at a later point.

When this consumer restarts, I only want to read new messages and not start from where I left off.

Is there a way to get the kafkaConsumer object from akka-alpakka consumer, so that I can seekToEnd() before processing? Please let me know if there is any other way to achieve this? Maybe with akka config or a different type of consumer? I prefer not maintaining my own offsets (Hopefully not the only option)

My config is set up to get the latest offset when I start a consumer group, but since I am shutting down and restarting individual consumers it always starts consuming where I left off.

I tried creating a consumer group for a topic, but I have many topics and it turns out pretty resource intensive. I also looked for a way to clear offsets stored in kafka for that topic unsuccessfully.

2

2 Answers

1
votes

The easiest way would just be to create a new consumer group each time you start your restart your consumer. Kafka will take care of removing stale consumer groups after a configurable amount of time (retention.ms).

This strategy is fine if you rarely restart your consumer and always want it to process fresh data instead of catching up with all the missed messages.

EDIT

As far as I know the only way to have access to the underlying KafkaConsumer is to use a committableExternalSource. This way you'll have access to the seekToEnd method, however you'll also need take care of subscribing to the topic providing the start offset per partition (similarly as how you're doing now setting up the committablePartitionedSource but outside of Akka).

0
votes

commitablePartitionedSource takes AutoSubscription as an input, which you cannot specify offset.

What you need is a method that takes ManualSubscription or higher level Subscription, such as

> plainExternalSource
> committableExternalSource
> plainSource
...