I want to log the number of records read from an incoming Spark Structured Streaming stream to a database. I'm using foreachBatch to transform each incoming batch and write it to the desired location. If no records arrive in a particular hour, I want to log "0 records read" for that hour. However, foreachBatch does not execute when the stream has no new data. Can anyone help me with this? My code is below:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()

// Keep the StreamingQuery handle in `query`; awaitTermination() returns Unit, so it is called separately.
val query = incomingStream.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    writeStreamToDataLake(batchDF, batchId, partitionColumn, fileLocation, errorFilePath, eventHubName, configMeta)
  }
  .option("checkpointLocation", fileLocation + checkpointFolder + "/" + eventHubName)
  .trigger(Trigger.ProcessingTime(triggerTime.toLong))
  .start()
query.awaitTermination()
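
To make the desired output concrete, here is a minimal sketch of the logging behaviour I'm after, written in plain Scala without Spark. The object `HourlyCountLog`, the helper `fillMissingHours`, and the sample timestamps and counts are all hypothetical and only illustrate the result: every hour in a window gets a log line, and hours with no batch get a count of 0. It does not solve the problem of foreachBatch not firing on an empty stream.

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

object HourlyCountLog {
  // Hypothetical helper: for every hour in [from, to), return the count
  // recorded for that hour, or 0 when no batch reported any records.
  def fillMissingHours(
      counts: Map[Instant, Long],
      from: Instant,
      to: Instant
  ): Seq[(Instant, Long)] = {
    val start = from.truncatedTo(ChronoUnit.HOURS)
    Iterator
      .iterate(start)(_.plus(1, ChronoUnit.HOURS))
      .takeWhile(_.isBefore(to))
      .map(hour => hour -> counts.getOrElse(hour, 0L))
      .toList
  }

  def main(args: Array[String]): Unit = {
    // Made-up sample data: records arrived in hours 0 and 2, none in hour 1.
    val t0 = Instant.parse("2020-01-01T00:00:00Z")
    val counts = Map(t0 -> 120L, t0.plus(2, ChronoUnit.HOURS) -> 45L)

    fillMissingHours(counts, t0, t0.plus(3, ChronoUnit.HOURS))
      .foreach { case (hour, n) => println(s"$hour records read: $n") }
  }
}
```

In this sketch the per-hour counts would have to be collected somewhere outside foreachBatch (for example, keyed by the truncated hour), since foreachBatch alone never sees the empty hours.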