4
votes

How to get the current batch timestamp (DStream) in Spark streaming?

I have a Spark Streaming application where the input data undergoes many transformations.

I need the current timestamp during execution to validate the timestamp in the input data.

If I simply compare against the current time, the timestamp may differ between each RDD transformation execution.

Is there any way to get the timestamp at which a particular Spark Streaming micro batch started, or the micro batch interval it belongs to?

3
Hi, did you find an answer for this? - carlos_technogi

3 Answers

6
votes
dstream.foreachRDD((rdd, time) => {
  // `time` is the scheduler Time for this batch job; consecutive
  // batches are spaced by your window/slide (batch) interval.
})
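In other words, inside foreachRDD you can compare each record's event time against the batch time rather than System.currentTimeMillis, so every record in the batch is validated against the same reference point. Below is a minimal sketch of that validation logic in plain Scala; the Record type, its eventTime field, and the 60-second tolerance are assumptions for illustration, and in the real job batchTime would come from time.milliseconds:

```scala
// Hypothetical record type: an id plus an epoch-millis event time.
case class Record(id: String, eventTime: Long)

// Keep only records whose event time falls within `toleranceMs` of the
// batch's scheduler time, so validation is consistent across the batch.
def validate(records: Seq[Record], batchTime: Long, toleranceMs: Long): Seq[Record] =
  records.filter(r => math.abs(batchTime - r.eventTime) <= toleranceMs)

val batchTime = 1000000L
val records = Seq(Record("a", 999000L), Record("b", 500000L))
val valid = validate(records, batchTime, toleranceMs = 60000L)
println(valid.map(_.id))  // List(a)
```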
3
votes
dstream.transform(
    (rdd, time) => {
        // Tag every record with the batch's scheduler time so it
        // remains available in downstream transformations.
        rdd.map(record => (time, record))
    }
).filter(...)
2
votes

Late reply, but in case it helps somebody: the batch time is available in milliseconds and can be formatted. First define a helper function using the Java API:

Using the Java 7-style java.util.Date/SimpleDateFormat:

import java.util.Date
import java.text.SimpleDateFormat

def returnFormattedTime(ts: Long): String = {
    val date = new Date(ts)
    val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    formatter.format(date)
}

Or else, using the Java 8-style java.time API:

import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter

def returnFormattedTime(ts: Long): String = {
    val date = Instant.ofEpochMilli(ts)
    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault())
    formatter.format(date)
}
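As a quick sanity check (a standalone sketch, not part of the original answer), both helpers produce the same string for a given epoch-millis value. A fixed UTC zone is pinned here, instead of the system default above, so the output is deterministic:

```scala
import java.util.{Date, TimeZone}
import java.text.SimpleDateFormat
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter

// Java 7 style, pinned to UTC for a deterministic result.
def formatUtil(ts: Long): String = {
  val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  formatter.setTimeZone(TimeZone.getTimeZone("UTC"))
  formatter.format(new Date(ts))
}

// Java 8 style, same zone and pattern.
def formatTime(ts: Long): String =
  DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
    .withZone(ZoneId.of("UTC"))
    .format(Instant.ofEpochMilli(ts))

val sample = 0L  // the Unix epoch
println(formatUtil(sample))  // 1970-01-01 00:00:00
println(formatTime(sample))  // 1970-01-01 00:00:00
```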

Finally, use the foreachRDD method to get the batch timestamp:

dstreamIns.foreachRDD { (rdd, time) =>
    ....
    println(s"${returnFormattedTime(time.milliseconds)}")
    ....
}