
I would like to create a Spark Structured Streaming job that reads messages from a Kafka source and writes to a Kafka sink, and that after a failure resumes reading only the current, newest messages. For that reason I don't need to keep checkpoints for my job.

But it looks like there is no option to disable checkpointing when writing to a Kafka sink in Structured Streaming. To my understanding, even if I specify on the source:

.option("startingOffsets", "latest")

it will be taken into account only when the stream is first run, and after a failure the stream will resume from the checkpoint. Is there some workaround? And is there a way to disable checkpointing?
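
For context, the kind of query I mean looks roughly like this (broker address, topic names, and the checkpoint path are placeholders):

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input_topic")
  .option("startingOffsets", "latest") // only honored on the first run, when no checkpoint exists
  .load()

df.selectExpr("key", "value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output_topic")
  .option("checkpointLocation", "/some/checkpoint/path") // required by the Kafka sink unless a default checkpoint location is configured
  .start()
  .awaitTermination()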

Are you facing any issue if you add a checkpoint location? - Srinivas
I'm adding it - trying to skip it results in an error. I'm using Databricks, and they implement HDFS (DBFS) on top of S3. There is a lot of traffic related to the checkpointing process, generating unnecessary costs. Also, if I understand the documentation correctly, after a failure my job would always restart from the checkpoint rather than from the newest/current messages, but I haven't tested it yet. I don't want it to start from the checkpoint. - andrew5891
I got it. Will the solution below work? I have given a solution for HDFS; maybe you can convert it for the Databricks file system (DBFS). - Srinivas

1 Answer


A workaround is to delete the existing checkpoint location from within your code before starting the query, so that every run fetches data from the latest offsets.

import org.apache.hadoop.fs.{FileSystem, Path}

val checkPointLocation = "/path/in/hdfs/location"
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

// Delete the checkpoint location if it exists, so every run starts
// from the latest offsets instead of the saved ones.
fs.delete(new Path(checkPointLocation), true)

// Note: "startingOffsets" -> "latest" is a source option; set it on the
// readStream that produces df (as in the question), not on the Kafka sink.
val options = Map(
    "kafka.bootstrap.servers" -> "localhost:9092",
    "topic" -> "topic_name",
    "checkpointLocation" -> checkPointLocation
  )

// df is the streaming DataFrame read from the Kafka source.
df
  .writeStream
  .format("kafka")
  .outputMode("append")
  .options(options)
  .start()
  .awaitTermination()
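
Since you are on Databricks (per the comments), the same idea can be expressed with dbutils instead of the Hadoop FileSystem API; a minimal sketch, with the DBFS path as a placeholder:

// Recursively remove the checkpoint directory before starting the query.
// dbutils is available in Databricks notebooks.
dbutils.fs.rm("/path/on/dbfs/checkpoint", true)

Keep in mind that deleting the checkpoint on every run means the job cannot resume from where the previous run stopped, so messages that arrive while the job is down will be skipped, which matches what you want here.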