4
votes

Now that the Go Kafka library (sarama) provides consumer group capability with Kafka 0.10 without the help of any external library, how can I get the current message offset being processed by a consumer group at any given time?

Previously I used kazoo-go (https://github.com/wvanbergen/kazoo-go) to get my consumer group's message offset, since it was stored in Zookeeper. Now that I use sarama-cluster (https://github.com/bsm/sarama-cluster), I am not sure which API to use to get my consumer group's message offset.

2 Answers

0
votes

Under the hood, the consumerGroupSession struct uses a PartitionOffsetManager to get the next offset:

    if pom := s.offsets.findPOM(topic, partition); pom != nil {
        offset, _ = pom.NextOffset()
    }

Here is the documentation of pom.NextOffset().

When a consumerGroupSession constructs a consumerGroupClaim struct via newConsumerGroupClaim(), it passes the offset returned by pom.NextOffset() as the offset argument. You can read it later via claim.InitialOffset(). Once you have started consuming messages, you can use message.Offset of the message currently being processed.

Unfortunately, consumerGroupSession.offsets.findPOM() can't be reached from the ConsumerGroupHandler.ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) method, because that method receives session as the ConsumerGroupSession interface, not as the consumerGroupSession struct. The offsets field is private and therefore not accessible.

Thus we can't call NextOffset() from the handler, even though it does precisely what the OP wants.
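Since message.Offset is available while consuming, one possible workaround is to record it yourself as each message is processed. The following is only a minimal sketch of that idea, not something sarama provides: OffsetTracker, Mark and Current are hypothetical names, and the loop in main merely stands in for iterating over claim.Messages(). In a real handler, Mark would be called inside ConsumeClaim with message.Topic, message.Partition and message.Offset.

```go
package main

import (
	"fmt"
	"sync"
)

// topicPartition identifies one partition of one topic.
type topicPartition struct {
	topic     string
	partition int32
}

// OffsetTracker remembers the offset of the most recently processed
// message per topic/partition. It is a hypothetical helper, not part
// of sarama.
type OffsetTracker struct {
	mu      sync.RWMutex
	offsets map[topicPartition]int64
}

// NewOffsetTracker returns an empty tracker that is safe for use from
// several goroutines (ConsumeClaim runs once per claimed partition).
func NewOffsetTracker() *OffsetTracker {
	return &OffsetTracker{offsets: make(map[topicPartition]int64)}
}

// Mark records offset as the latest processed offset for the partition.
func (t *OffsetTracker) Mark(topic string, partition int32, offset int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.offsets[topicPartition{topic, partition}] = offset
}

// Current returns the last recorded offset for the partition.
// ok is false if no message from that partition has been marked yet.
func (t *OffsetTracker) Current(topic string, partition int32) (offset int64, ok bool) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	offset, ok = t.offsets[topicPartition{topic, partition}]
	return offset, ok
}

func main() {
	tracker := NewOffsetTracker()

	// Stand-in for: for message := range claim.Messages() { ... }
	for _, offset := range []int64{40, 41, 42} {
		tracker.Mark("topic-test", 0, offset)
	}

	if offset, ok := tracker.Current("topic-test", 0); ok {
		fmt.Println("currently at offset", offset)
	}
}
```

Any other goroutine holding the tracker can then ask for the current position at any time. Note that this reflects what this process has handled, which is not necessarily what has already been committed to Kafka.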

-1
votes

I am also working with Sarama and Kafka to get the offset of a topic.

You can get the newest offset of a topic partition with the following code.

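    package main

    import (
        "fmt"

        "github.com/Shopify/sarama"
    )

    func main() {
        // A nil config makes sarama fall back to its default configuration.
        client, err := sarama.NewClient([]string{"localhost:9092"}, nil)
        if err != nil {
            panic(err)
        }
        defer client.Close()

        // OffsetNewest asks for the offset that the next message produced to
        // partition 0 will get; it is not a consumer group's committed offset.
        lastOffset, err := client.GetOffset("topic-test", 0, sarama.OffsetNewest)
        if err != nil {
            panic(err)
        }
        fmt.Println("Newest offset:", lastOffset)
    }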

Let me know if this is the answer you are looking for and if it is helpful.