5 votes

Suppose I have a DataFrame called transactions with the following integer columns: year, month, day, timestamp, transaction_id.

In [1]: transactions = ctx.createDataFrame(
   ...:     [(2017, 12, 1, 10000, 1), (2017, 12, 2, 10001, 2), (2017, 12, 3, 10003, 3),
   ...:      (2017, 12, 4, 10004, 4), (2017, 12, 5, 10005, 5), (2017, 12, 6, 10006, 6)],
   ...:     ('year', 'month', 'day', 'timestamp', 'transaction_id'))

In [2]: transactions.show()
+----+-----+---+---------+--------------+
|year|month|day|timestamp|transaction_id|
+----+-----+---+---------+--------------+
|2017|   12|  1|    10000|             1|
|2017|   12|  2|    10001|             2|
|2017|   12|  3|    10003|             3|
|2017|   12|  4|    10004|             4|
|2017|   12|  5|    10005|             5|
|2017|   12|  6|    10006|             6|
+----+-----+---+---------+--------------+

I want to define a function filter_date_range that returns a DataFrame consisting of the transaction rows that fall within a given date range.

>>> filter_date_range(  
        df = transactions, 
        start_date = datetime.date(2017, 12, 2), 
        end_date = datetime.date(2017, 12, 4)).show()

+----+-----+---+---------+--------------+
|year|month|day|timestamp|transaction_id|
+----+-----+---+---------+--------------+
|2017|   12|  2|    10001|             2|
|2017|   12|  3|    10003|             3|
|2017|   12|  4|    10004|             4|
+----+-----+---+---------+--------------+

Assuming the data is held in Hive partitions, partitioned by year, month, day, what is the most efficient way to perform a filter like this that involves date arithmetic? I am looking for a way of doing this in a purely DataFrame-ic way, without resorting to usage of transactions.rdd, so that Spark can infer that only a subset of partitions actually need to be read.
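For illustration, here is a minimal sketch of the kind of column-only implementation I have in mind (the yyyymmdd encoding is just one way to express the date arithmetic, not something I am committed to). What I don't know is whether a filter written this way lets Spark prune the year/month/day partitions:

import datetime
from pyspark.sql import functions as F

def filter_date_range(df, start_date, end_date):
    # Encode (year, month, day) as a single yyyymmdd integer and compare.
    date_int = F.col("year") * 10000 + F.col("month") * 100 + F.col("day")
    start = start_date.year * 10000 + start_date.month * 100 + start_date.day
    end = end_date.year * 10000 + end_date.month * 100 + end_date.day
    return df.filter((date_int >= start) & (date_int <= end))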

Some sample data would be useful. Make it easy for us to cut-and-paste to recreate your dataframe. See How to make good reproducible Apache Spark Dataframe examples. – pault
@pault updated with sample data. – andrew

1 Answer

2 votes

If data is partitioned like this:

.
├── _SUCCESS
└── year=2017
    └── month=12
        ├── day=1
        │   └── part-0...parquet
        ├── day=2
        │   └── part-0...parquet
        ├── day=3
        │   └── part-0...parquet
        ├── day=4
        │   └── part-0...parquet
        ├── day=5
        │   └── part-0...parquet
        └── day=6
            └── part-0...parquet
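(For reference, a layout like this is what you get when the DataFrame is written out partitioned by those columns; a minimal sketch, assuming base_path is the root directory of the table:)

transactions.write.partitionBy("year", "month", "day").parquet(base_path)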

you could just generate a list of directories to load:

import datetime

start_date = datetime.date(2017, 12, 2)
end_date = datetime.date(2017, 12, 4)
n = (end_date - start_date).days + 1

base_path = ...

paths = [
    "{}/year={}/month={}/day={}".format(base_path, d.year, d.month, d.day)
    for d in [start_date + datetime.timedelta(days=i) for i in range(n)]
]

spark.read.option("basePath", base_path).load(paths).explain()

# == Parsed Logical Plan ==
# Relation[timestamp#47L,transaction_id#48L,year#49,month#50,day#51] parquet
# 
# == Analyzed Logical Plan ==
# timestamp: bigint, transaction_id: bigint, year: int, month: int, day: int
# Relation[timestamp#47L,transaction_id#48L,year#49,month#50,day#51] parquet
# 
# == Optimized Logical Plan ==
# Relation[timestamp#47L,transaction_id#48L,year#49,month#50,day#51] parquet
# 
# == Physical Plan ==
# *FileScan parquet [timestamp#47L,transaction_id#48L,year#49,month#50,day#51] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/transactions/year=2017/month=12/day=2, file:/user/hiv..., PartitionCount: 3, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<timestamp:bigint,transaction_id:bigint>
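If you want this behind a filter_date_range-style function, a sketch along these lines should work. Note the signature differs from the one in the question: it takes the table's base path rather than an existing DataFrame, because the pruning here happens at read time by only listing the matching partition directories:

import datetime

def filter_date_range(spark, base_path, start_date, end_date):
    # Build one path per day in the range and read only those partitions.
    n = (end_date - start_date).days + 1
    paths = [
        "{}/year={}/month={}/day={}".format(base_path, d.year, d.month, d.day)
        for d in (start_date + datetime.timedelta(days=i) for i in range(n))
    ]
    return spark.read.option("basePath", base_path).parquet(*paths)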
