11 votes

My dataframe contains one field which is a date, and it appears in string format, for example:

'2015-07-02T11:22:21.050Z'

I need to filter the DataFrame on the date to get only the records from the last week. So I tried a map approach, transforming the string dates to datetime objects with strptime:

from datetime import datetime, timedelta

def map_to_datetime(row):
    format_string = '%Y-%m-%dT%H:%M:%S.%fZ'
    row.date = datetime.strptime(row.date, format_string)

df = df.map(map_to_datetime)

and then I would apply a filter as

df.filter(lambda row:
    row.date >= (datetime.today() - timedelta(days=7)))

I managed to get the mapping working, but the filter fails with:

TypeError: condition should be string or Column

Is there a way to make this filtering work, or should I change the approach, and if so, how?

2 Answers

14 votes

I figured out a way to solve my problem: use the Spark SQL API with dates in string format.

Here is an example:

from datetime import datetime, timedelta

last_week = (datetime.today() - timedelta(days=7)).strftime(format='%Y-%m-%d')
new_df = df.where(df.date >= last_week)
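
This works because ISO 8601 strings compare lexicographically in the same order as the instants they represent, so no cast is needed. Here is a self-contained sketch with hypothetical sample data (it assumes a pyspark shell where sqlContext is defined, and uses the 'date' column name from the question):

from datetime import datetime, timedelta

# Hypothetical sample rows; 'date' matches the column name in the question.
df = sqlContext.createDataFrame(
    [('2015-07-02T11:22:21.050Z', ), ('2016-03-20T21:00:00.000Z', )],
    ['date'])

# ISO 8601 strings sort lexicographically in chronological order, so a
# plain string comparison against a 'YYYY-MM-DD' cutoff is enough.
last_week = (datetime.today() - timedelta(days=7)).strftime('%Y-%m-%d')
df.where(df.date >= last_week).show()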

10 votes

Spark >= 1.5

You can use INTERVAL:

from pyspark.sql.functions import col, expr, current_date

df_casted.where(col("dt") >= current_date() - expr("INTERVAL 7 days"))
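
Here df_casted is the DataFrame built in the Spark < 1.5 section below. For reference, a minimal self-contained sketch of the same filter (assuming the sc global of a pyspark shell and hypothetical sample data):

from pyspark.sql.functions import col, expr, current_date

# Hypothetical sample data with ISO 8601 strings, as in the question.
df = sc.parallelize([
    ('2015-07-02T11:22:21.050Z', ),
    ('2016-03-20T21:00:00.000Z', )
]).toDF(("d_str", ))

# Cast the string to a date once, then compare against current_date()
# shifted back by an interval literal.
df_casted = df.withColumn("dt", col("d_str").cast("date"))
df_casted.where(col("dt") >= current_date() - expr("INTERVAL 7 days")).show()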

Spark < 1.5

You can solve this without using worker-side Python code and without switching to RDDs. First of all, since you use ISO 8601 strings, your data can be cast directly to date or timestamp:

from pyspark.sql.functions import col

df = sc.parallelize([
    ('2015-07-02T11:22:21.050Z', ),
    ('2016-03-20T21:00:00.000Z', )
]).toDF(("d_str", ))

df_casted = df.select("*",
    col("d_str").cast("date").alias("dt"), 
    col("d_str").cast("timestamp").alias("ts"))

This will save one roundtrip between the JVM and Python. There are also a few ways you can approach the second part. Date only:

from pyspark.sql.functions import current_date, datediff, unix_timestamp

df_casted.where(datediff(current_date(), col("dt")) < 7)

Timestamps:

def days(i: int) -> int:
    return 60 * 60 * 24 * i

df_casted.where(unix_timestamp() - col("ts").cast("long") < days(7))

You can also take a look at current_timestamp and date_sub.
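
For example, a date_sub based variant (a sketch, roughly equivalent to the datediff version above; date_sub is available from 1.5):

from pyspark.sql.functions import col, current_date, date_sub

# Keep rows whose date falls on or after the day seven days ago.
df_casted.where(col("dt") >= date_sub(current_date(), 7))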

Note: I would avoid using DataFrame.map. It is better to use DataFrame.rdd.map instead; it will save you some work when switching to 2.0+.
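
If you do end up needing Python-side parsing, a sketch of the RDD variant could look like this (Row objects are immutable, so return new values instead of assigning to row attributes):

from datetime import datetime

# Map over the underlying RDD; DataFrame.map is gone in PySpark 2.0+,
# while df.rdd.map keeps working.
parsed = df.rdd.map(
    lambda row: datetime.strptime(row.d_str, '%Y-%m-%dT%H:%M:%S.%fZ'))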