1
votes

Is it possible to update a hiveContext dataframe column in pyspark using a complex function not doable in a UDF?

I have a dataframe containing many columns, out of which 2 columns are called timestamp and data. I need to retrieve the timestamp from within the JSON string in data and update the timestamp column if the timestamp in data fulfills certain criteria. I know that that dataframes are immutable, but is possible to somehow build a new dataframe retaining all the columns of the old dataframe but updating the timstamp column?

Code illustrating what i would like to do:

def updateTime(row):
    import json

    THRESHOLD_TIME = 60 * 30
    client_timestamp = json.loads(row['data'])
    client_timestamp = float(client_timestamp['timestamp'])
    server_timestamp = float(row['timestamp'])
    if server_timestamp - client_timestamp <= THRESHOLD_TIME:
        new_row = .....  # copy contents of row
        new_row['timestamp'] = client_timestamp
        return new_row
    else:
        return row

df = df.map(updateTime)

I thought of mapping the row contents to a tuple and then converting it back to a dataframe with .toDF() but I can't find a way to copy the row contents into a tuple and then getting back the column names.

1
What about if you use an UDF?Alberto Bonsanto
Maybe this article can help: sparktutorials.net/…Daniel de Paula
Sorry i meant UDF instead of HDF... typo...SK2

1 Answers

0
votes

If you adapt your updateTime function to receive a Timestamp as parameter and return the new processed Timestamp, you can create an UDF and use it directly on the DataFrame's column:

from pyspark.sql.functions import *
from pyspark.sql.types import TimestampType

myUDF = udf(updateTime, TimestampType())
df = df.withColumn("timestamp", myUDF(col("timestamp"))

However, in your case I think it's a little more complex:

from pyspark.sql.functions import *
from pyspark.sql.types import TimestampType

myUDF = udf(getClientTime, TimestampType())
client_timestamp = myUDF(col("data"))
server_timestamp = col("timestamp")
condition = server_timestamp.cast("float") - client_timestamp.cast("float") <= THRESHOLD_TIME    

newCol =  when(condition, client_timestamp).otherwise(server_timestamp) 
newDF = df.withColumn("new_timestamp", newCol)

With this second approach, the function getClientTime receives a value from the data column and returns the client timestamp for this value. Then, you can use it to create a new column (client_timestamp) that contains this information. Finally you can use when to create the new column conditionally, based on the values of the server_timestamp column and the newly created client_timestamp column.

Reference: