2
votes

I am having some problems with the speed of loading .parquet files, but I don't know what I am doing wrong.

Problem

I am trying to read a single .parquet file from my local filesystem; it is part of the partitioned output of a Spark job, so the .parquet files sit in a hierarchy of directories named a=x and b=y.

To achieve this, I am using pandas.read_parquet (which uses pyarrow.parquet.read_table under the hood) and passing the filters kwarg. The run time when using filters is far longer than I would expect.

# The following runs for about 55 seconds
pd.read_parquet(<path_to_entire_dataset>, filters=[[('a', '=', 'x'), ('b', '=', 'y')]])

# The following runs for about 0.04 seconds
pd.read_parquet(<path_to_entire_dataset>/a=x/b=y/)

# The following runs for about 70 seconds
pd.read_parquet(<path_to_entire_dataset>)

Reading a single parquet file by specifying filters is only slightly faster than loading the entire dataset, whereas I would expect the run time to be roughly linear in the number of files actually read.

What mistake do I make here?

I realize that simply encoding the filter values in the path would work, but this quickly becomes complex because what I want to filter on can and will change. Besides, I think read_table should be able to load this data efficiently.

PS: The entire dataset contains many millions of rows; the data I want to load is only a few thousand of them.
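For reference, what I believe to be the equivalent call through the pyarrow.dataset API (same <path_to_entire_dataset> placeholder and the same example values 'x' and 'y' as above) looks roughly like this:

import pyarrow.dataset as ds

# Discover the dataset with hive-style directory names (a=.../b=...)
dataset = ds.dataset(<path_to_entire_dataset>, format='parquet', partitioning='hive')

# Push the filter down as a dataset expression, then convert to pandas
table = dataset.to_table(filter=(ds.field('a') == 'x') & (ds.field('b') == 'y'))
df = table.to_pandas()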

Edit 1:

As suggested by 0x26res, I manually defined the partitioning. This led to a significant speed-up, but still not as much as I would have expected: the run time was now about 5 seconds.

import pandas as pd
import pyarrow as pa
from pyarrow.dataset import HivePartitioning

partitioning = HivePartitioning(
    pa.schema([
        pa.field('a', pa.string()),
        pa.field('b', pa.int32()),
    ])
)

pd.read_parquet(
    <path_to_entire_dataset>,
    engine='pyarrow',
    filters=[
        [
            ('a', '=', x),  # x and y hold the partition values to select
            ('b', '=', y),
        ]
    ],
    partitioning=partitioning,
)
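To see where the remaining ~5 seconds go, I imagine a check like this (a sketch reusing the partitioning object above; I have not verified it in detail) would show how many fragments survive the filter, which should ideally be just the one file:

import pyarrow.dataset as ds

dataset = ds.dataset(<path_to_entire_dataset>, format='parquet', partitioning=partitioning)

# Count the fragments (files) left after applying the partition filter;
# if pruning works, this should be 1 rather than ~100_000.
matching = list(dataset.get_fragments(filter=(ds.field('a') == x) & (ds.field('b') == y)))
print(len(matching))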
I made an earlier comment but I think I read your question wrong. Is your dataset partitioned by Spark, so the directory names are formatted like a=x and b=y? – Pace
Yes; just in case it was not clear, I updated the description. – Jeroen Bos
Can you help me understand the performance a bit more? How many files matched that filter? How large were those files? How large was the uncompressed table read from those files? – Pace
There is exactly one file that matches that filter. There are about 100_000 files, each up to 100 KB. Uncompressed, the table was around 10 GB. Like I said in my question, I could of course specify the files I want in the path, then load the several files I need and combine them using pd.concat. However, I want to understand why this is not as fast as I expect it to be; I thought that was one of the major benefits of parquet: you load only the data you need, no more. – Jeroen Bos
Is there any update or solution on this issue you can share? Would really appreciate it! – nope

1 Answer

1
votes

Given the run time, I suspect arrow is opening every file and then filtering.

Maybe you can try specifying the partitioning so arrow can be smarter about it (with an explicit schema it can prune non-matching directories from the paths alone, instead of opening every file):

import pandas as pd
import pyarrow as pa
import pyarrow.dataset  # importing the submodule makes pa.dataset available

partitioning = pa.dataset.HivePartitioning(
    pa.schema([
        pa.field('a', pa.string()),
        pa.field('b', pa.string())
    ])
)

pd.read_parquet(<path_to_entire_dataset>, filters=[[('a', '=', 'x'), ('b', '=', 'y')]], partitioning=partitioning)