1 vote

Let's say I have a DataFrame (stored in a Scala val as df) which contains data from a CSV:

time,temperature
0,65
1,67
2,62
3,59

I have no problem reading this from a file as a Spark DataFrame in Scala.

I would like to add a filtered column (by "filter" I mean a signal-processing moving-average filter), say (T[n] + T[n-1]) / 2.0:

time,temperature,temperatureAvg
0,65,(65+0)/2.0
1,67,(67+65)/2.0
2,62,(62+67)/2.0
3,59,(59+62)/2.0

(Actually, for the first row I want 32.5 rather than the literal (65+0)/2.0; I wrote it that way to clarify the expected output of the 2-time-step filtering operation.)

So how can I achieve this? I am not familiar with Spark DataFrame operations that combine rows along a column like this...


1 Answer

6 votes

Spark 3.1+

Replace

$"time".cast("timestamp")

with

import org.apache.spark.sql.functions.timestamp_seconds

timestamp_seconds($"time")
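
Applied to the Spark 2.0+ snippet below, that gives something like this (a sketch; it assumes df and the Spark implicits are in scope):

import org.apache.spark.sql.functions.{avg, timestamp_seconds, window}

df
    .withColumn("ts", timestamp_seconds($"time"))   // no cast to timestamp needed on 3.1+
    .groupBy(window($"ts", "2 seconds", "1 second"))
    .avg("temperature")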

Spark 2.0+

In Spark 2.0 and later it is possible to use the window function as an input for groupBy. It lets you specify windowDuration, slideDuration and startTime (offset). It works only with a TimestampType column, but it is not hard to work around that. In your case it will require some additional steps to correct for the boundaries, but the general solution can be expressed as shown below:

import org.apache.spark.sql.functions.{window, avg}

df
    .withColumn("ts", $"time".cast("timestamp"))
    .groupBy(window($"ts", windowDuration="2 seconds", slideDuration="1 second"))
    .avg("temperature")
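
To line the result up with the input you can, for example, order by the window start and inspect it (just a usage sketch; the aggregated column is named avg(temperature)):

df
    .withColumn("ts", $"time".cast("timestamp"))
    .groupBy(window($"ts", "2 seconds", "1 second"))
    .avg("temperature")
    .orderBy($"window.start")
    .show(false)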

Spark < 2.0

If there is a natural way to partition your data you can use window functions as follows:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.mean

val w = Window.partitionBy($"id").orderBy($"time").rowsBetween(-1, 0)  // frame: previous row and current row

val df = sc.parallelize(Seq(
    (1L, 0, 65), (1L, 1, 67), (1L, 2, 62), (1L, 3, 59)
)).toDF("id", "time", "temperature")

df.select($"*", mean($"temperature").over(w).alias("temperatureAvg")).show

// +---+----+-----------+--------------+                             
// | id|time|temperature|temperatureAvg|
// +---+----+-----------+--------------+
// |  1|   0|         65|          65.0|
// |  1|   1|         67|          66.0|
// |  1|   2|         62|          64.5|
// |  1|   3|         59|          60.5|
// +---+----+-----------+--------------+

You can create windows with arbitrary weights using the lead / lag functions; each lag has to be evaluated over a time-ordered window (no frame is required):

import org.apache.spark.sql.functions.{lag, lit}
val wLag = Window.partitionBy($"id").orderBy($"time")  // ordering only; lag does not need a frame

lit(0.6) * $"temperature" +
lit(0.3) * lag($"temperature", 1).over(wLag) +
lit(0.2) * lag($"temperature", 2).over(wLag)
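
Note that mean over rowsBetween(-1, 0) gives 65.0 for the first row (there is only one value in the frame). If you want the question's literal (T[n] + T[n-1]) / 2.0 with the missing previous value treated as 0 (so 32.5 for the first row), a possible sketch reusing wLag from above:

import org.apache.spark.sql.functions.coalesce

df.withColumn(
    "temperatureAvg",
    (coalesce(lag($"temperature", 1).over(wLag), lit(0)) + $"temperature") / 2.0
).show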

It is still possible to do this without a partitionBy clause, but it will be extremely inefficient. If that is your case you won't be able to use DataFrames; instead you can use sliding over an RDD (see for example Operate on neighbor elements in RDD in Spark). There is also the spark-timeseries package, which you may find useful.
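
For the RDD route, a minimal sketch using MLlib's sliding helper (a developer API; it assumes the data is already sorted by time and that temperature is an integer column):

import org.apache.spark.mllib.rdd.RDDFunctions._

val temps = df.orderBy($"time").select($"temperature").rdd.map(_.getInt(0))
val avgs  = temps.sliding(2).map(w => (w(0) + w(1)) / 2.0)  // (T[n-1] + T[n]) / 2.0
avgs.collect()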