I am using Spark Structured Streaming with Cassandra as a sink. Snippet below:
override def start(): StreamingQuery = {
  sparkContext.getSparkSession()
    .readStream
    .option("header", "false")
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServer)
    .option("failOnDataLoss", "false")
    .option("subscribe", topicName)
    .load()
    .writeStream
    .option("checkpointLocation", checkpointLocation)
    .foreachBatch(forEachFunction.arceusForEachFunction(_, _))
    .start()
}
Inside the foreachBatch function I use the following to write to Cassandra:
RDD.saveToCassandra(keyspace, tableName)
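Roughly, the handler does something like this (a simplified sketch of my function; the row-to-tuple mapping shown here is illustrative only, the real code maps to the actual table columns):

import com.datastax.spark.connector._
import org.apache.spark.sql.DataFrame

// Simplified sketch of the foreachBatch handler: convert the micro-batch
// DataFrame to an RDD and save it to Cassandra via the connector.
def arceusForEachFunction(batchDF: DataFrame, batchId: Long): Unit = {
  val rows = batchDF.rdd.map(row => (row.getString(0), row.getString(1))) // placeholder column mapping
  rows.saveToCassandra(keyspace, tableName)
}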
While doing this, I was wondering how to handle issues such as Cassandra going down. Suppose, out of 3M rows to be loaded, 2M were written before the failure occurred. Now I either have to undo the 2M or process only the remaining 1M. I am not sure what happens in such scenarios.
Is this somehow taken care of? Or is there something I have to write to take care of this?
I also looked at the Spark docs, and for "foreachBatch" it says the guarantee "depends on the implementation".
Any help is appreciated. Thanks.
