
I want to log the number of records read from the incoming stream of a Spark Structured Streaming query to a database. I'm using foreachBatch to transform each incoming micro-batch and write it to the desired location. I want to log 0 records read if there are no records in a particular hour, but foreachBatch does not execute when there is no incoming data. Can anyone help me with this? My code is below:

val incomingStream = spark.readStream
  .format("eventhubs")
  .options(customEventhubParameters.toMap)
  .load()

val query = incomingStream.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    writeStreamToDataLake(batchDF, batchId, partitionColumn, fileLocation, errorFilePath, eventHubName, configMeta)
  }
  .option("checkpointLocation", fileLocation + checkpointFolder + "/" + eventHubName)
  .trigger(Trigger.ProcessingTime(triggerTime.toLong))
  .start()
  .awaitTermination()
That's how it works. Even with StreamingQueryListener and extensions to it, I think it is only invoked when data is forthcoming. Maybe there is another option. - thebluephantom
@thebluephantom Is there any solution that could work for my use case, i.e. getting a zero count if no data is streamed in? - rishabh srivastava
Log 0 records per hour in advance, and then when you really log something, just aggregate to get the real value. - thebluephantom
Did you resolve it otherwise? - thebluephantom
I used a separate thread to log the record count every hour, via an accumulator's counter value. - rishabh srivastava
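The separate-thread approach from the last comment can be sketched without Spark: a shared counter that the batch handler increments, plus a background thread that logs (and resets) the value on a fixed schedule, emitting 0 when nothing arrived. In a real job the counter would be a Spark LongAccumulator incremented inside foreachBatch with batchDF.count(); here a plain AtomicLong, an in-memory "sink", and a short interval stand in for illustration.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.ListBuffer

object HourlyCountLogger {
  // Shared counter: in the Spark job this role is played by a LongAccumulator
  // incremented inside foreachBatch.
  private val recordsRead = new AtomicLong(0L)

  // Stand-in for the database the counts are logged to.
  val logged: ListBuffer[Long] = ListBuffer.empty[Long]

  // Called from the batch handler with the number of records in the batch.
  def onBatch(recordCount: Long): Unit = recordsRead.addAndGet(recordCount)

  // Separate thread that logs the count every interval, even when it is 0,
  // then resets it for the next window. intervalMillis would be one hour
  // in production.
  def start(intervalMillis: Long): ScheduledExecutorService = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(
      () => logged.synchronized { logged += recordsRead.getAndSet(0L) },
      intervalMillis, intervalMillis, TimeUnit.MILLISECONDS)
    scheduler
  }
}
```

Because the scheduler fires on its own clock, an hour with no batches still produces a logged 0, which is exactly what foreachBatch alone cannot provide.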

1 Answer


This is how it works: even modifications or extensions to StreamingQueryListener are invoked only when there is something to process, i.e. when the status of the stream changes.

There is probably another way, but I would say "think outside the box": pre-populate such a database with a 0 count per timeframe, and when querying, aggregate; then you will have the correct answer.
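The pre-populate-and-aggregate idea can be sketched with plain Scala collections (the hour keys and counts are illustrative): insert a 0 row for every hour up front, let the streaming job insert real counts as they arrive, and sum per hour at query time, so hours with no data still report 0.

```scala
// Pre-populate one (hour -> 0) row per hour of the day: the "database" seed rows.
val prePopulated: Seq[(Int, Long)] = (0 until 24).map(h => (h, 0L))

// Rows actually written by the streaming job; here hours 1 and 3 saw data.
val streamed: Seq[(Int, Long)] = Seq((1, 120L), (3, 45L))

// At query time, aggregate with SUM per hour: the pre-populated 0 keeps
// empty hours present in the result without changing the real totals.
val perHour: Map[Int, Long] =
  (prePopulated ++ streamed).groupBy(_._1).view.mapValues(_.map(_._2).sum).toMap

// perHour(1) == 120L, perHour(2) == 0L
```

In a real database the same effect is a scheduled INSERT of zero rows plus a GROUP BY ... SUM(...) at read time; no change to the streaming job is needed.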

https://medium.com/@johankok/structured-streaming-in-a-flash-576cdb17bbee can give some insight, as can Spark: The Definitive Guide.