1 vote

As the title states, I would like to repartition a pyarrow table by size (or by row group size) using pyarrow, writing the result out as several parquet files.

I have had a look at the pyarrow documentation and identified the partitioned dataset chapter, which seemed like a promising direction. Unfortunately, it shows that partitioning by column content is possible, but not by size (or row group size).

So, starting from a single table, how can I control the writing step so that several files are written, each with a controlled size of x MB (or a controlled row group size)?

import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

file = 'example.parquet'
file_res = 'example_res'

# Generate a random df
df = pd.DataFrame(np.random.randint(100,size=(100000, 20)),columns=['A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T'])
table = pa.Table.from_pandas(df)

# With this command, I can write a single parquet file that contains 2 row groups.
pq.write_table(table, file, version='2.0', row_group_size=50000)

# I can read it back and try to write it as a partitioned dataset, but a single parquet file is then written.
table_new = pq.ParquetFile(file).read()
pq.write_to_dataset(table_new, file_res)

Thanks for any help! Best,

Do you want to have one file for each group of 50000 rows? - 0x26res
Hi @0x26res, yes, this is the idea. I would like to have them as a dataset, so that I can read lazily with pyarrow, but in several files. - pierre_j
You might want to keep an eye on issues.apache.org/jira/browse/ARROW-10439 - Pace
ah ah, it seems to be exactly what I am looking for indeed :) - pierre_j
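
For reference, the pyarrow.dataset API now appears to cover this directly: recent pyarrow versions (7.0 and later) expose max_rows_per_file and max_rows_per_group on pyarrow.dataset.write_dataset, which looks like the feature the ARROW-10439 comment refers to. A minimal sketch, reusing table and file_res from the question:

import pyarrow.dataset as ds

# Requires pyarrow >= 7.0. Each output file is capped at 50,000 rows;
# row groups may not exceed the per-file cap, so both limits are set.
ds.write_dataset(
    table,
    file_res,
    format='parquet',
    max_rows_per_file=50000,
    max_rows_per_group=50000,
)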

1 Answer

3 votes

Looking at the docs for write_to_dataset and ParquetWriter, I can't think of anything obvious.

But you could assign a bucket to each row and partition your data based on the bucket, for example:

df = (
    pd.DataFrame(np.random.randint(100,size=(100000, 20)),columns=['A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T'])
    .assign(bucket=lambda x: x.index // 5000)  # one bucket per 5000 rows
)
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, file_res, partition_cols=['bucket'])

And you'll get the following file structure:

bucket=0
bucket=1
bucket=10
bucket=11
bucket=12
bucket=13
bucket=14
bucket=15
bucket=16
bucket=17
bucket=18
bucket=19
bucket=2
bucket=3
bucket=4
bucket=5
bucket=6
bucket=7
bucket=8
bucket=9

This assumes your df.index starts at zero and increases one by one (0, 1, 2, 3, ...).
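
If the index is not such a clean range, a bucket derived from the row position gives the same layout. A sketch reusing the names from the answer; the lazy read-back via pyarrow.dataset (with hive-style partitioning, matching the bucket=N directories above) is added only to illustrate the intended usage:

df = df.assign(bucket=np.arange(len(df)) // 5000)  # bucket by row position, not index
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, file_res, partition_cols=['bucket'])

# Read the partitioned result back lazily as a dataset
import pyarrow.dataset as ds
dataset = ds.dataset(file_res, format='parquet', partitioning='hive')
print(dataset.files)  # lists the parquet file(s) written under each bucket directory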