3
votes

I am using Spark Structured Streaming. I have a DataFrame and am adding a new column, "current_ts".

inpuDF.withColumn("current_ts", lit(System.currentTimeMillis()))

This does not stamp each row with the current epoch time. Instead, every row in the DataFrame gets the same epoch time, captured once when the job was triggered. The same code works fine in normal (batch) Spark jobs. Is this an issue with Spark Structured Streaming?

4
Hi @Nats, were you able to achieve this? I have a similar requirement. - Vasu

4 Answers

1
votes

Spark records your transformations as a lineage graph and only executes that graph when some action is called. Note, however, that

System.currentTimeMillis()

inside lit(...) is an ordinary Scala expression: it is evaluated eagerly on the driver when the plan is built, and that constant is what gets baked into the plan, regardless of when actions run. What I don't understand is what you find confusing here, or what exactly you are trying to achieve. Thanks.
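A minimal pure-Scala sketch of why this happens (no Spark dependency; this lit and the counter clock are hypothetical stand-ins for Spark's lit() and System.currentTimeMillis()): an argument passed by value, as to lit(...), is evaluated exactly once, and the resulting constant is reused for every row.

```scala
object EagerEvalDemo {
  // Hypothetical stand-in for Spark's lit(): the argument is call-by-value,
  // so it is evaluated once, up front, and the constant is reused per row.
  def lit[A](constant: A): Any => A = _ => constant

  // Deterministic stand-in for System.currentTimeMillis(): each call
  // returns a fresh, larger value.
  private var ticks = 0L
  def now(): Long = { ticks += 1; ticks }

  def run(): Seq[Long] = {
    val col = lit(now())   // now() is evaluated exactly once, right here
    (1 to 3).map(col)      // applied per "row": same constant everywhere
  }
}
```

Running EagerEvalDemo.run() yields Seq(1, 1, 1): the clock is read once when the column is defined, which matches the single repeated timestamp seen in the streaming job.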

1
votes

Spark has a built-in function for creating a column with the current timestamp. Your code should look like this:

import org.apache.spark.sql.functions

// ...

// current_timestamp() is evaluated at execution time rather than
// captured as a constant when the plan is built
inpuDF.withColumn("current_ts", functions.current_timestamp())

0
votes

The problem with your approach is the use of lit, which creates a literal, i.e. a constant. Spark treats it as a constant passed in from the driver: the expression is evaluated once, at the time you submit the job, so every record gets the same timestamp. You need to use a function instead; current_timestamp() should work.
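To sketch the contrast in plain Scala (again no Spark; the thunk-based Column type, runBatch, and the counter clock are hypothetical models, not Spark API): a column defined as a deferred function, in the spirit of current_timestamp(), is re-evaluated when each micro-batch actually runs, so rows share a value per batch rather than one value for the whole query.

```scala
object PerBatchEvalDemo {
  // Hypothetical model: a column is a thunk, evaluated at execution time,
  // unlike a lit(...) constant captured when the plan is built.
  type Column[A] = () => A

  // Deterministic stand-in clock so the demo does not depend on wall time.
  private var ticks = 0L
  def clock(): Long = { ticks += 1; ticks }

  // Model of current_timestamp(): reads the clock when invoked.
  val currentTs: Column[Long] = () => clock()

  // One micro-batch: the column is evaluated once per trigger, so every
  // row within a batch shares that batch's timestamp.
  def runBatch(rows: Seq[String], col: Column[Long]): Seq[(String, Long)] = {
    val ts = col()
    rows.map(r => (r, ts))
  }
}
```

Two successive batches get timestamps 1 and 2 respectively, whereas a lit-style constant would stamp both batches with the same value.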

0
votes

Try partitioning the streaming output by the timestamp column. Note that partitionBy takes the column name as a String in Scala, and the current_ts column must already exist on the DataFrame:

inpuDF.writeStream.partitionBy("current_ts")