1 vote

I'm trying to round hours using pyspark and udf.

The function works properly in plain Python but fails when used with PySpark.

The input is:

from pandas import Timestamp
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

date = Timestamp('2016-11-18 01:45:55')  # type is pandas._libs.tslibs.timestamps.Timestamp

def time_feature_creation_spark(date):
    # round to the nearest hour and return the hour as an integer
    return date.round("H").hour

time_feature_creation_udf = udf(lambda x: time_feature_creation_spark(x), IntegerType())


Then I use it in the function that feeds Spark:

data = data.withColumn("hour", time_feature_creation_udf(data["date"]))

And the error is:

TypeError: 'Column' object is not callable

The expected output is just the hour closest to the time in the datetime (e.g. 20:45 is closest to 21:00, so it should return 21).


2 Answers

9 votes

A nicer approach than the /3600*3600 trick is to use the built-in function date_trunc:

import pyspark.sql.functions as F

df = df.withColumn("hourly_timestamp", F.date_trunc("hour", df.timestamp))

Other formats besides hour are:

'year', 'yyyy', 'yy', 'month', 'mon', 'mm', 'day', 'dd', 'hour', 'minute', 'second', 'week', 'quarter'
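
For illustration, here is a minimal sketch of date_trunc in action (it assumes an existing SparkSession named spark; the column names are otherwise arbitrary):

import pyspark.sql.functions as F

# assumes an existing SparkSession named `spark`
df = spark.createDataFrame([("2016-11-18 01:45:55",)], ["timestamp"])
df = df.withColumn("timestamp", F.to_timestamp("timestamp"))

df.withColumn("hourly_timestamp", F.date_trunc("hour", df.timestamp)).show(truncate=False)
# +-------------------+-------------------+
# |timestamp          |hourly_timestamp   |
# +-------------------+-------------------+
# |2016-11-18 01:45:55|2016-11-18 01:00:00|
# +-------------------+-------------------+

Note that date_trunc truncates to the start of the hour; unlike the round(unix_timestamp(...)/3600)*3600 approach in the other answer, it does not round 01:45 up to 02:00.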

5 votes

You can't just apply a PySpark UDF to a pandas DataFrame.

If you want to do this conversion in Spark, you need to convert the pandas DataFrame to a Spark DataFrame first:

import pandas as pd
from pandas import Timestamp

date1 = Timestamp('2016-11-18 01:45:55')
date2 = Timestamp('2016-12-18 01:45:55')
df = pd.DataFrame({"date": [date1, date2]})

data = sqlContext.createDataFrame(df)

Then, to calculate the rounded hour, you don't need a UDF. This line does the trick:

# note: round here is pyspark.sql.functions.round, not the Python built-in
from pyspark.sql.functions import hour, round, unix_timestamp

result = data.withColumn("hour", hour((round(unix_timestamp("date")/3600)*3600).cast("timestamp")))

What it does is:

  1. convert the timestamp to Unix time in seconds using unix_timestamp()
  2. divide by 3600 to get hours, round to the nearest hour, and multiply by 3600 again
  3. cast the Unix time back to a normal timestamp using cast()
  4. extract the hour using the hour() function
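
For the two sample timestamps above, a quick check of the result (the output below is reconstructed by hand, so treat it as illustrative):

result.select("date", "hour").show(truncate=False)
# +-------------------+----+
# |date               |hour|
# +-------------------+----+
# |2016-11-18 01:45:55|2   |
# |2016-12-18 01:45:55|2   |
# +-------------------+----+

Both sample times round from 01:45:55 up to 02:00, hence hour 2; a 20:45:55 timestamp would round to 21:00 and yield 21, matching the expected output in the question.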

Spark uses its own data types, so a pandas._libs.tslibs.timestamps.Timestamp is converted to a pyspark.sql.types.TimestampType when you convert the pandas DataFrame to a Spark DataFrame, which is why pandas functions no longer work on it.
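
You can see the conversion in the schema of the Spark DataFrame created above (a quick illustrative check):

data.printSchema()
# root
#  |-- date: timestamp (nullable = true)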