3
votes

I have a Spark Structured Streaming job that is configured to read data from Kafka. Please review the code below and check the options I pass to readStream() to read the data from Kafka.

I understand that the starting offset given to readStream() is only applied when a new query is started, not when an existing query is resumed.

But I don't know how to start a new query every time I restart my job in IntelliJ.

val kafkaStreamingDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", AppProperties.getProp(AppConstants.PROPS_SERVICES_KAFKA_SERVERS))
  .option("subscribe", AppProperties.getProp(AppConstants.PROPS_SDV_KAFKA_TOPICS))
  .option("failOnDataLoss", "false")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(value as STRING)", "CAST(topic as STRING)")

I have also tried setting the starting offsets explicitly with """{"topicA":{"0":0,"1":0}}""".

The following is my writeStream:

val query = kafkaStreamingDF
  .writeStream
  .format("console")
  .start()

Every time I restart my job in the IntelliJ IDE, the logs show that the offset has been set to latest instead of 0 or earliest.

Is there a way to clean my checkpoint? If so, I don't know where the checkpoint directory is, because I don't specify any checkpointing in the code above.

2
When I run this code as is and restart it in IntelliJ, the query always starts reading from the beginning. There is nothing wrong with the code. - mike

2 Answers

0
votes

Kafka relies on the auto.offset.reset property to decide where a consumer starts reading when it has no valid offset.

The default is “latest,” which means that lacking a valid offset, the consumer will start reading from the newest records (records that were written after the consumer started running). The alternative is “earliest,” which means that lacking a valid offset, the consumer will read all the data in the partition, starting from the very beginning.
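For a plain Kafka consumer (outside Spark), this property could be set roughly as in the sketch below. The object name, broker address, and group id are placeholders chosen for illustration. Spark's Kafka source does not take auto.offset.reset directly; the "startingOffsets" option plays that role there.

```scala
import java.util.Properties

// Hypothetical helper, for illustration only: builds the configuration
// for a plain Kafka consumer that starts from the beginning of each
// partition whenever it has no valid committed offset.
object ConsumerOffsetResetSketch {
  def consumerProps(bootstrapServers: String, groupId: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers) // e.g. "localhost:9092" (assumed)
    props.setProperty("group.id", groupId)                   // e.g. "demo-group" (assumed)
    // "earliest": with no valid offset, read from the start of the partition.
    // "latest" (the default): with no valid offset, read only new records.
    props.setProperty("auto.offset.reset", "earliest")
    props
  }
}
```

The resulting Properties object is what would be handed to a KafkaConsumer constructor. The setting only matters when the consumer group has no valid offset for a partition.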

As per your question, you want to read all the data from the topic, so setting "startingOffsets" to "earliest" should work. Also make sure that you set enable.auto.commit to false.

Setting enable.auto.commit to true means that offsets are committed automatically at a frequency controlled by the auto.commit.interval.ms config.

With this set to true, offsets are committed to Kafka automatically when messages are read, which doesn't necessarily mean that Spark has finished processing those messages. For precise control over committing offsets, set the Kafka parameter enable.auto.commit to false.
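Since "startingOffsets" is only applied when a new query starts, one way to force a new query on every restart is to give each run its own empty checkpoint directory. The following is a minimal sketch of that idea, not a definitive setup. The object name, the freshCheckpointDir helper, the broker address "localhost:9092", and the temp-directory prefix are assumptions for illustration, and the spark-sql-kafka-0-10 package is assumed to be on the classpath.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object FreshQuerySketch {
  // Hypothetical helper: creates a new, empty directory on each call, so
  // Spark finds no saved offsets and treats the run as a new query.
  def freshCheckpointDir(): String =
    Files.createTempDirectory("kafka-console-checkpoint-").toString

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("fresh-query-sketch")
      .master("local[*]")
      .getOrCreate()

    val kafkaStreamingDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker address
      .option("subscribe", "topicA")                       // topic name from the question
      .option("failOnDataLoss", "false")
      .option("startingOffsets", "earliest")               // honoured for new queries only
      .load()
      .selectExpr("CAST(value AS STRING)", "CAST(topic AS STRING)")

    val query = kafkaStreamingDF.writeStream
      .format("console")
      .option("checkpointLocation", freshCheckpointDir())  // explicit and empty on every run
      .start()

    // Keep the driver alive so the streaming query keeps running.
    query.awaitTermination()
  }
}
```

To resume from the previous position instead, pass a fixed path as checkpointLocation. Deleting that directory between runs then has the same effect as the fresh directory used here. As far as I know, when no checkpointLocation is given the console sink falls back to a temporary directory, unless spark.sql.streaming.checkpointLocation is set in the session configuration, in which case checkpoints are stored under that path.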

-1
votes

Try setting .option("kafka.client.id", "XX") so that the job uses a different client.id.