
I have a Spark Streaming Scala application that reads data from a Kafka topic and writes it to HDFS. I want the app to store the offsets of the messages it reads in the __consumer_offsets topic, so that it can resume reading from those offsets if the app fails. The app runs fine (I can see the data on HDFS), but I cannot see its commits in __consumer_offsets.

Here are my kafkaParams:

val kafkaParams = Map(
  "metadata.broker.list" -> "xx.xxx.x.xx:6667",
  "enable.auto.commit" -> "true",
  "group.id" -> "reading_telemetry",
  "offsets.storage" -> "kafka"
)

I use the following command to read the committed offsets from __consumer_offsets:

$ /usr/hdp/3.0.0.0-1634/kafka/bin/kafka-console-consumer.sh --consumer.config /tmp/consumer.config   --zookeeper xx.xxx.x.xx:2181   --topic __consumer_offsets --from-beginning --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

It prints committed offsets in the following form:

[test1,test,0]::[OffsetMetadata[55,NO_METADATA],CommitTime 1539603328309,ExpirationTime 6723603328309]

But I don't see any commits for the "reading_telemetry" group ID. Any idea why?
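Each line of this formatter output has a key, [group,topic,partition], followed by the committed offset inside OffsetMetadata[...]. In the sample line above, group test1 committed offset 55 for partition 0 of topic test. The Scala sketch below pulls those fields out and keeps only the records for one group, which makes it easy to check whether reading_telemetry appears at all. It assumes the exact line layout shown above (other Kafka versions may print this topic differently) and that group and topic names contain no commas; CommittedOffset and OffsetsLineParser are hypothetical names, not part of Kafka.

```scala
import scala.io.Source
import scala.util.matching.Regex

// One committed-offset record as printed by OffsetsMessageFormatter (hypothetical type).
final case class CommittedOffset(group: String, topic: String, partition: Int, offset: Long)

object OffsetsLineParser {
  // Matches lines such as
  // [test1,test,0]::[OffsetMetadata[55,NO_METADATA],CommitTime 1539603328309,ExpirationTime 6723603328309]
  // Assumption: group and topic names contain no commas or closing brackets.
  private val Line: Regex =
    """^\[([^,\]]+),([^,\]]+),(\d+)\]::\[OffsetMetadata\[(-?\d+),.*""".r

  // Returns None for lines that do not follow the layout above.
  def parse(line: String): Option[CommittedOffset] = line.trim match {
    case Line(group, topic, partition, offset) =>
      Some(CommittedOffset(group, topic, partition.toInt, offset.toLong))
    case _ => None
  }

  // Keeps only the records that belong to the given consumer group.
  def forGroup(lines: Seq[String], group: String): Seq[CommittedOffset] =
    lines.flatMap(parse).filter(_.group == group)

  // Reads formatter output from stdin and prints the records of one group.
  def main(args: Array[String]): Unit = {
    val group = args.headOption.getOrElse("reading_telemetry")
    forGroup(Source.stdin.getLines().toSeq, group).foreach(println)
  }
}
```

Piping the console-consumer output into main prints the records of the group given as the first argument; an empty result for reading_telemetry corresponds to what is described above.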

My environment:

Kafka: 1.0.1
Spark: 2.3.1
Scala: 2.11.8


2 Answers


Use the kafka-consumer-groups.sh script as shown below:

kafka-consumer-groups.sh  --bootstrap-server <BootStrapServerIP:port> --describe --group telemetryGroup

It returns information in the following format:

GROUP                  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       OWNER
telemetryGroup        test-topic      0          15              15              0         telemetryGroup-1/127.0.0.1
telemetryGroup        test-topic      1          14              15              1         telemetryGroup-2_/127.0.0.1
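In this output, LAG is LOG-END-OFFSET minus CURRENT-OFFSET, so partition 1 above is one message behind its last commit (15 - 14 = 1). The Scala sketch below parses such rows and computes the lag. It assumes the column order of the sample above (GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET ...), which may differ between Kafka versions, and treats a non-numeric CURRENT-OFFSET (for example a "-" placeholder) as "no committed offset"; GroupRow and DescribeOutput are hypothetical names.

```scala
import scala.util.Try

// One parsed row of `kafka-consumer-groups.sh --describe` output (hypothetical type).
// currentOffset is None when the column does not hold a number.
final case class GroupRow(group: String, topic: String, partition: Int,
                          currentOffset: Option[Long], logEndOffset: Long) {
  // LAG = LOG-END-OFFSET - CURRENT-OFFSET; undefined without a committed offset.
  def lag: Option[Long] = currentOffset.map(logEndOffset - _)
}

object DescribeOutput {
  // Parses one data row; header lines and malformed lines yield None.
  // Assumes whitespace-separated columns in the order of the sample output.
  def parseRow(line: String): Option[GroupRow] =
    line.trim.split("\\s+").toList match {
      case group :: topic :: partition :: current :: logEnd :: _ =>
        for {
          p   <- Try(partition.toInt).toOption
          end <- Try(logEnd.toLong).toOption
        } yield GroupRow(group, topic, p, Try(current.toLong).toOption, end)
      case _ => None
    }
}
```

Keeping currentOffset optional lets a missing commit show up as None instead of being silently read as offset 0.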

You should not read the __consumer_offsets topic directly. It is an internal topic; use the provided tools to retrieve committed offsets instead.

The easiest way is to run the kafka-consumer-groups tool:

kafka-consumer-groups.sh \
    --bootstrap-server [BOOTSTRAP_SERVERS] \
    --describe \
    --group reading_telemetry

The CURRENT-OFFSET column contains the committed offset for each partition.