1
votes

During an ETL process I have a SAS date field stored as a 5-digit integer, which represents the number of days since 01-01-1960. In order to make this column more useful in analysis, I would like to convert it to a date data type field in Redshift.

Currently I am trying to do this in pyspark as follows:

  • I create a new column "sas_date" with the string literal "1960-01-01".

  • Using pyspark.sql.functions.date_add, I pass the "sas_date" column as the start date parameter and the integer-valued 'arrival_date' column as the second parameter.

  • When the date_add function runs I get the error "Column not iterable", even though I would think the arrival_date column, being a series, would be iterable. But it's not. Why?

  • When I remove the 'arrival_date' column and replace it with a static integer value (say 1), the date_add function works.

i94 = i94.withColumn('arrival_date', col('arrival_date').cast('int'))
i94 = i94.withColumn('sas_date', lit("1960-01-01"))
i94 = i94.withColumn('arrival_date', date_add(col('sas_date'), i94['arrival_date']))

I want to be able to pass my column so that the second date_add parameter is dynamic. However, it seems date_add does not accept this. If date_add cannot do this, what other options do I have besides a UDF?

UPDATE: State of data right before the date_add() operation

i94.printSchema()

root
 |-- cic_id: double (nullable = true)
 |-- visa_id: string (nullable = true)
 |-- port_id: string (nullable = true)
 |-- airline_id: string (nullable = true)
 |-- cit_id: double (nullable = true)
 |-- res_id: double (nullable = true)
 |-- year: double (nullable = true)
 |-- month: double (nullable = true)
 |-- age: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- arrival_date: integer (nullable = true)
 |-- depart_date: double (nullable = true)
 |-- date_begin: string (nullable = true)
 |-- date_end: string (nullable = true)
 |-- sas_date: string (nullable = false)

i94.limit(10).toPandas()

toPandas() result

1
It should work with your approach. Can you show some sample data? - SMaZ
Right, @SMaZ? Although I think I just realized why it behaves this way. When using withColumn('arrival_date', ...) I think Spark overwrites the 'arrival_date' column before it even applies the date_add SQL function. I am going to test this theory later and will post an answer to the question if I'm right. - Mabloq
@Mabloq: You don't need to evaluate arrival_date twice. Can you show the data, i.e. what the arrival_date values look like in the first place? - SMaZ
Note that SAS stores all numbers as floating point. So perhaps you just need to convert your raw number-of-days values to integers to get it to work with date_add()? - Tom
Hey @SMaZ, thanks for sticking with me. I edited my question with the schema and a 10-row sample. My theory was disproven :( - Mabloq

1 Answer

2
votes

I think you are absolutely right: before Spark 3.0.0, date_add is designed to take only an int value for the number of days.

In the Spark Scala implementation I see the lines below. Before 3.0.0 the days argument must be an Int, which the function itself wraps in a literal column with lit. From 3.0.0 on, the signature takes a Column for days:

Spark <3.0.0:

def date_add(start: Column, days: Int): Column = date_add(start, lit(days))

Spark >=3.0.0:

def date_add(start: Column, days: Column): Column = withExpr { DateAdd(start.expr, days.expr) }

Now let's talk about the solution. I can think of two approaches.

Imports and a small sample of your dataset:

import pyspark.sql.functions as f
import pyspark.sql.types as t
from datetime import datetime
from datetime import timedelta

l1 = [(5748517.0,'1960-01-01', 20574), (5748517.0,'1960-01-01', 20574), (5748517.0,'1960-01-01', 20574)]
df = spark.createDataFrame(l1).toDF('cic_id','sas_date','arrival_date')
df.show()
+---------+----------+------------+
|   cic_id|  sas_date|arrival_date|
+---------+----------+------------+
|5748517.0|1960-01-01|       20574|
|5748517.0|1960-01-01|       20574|
|5748517.0|1960-01-01|       20574|
+---------+----------+------------+

Now, there are two ways to achieve this functionality.

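  1. UDF way:
def date_add_(date, days):
    # Propagate nulls instead of raising inside the UDF
    if date is None or days is None:
        return None
    # to_date() hands the UDF a datetime.date; also accept a 'YYYY-MM-DD' string
    if isinstance(date, str):
        date = datetime.strptime(date, "%Y-%m-%d").date()
    # int() guards against day counts that arrive as floats (e.g. 20574.0)
    return date + timedelta(days=int(days))


# Register the Python function as a UDF that returns a Spark DateType
date_add_udf = f.udf(date_add_, t.DateType())

df.withColumn('actual_arrival_date', date_add_udf(f.to_date('sas_date'), 'arrival_date')).show()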
+---------+----------+------------+-------------------+
|   cic_id|  sas_date|arrival_date|actual_arrival_date|
+---------+----------+------------+-------------------+
|5748517.0|1960-01-01|       20574|         2016-04-30|
|5748517.0|1960-01-01|       20574|         2016-04-30|
|5748517.0|1960-01-01|       20574|         2016-04-30|
+---------+----------+------------+-------------------+
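
The 2016-04-30 value can be sanity-checked without a Spark session, since the conversion is just "epoch plus N days". The following is only a sketch in plain Python; SAS_EPOCH and sas_days_to_date are hypothetical names used for illustration, and the int() conversion assumes a float day count carries no time-of-day fraction.

```python
from datetime import date, timedelta

# SAS counts dates as days since 1960-01-01 (the epoch stated in the question).
SAS_EPOCH = date(1960, 1, 1)


def sas_days_to_date(days):
    """Convert a SAS day count to a datetime.date (hypothetical helper)."""
    if days is None:
        return None  # keep missing values missing
    # SAS stores numbers as floating point, so accept 20574.0 as well as 20574.
    return SAS_EPOCH + timedelta(days=int(days))


print(sas_days_to_date(20574))  # 2016-04-30
```

Day 0 maps to 1960-01-01 itself, so the sample value 20574 lands on the same date that Spark reports above.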

  2. Using expr evaluation:
df.withColumn('new_arrival_date', f.expr("date_add(sas_date, arrival_date)")).show()
+---------+----------+------------+----------------+
|   cic_id|  sas_date|arrival_date|new_arrival_date|
+---------+----------+------------+----------------+
|5748517.0|1960-01-01|       20574|      2016-04-30|
|5748517.0|1960-01-01|       20574|      2016-04-30|
|5748517.0|1960-01-01|       20574|      2016-04-30|
+---------+----------+------------+----------------+
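
Because expr takes a SQL string, the epoch can also be written as a literal inside the expression, which would make the helper sas_date column unnecessary. Below is a minimal sketch of building such a string; sas_date_expr is a hypothetical helper name, not part of PySpark, and the snippet only constructs the string so it runs without Spark.

```python
def sas_date_expr(column_name, epoch="1960-01-01"):
    """Build a Spark SQL expression string that adds a day-count column
    to a literal epoch date (hypothetical helper, for illustration only)."""
    # Backticks quote the column name in Spark SQL.
    return f"date_add(to_date('{epoch}'), `{column_name}`)"


print(sas_date_expr("arrival_date"))
# date_add(to_date('1960-01-01'), `arrival_date`)
```

With the sample DataFrame above, the intended usage would be df.withColumn('new_arrival_date', f.expr(sas_date_expr('arrival_date'))), assuming arrival_date is already an integer column as in the printed schema.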