position(TopicPartition partition)
Get the offset of the next record that will be fetched (if a record with that offset exists).

committed(TopicPartition partition): OffsetAndMetadata  

Get the last committed offset for the given partition (whether the commit happened by this process or another).

If I need the latest committed offset of a particular consumer group (to be used as startingOffsets in Spark Structured Streaming), which of the two should I use?

My code shows committed as deprecated.

  val latestOffset = consumer.position(partition)
  val last = consumer.committed(partition)

  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
  </dependency>

Official Documentation :

Offsets and Consumer Position

Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of a record within that partition, and also denotes the position of the consumer in the partition. For example, a consumer which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. There are actually two notions of position relevant to the user of the consumer:

The position of the consumer gives the offset of the next record that will be given out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances every time the consumer receives messages in a call to poll(long).

The committed position is the last offset that has been stored securely. Should the process fail and restart, this is the offset that the consumer will recover to. The consumer can either automatically commit offsets periodically; or it can choose to control this committed position manually by calling one of the commit APIs (e.g. commitSync and commitAsync).


1 Answer


You need to use the committed offset as startingOffsets in your Spark Structured Streaming job.
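Spark's Kafka source takes startingOffsets as a JSON string of the form {"topic":{"partition":offset}}. Here is a minimal sketch of building that string, assuming you have already collected the committed offsets into a plain Scala Map keyed by (topic, partition). The object name StartingOffsets and the helper toJson are made up for illustration, and topic names are assumed not to need JSON escaping.

```scala
object StartingOffsets {
  // Hypothetical helper: turns (topic, partition) -> offset into the JSON
  // shape expected by Spark's "startingOffsets" option,
  // e.g. {"my-topic":{"0":23,"1":57}}.
  // Assumption: topic names contain no characters that need JSON escaping.
  def toJson(offsets: Map[(String, Int), Long]): String =
    offsets
      .groupBy { case ((topic, _), _) => topic }
      .toSeq
      .sortBy { case (topic, _) => topic }
      .map { case (topic, parts) =>
        val inner = parts.toSeq
          .sortBy { case ((_, partition), _) => partition }
          .map { case ((_, partition), offset) => "\"" + partition + "\":" + offset }
          .mkString(",")
        "\"" + topic + "\":{" + inner + "}"
      }
      .mkString("{", ",", "}")
}
```

The result can then be passed along the lines of spark.readStream.format("kafka").option("startingOffsets", json). Keep in mind that Spark only uses startingOffsets when a query starts for the first time; a restarted query resumes from its checkpoint.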

The position API returns a counter that the KafkaConsumer advances as it runs. It can differ slightly from the result of the committed API, because the consumer may or may not commit offsets, and if it commits, it may do so asynchronously.

In Kafka 2.4.1 the method committed(partition) is deprecated and it is recommended to use the newer API which takes a Set of TopicPartitions. Its signature is:

public Map<TopicPartition,OffsetAndMetadata> committed(Set<TopicPartition> partitions)

Because you are using Scala, you need to convert your Scala Set into a java.util.Set before passing it in, for example with the asJava converter from scala.collection.JavaConverters.
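As a rough sketch of the non-deprecated call from Scala, something like the following should work with kafka-clients 2.4.1. The broker address, group id, and topic name are placeholders, and the object CommittedOffsets and its helper toOffsets are names invented for this example. A partition with no committed offset may come back as null (or be missing from the map), so the helper drops those entries.

```scala
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

object CommittedOffsets {
  // Hypothetical helper: unwraps the Java result map and skips partitions
  // that have no committed offset (null value).
  def toOffsets(committed: java.util.Map[TopicPartition, OffsetAndMetadata]): Map[TopicPartition, Long] =
    committed.asScala.collect { case (tp, om) if om != null => tp -> om.offset() }.toMap

  def main(args: Array[String]): Unit = {
    // Placeholder settings: adjust broker, group id and topic to your setup.
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "my-group")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    try {
      val partitions: Set[TopicPartition] = Set(new TopicPartition("my-topic", 0))
      // committed(Set) expects a java.util.Set, hence asJava.
      val offsets = toOffsets(consumer.committed(partitions.asJava))
      offsets.foreach { case (tp, offset) => println(s"$tp -> $offset") }
    } finally {
      consumer.close()
    }
  }
}
```

The group.id decides whose committed offsets are returned, so it has to match the consumer group you want to continue from.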