
I am trying to load data from a Delta table into a PySpark DataFrame.

path_to_data = 's3://mybucket/daily_data/'
df = spark.read.format("delta").load(path_to_data)

Now the underlying data is partitioned by date as

s3://mybucket/daily_data/
    dt=2020-06-12
    dt=2020-06-13
    ...
    dt=2020-06-22

Is there a way to optimize the read into a DataFrame, given that:

  1. Only a certain date range is needed
  2. Only a subset of columns is needed

The current way I tried is:

df.createOrReplaceTempView("my_table")  # registerTempTable is deprecated in newer Spark versions
new_df = spark.sql("select col1, col2 from my_table where dt_col > '2020-06-20'")
# dt_col is a column in the dataframe of timestamp dtype.

In the above approach, does Spark need to load the whole dataset, filter it on the date range, and then select the needed columns? Is there any optimization that can be done in the PySpark read to load only the required data, since it is already partitioned?

Something along the lines of:

df = spark.read.format("delta").load(path_to_data,cols_to_read=['col1','col2'])
or 
df = spark.read.format("delta").load(path_to_data,partitions=[...])
If you call .explain on your query, the resulting query plan should mention a PartitionFilter that references your dt column. Can you add this query plan to your post? – dispanser
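
For reference, the plan can be inspected with something like the following (a minimal sketch; the exact wording of the plan, such as the PartitionFilters entry in the scan node, varies with the Spark and Delta versions):

# Filter on the partition column and print the extended physical plan;
# if pruning kicks in, the scan node should list dt under PartitionFilters.
df.where("dt > '2020-06-20'").explain(True)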

1 Answer


In your case, no extra step is needed; Spark takes care of these optimizations. Since the dataset is already partitioned on the dt column, when you query it with the partition column dt in the filter condition, Spark loads only the subset of the data from the source that matches the filter, in your case dt > '2020-06-20'.
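
As a sketch, assuming dt is the partition column from your directory layout and col1/col2 are the columns you need, the filter and the column selection can be applied directly to the loaded DataFrame:

# Filtering on the partition column lets Spark prune partitions at scan time,
# and selecting only the needed columns avoids reading the rest from the underlying Parquet files.
df = spark.read.format("delta").load(path_to_data)
new_df = df.where("dt > '2020-06-20'").select("col1", "col2")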

Internally, Spark does this optimization via partition pruning.