1
votes

I am trying to use the Spark SQL functions when / otherwise in a HiveContext, together with lag over a Window, to create a diff field for an ascending numeric count field in sequential per-minute data. The count frequently resets to zero, so I need to correct for those resets.

My code is as follows:

window = Window.partitionBy("car","trip_id").orderBy("car","datetime")
df = df.withColumn('new_count', F.when(df.num_count >= F.lag(df.num_count),(df.num_count- F.lag(df.num_count))).otherwise(df.num_count.astype('long')).over(window))

PySpark raises the following error:

: java.lang.UnsupportedOperationException: CASE WHEN ...<"variable names">...  is not supported in window operation

Would it be better to use sqlContext.sql("Select CASE WHEN...lag(num_count) OVER...") instead?

1
I'm a bit afraid that lag is not supported by SQLContext. You have to use HiveContext. - JiriS

1 Answer

4
votes

The window should be applied to each window function individually, not to the whole CASE WHEN expression. In this particular case you can simply extract the lag expression like this:

num_count_lag = F.lag(df.num_count).over(window)

df.withColumn(
    'new_count',
    F.when(
        df.num_count >= num_count_lag,
        df.num_count - num_count_lag
    ).otherwise(df.num_count.astype('long'))
)
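
To sanity-check the result on a small sample, the same per-partition logic can be sketched in plain Python. This is only an illustration, not Spark code: the helper name reset_corrected_diff is hypothetical, the input is assumed to be the num_count values of one (car, trip_id) partition already sorted by datetime, and num_count itself is assumed to be non-null. The first row has no lagged value, so it falls through to the otherwise branch and keeps the raw count.

```python
def reset_corrected_diff(counts):
    """Difference from the previous count; after a reset, or on the first
    row where no previous value exists, keep the raw count instead."""
    result = []
    previous = None  # stands in for lag(num_count), which is null on the first row
    for current in counts:
        if previous is not None and current >= previous:
            # Counter kept increasing: take the difference.
            result.append(current - previous)
        else:
            # First row or counter reset: fall back to the raw count.
            result.append(current)
        previous = current
    return result


# Hypothetical sample: the counter resets to zero at the fourth reading.
sample = [5, 8, 12, 0, 3, 7]
print(reset_corrected_diff(sample))  # prints [5, 3, 4, 0, 3, 4]
```

Comparing this output against new_count for a single trip is a quick way to confirm the window expression behaves as intended around resets.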