
I have a PySpark dataframe with three columns. The first two columns have arrays as their elements, while the last column gives the length of the array in the second column. Following is the PySpark dataframe:

+---------------------+---------------------+-----+
|                   c1|                   c2|lenc2|
+---------------------+---------------------+-----+
|[2017-02-14 00:00:00]|[2017-02-24 00:00:00]|    1|
|[2017-01-16 00:00:00]|                   []|    0|
+---------------------+---------------------+-----+

The arrays contain timestamps. The column lenc2 denotes the length of the array in column c2. For all rows where lenc2 == 0, the array in column c1 contains exactly one (timestamp) element.

For all rows where lenc2 == 0, I want to take the timestamp from the array in column c1, add 5 days to it, and put it inside the array in column c2. How can I do this?

This is an example of the expected output:

+---------------------+---------------------+-----+
|                   c1|                   c2|lenc2|
+---------------------+---------------------+-----+
|[2017-02-14 00:00:00]|[2017-02-24 00:00:00]|    1|
|[2017-01-16 00:00:00]|[2017-01-21 00:00:00]|    0|
+---------------------+---------------------+-----+

Below is what I have tried so far:

from pyspark.sql import functions as F

# This merges c1 into c2 for rows where lenc2 == 0, but it does not add the 5 days.
df2 = df1.withColumn(
    "c2",
    F.when(F.col("lenc2") == 0, F.array_union(F.col("c1"), F.col("c2"))).otherwise(
        F.col("c2")
    ),
)

1 Answer


You already have the when(…).otherwise(…) part right.

Given that you don’t seem to be interested in subsecond accuracy, you can convert the timestamps to seconds since the Unix epoch, add 5 days’ worth of seconds, and then convert back to a timestamp:

from datetime import datetime

from pyspark.sql.functions import *

one_sec_before_leap_time = datetime(2016, 12, 31, 23, 59, 59)
seconds_in_a_day = 24 * 3600

df = spark.createDataFrame([
    ([one_sec_before_leap_time], [datetime.now()], 1),
    ([one_sec_before_leap_time], [], 0),
],
    schema=("c1", "c2", "lenc2"))


def add_seconds_to_timestamp(ts_col, seconds_col):
    # unix_timestamp truncates to whole seconds; adding the offset and casting
    # back with to_timestamp yields the shifted timestamp.
    return to_timestamp(unix_timestamp(ts_col) + seconds_col)


# For rows with an empty c2, build a one-element array holding the shifted
# timestamp from c1; otherwise keep c2 as it is.
df2 = df.withColumn("c2",
                    when(col("lenc2") == 0,
                         array(
                             add_seconds_to_timestamp(
                                 col("c1").getItem(0),
                                 lit(5 * seconds_in_a_day))))
                    .otherwise(col("c2")))
df2.show(truncate=False)
# +---------------------+----------------------------+-----+                      
# |c1                   |c2                          |lenc2|
# +---------------------+----------------------------+-----+
# |[2016-12-31 23:59:59]|[2019-12-07 16:58:32.864176]|1    |
# |[2016-12-31 23:59:59]|[2017-01-05 23:59:59]       |0    |
# +---------------------+----------------------------+-----+
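
If your Spark version supports interval arithmetic directly on timestamp columns, you can also skip the round trip through epoch seconds and keep any subsecond part intact. A sketch under that assumption (not tested against your setup):

from pyspark.sql.functions import array, col, expr, when

# Alternative: add a 5-day interval straight to the timestamp inside c1.
df3 = df.withColumn(
    "c2",
    when(col("lenc2") == 0,
         array(col("c1").getItem(0) + expr("INTERVAL 5 DAYS")))
    .otherwise(col("c2")),
)
df3.show(truncate=False)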

Be aware that this will most likely give you odd results once daylight saving time comes into play. It’s better to express everything in UTC internally and to convert between UTC timestamps and local time zones only at the inputs and outputs, similar to the Unicode sandwich.
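
A rough sketch of that sandwich (the Europe/Amsterdam zone is only an example here; substitute whatever zone your inputs are actually expressed in):

from pyspark.sql.functions import col, from_utc_timestamp, to_utc_timestamp

local_tz = "Europe/Amsterdam"  # example zone, an assumption about your data

# On the way in: reinterpret local wall-clock timestamps as UTC instants.
df_utc = df.withColumn("c1_utc", to_utc_timestamp(col("c1").getItem(0), local_tz))

# ... do all the date arithmetic on the UTC columns ...

# On the way out: render the UTC instants back in the local zone for display.
df_out = df_utc.withColumn("c1_local", from_utc_timestamp(col("c1_utc"), local_tz))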

Additionally, this does not take leap seconds into account, as hinted at above (there was an extra second at the end of 2016, making 2016-12-31T23:59:60Z technically valid). Leap seconds are notoriously hard to handle, though, because there’s no exact formula for them (yet; who knows, maybe one day we can model geological and climatic events?).