Is there a better way to accomplish my goals than the code below? The goals are:
- I want to read data, based on filters, from Dataset A, which contains a lot of small fragments (there are many files in this dataset because I download data frequently).
- I want to consolidate the fragments by partition in a loop (I use this when I cannot fit all of the filters into memory, so I process them one at a time).
- I want to write this data to a new dataset (Dataset B) as a consolidated file that gets read by our BI tool. Unfortunately there is no partition_filename_cb in the new dataset writer, so I need to use the legacy write_to_dataset for this; the file is generally named after the partition (see the sketch just after this list).
- I would really like to clean up Dataset A. Over time, more and more files get added to the partitions since I am downloading data frequently and rows can be updated (some of these fragment files contain only one or two records).
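For reference, the legacy writer call I mean is roughly the following. This is a minimal sketch only: the table contents, root path, and filename callback are illustrative, and partition_filename_cb is deprecated in newer pyarrow releases.

import datetime
import pyarrow as pa
import pyarrow.parquet as pq

# Illustrative table; the real data comes from Dataset A.
table = pa.table({
    "created_date": [datetime.date(2019, 11, 13)],
    "value": [1],
})

pq.write_to_dataset(
    table,
    root_path="dev/interactions-final",  # hypothetical Dataset B root
    partition_cols=["created_date"],
    # name each partition's file after the partition value, e.g. 2019-11-13.parquet
    partition_filename_cb=lambda keys: f"{keys[0]}.parquet",
    use_legacy_dataset=True,  # required on versions where the new writer is the default
)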
Below is my current process. I use a ds.Scanner to apply my filters and select my columns from the original dataset:
def retrieve_fragments(dataset, filter_expression, columns):
    """Creates a dictionary mapping file fragments to their partition keys from a pyarrow dataset"""
    fragment_partitions = {}
    scanner = ds.Scanner.from_dataset(dataset, columns=columns, filter=filter_expression)
    fragments = scanner.get_fragments()
    for frag in fragments:
        keys = ds._get_partition_keys(frag.partition_expression)
        fragment_partitions[frag] = keys
    return fragment_partitions
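For context, the objects used above and below (dataset, partitioning, filesystem, filter_expression, read_columns) are created roughly like this. The root path, schema, columns, and filter values here are illustrative assumptions, not my exact setup.

import datetime
import pyarrow as pa
import pyarrow.dataset as ds
from pyarrow import fs

# Illustrative setup only -- adjust the path, partition schema, columns, and filter.
filesystem = fs.LocalFileSystem()
partitioning = ds.partitioning(
    pa.schema([("created_date", pa.date32())]), flavor="hive"
)
dataset = ds.dataset(
    "dev/interactions",  # hypothetical root of Dataset A
    partitioning=partitioning,
    filesystem=filesystem,
)
read_columns = ["id", "created_date"]  # example column selection
filter_expression = ds.field("created_date") >= datetime.date(2019, 11, 13)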
Below I build small lists of all of the fragments that share the same filter expression. I can then write each group to a new dataset as a consolidated file, and I assume I can also delete the individual fragment files and write a new consolidated version in their place?
fragments = retrieve_fragments(
    dataset=dataset, filter_expression=filter_expression, columns=read_columns
)

# collect the distinct partition key sets seen across the fragments
unique_filters = []
for fragment, filter_value in fragments.items():
    if filter_value not in unique_filters:
        unique_filters.append(filter_value)

# each chunk is a list of all of the fragments with the same partition_expression /
# filter, which we turn into a new dataset that we can then process or resave into a
# consolidated file
for unique_filter in unique_filters:
    chunks = []
    for frag, filter_value in fragments.items():
        if filter_value == unique_filter:
            chunks.append(frag.path)

    logging.info(
        f"Combining {len(chunks)} fragments with filter {unique_filter} into a single table"
    )
    table = ds.dataset(
        chunks, partitioning=partitioning, filesystem=filesystem
    ).to_table(columns=read_columns)

    # ignore the pandas metadata due to some issues with columns being typed as
    # boolean even though they were never boolean
    df = table.to_pandas(ignore_metadata=True)

    # this function sorts and drops duplicates on a unique constraint key
    df = prepare_dataframe(df)

    table = pa.Table.from_pandas(df=df, schema=dataset.schema, preserve_index=False)

    # write the table to Dataset B (using partition_filename_cb)

    # I believe I could now also write the table back to Dataset A as a consolidated
    # parquet file and then delete all of the fragment paths. This would leave me with
    # only a single file in the partition "folder".
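For the two commented steps at the end of the loop, I imagine something like the following, where table and chunks are the loop variables above. The root paths, filename callbacks, and the delete_file cleanup are assumptions about my setup rather than tested code, and partition_filename_cb is deprecated in newer pyarrow releases.

import pyarrow.parquet as pq

# write the consolidated table to Dataset B with a predictable, partition-based filename
pq.write_to_dataset(
    table,
    root_path="dev/interactions-final",  # hypothetical Dataset B root
    partition_cols=["created_date"],
    partition_filename_cb=lambda keys: f"{keys[0]}.parquet",
    use_legacy_dataset=True,
    # pass filesystem=... here if Dataset B lives on a remote filesystem
)

# write the same consolidated data back into Dataset A, then delete the small
# fragment files that were just combined (write first, delete after, so a failed
# write does not lose data -- though this is not atomic for concurrent readers)
pq.write_to_dataset(
    table,
    root_path="dev/interactions",  # hypothetical Dataset A root
    partition_cols=["created_date"],
    partition_filename_cb=lambda keys: f"consolidated-{keys[0]}.parquet",
    use_legacy_dataset=True,
)
for path in chunks:  # the fragment paths collected for this partition
    filesystem.delete_file(path)  # assumes a pyarrow.fs filesystem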
The output of this saves a single file per partition into the new dataset (e.g. /dev/interactions-final/created_date=2019-11-13/2019-11-13.parquet):
INFO - Combining 78 fragments with filter {'created_date': datetime.date(2019, 11, 13)} into a single table
INFO - Saving 172657 rows and 36 columns (70.36 MB to dev/interactions-final)
INFO - Combining 57 fragments with filter {'created_date': datetime.date(2019, 11, 18)} into a single table
INFO - Saving 67036 rows and 36 columns (29.63 MB to dev/interactions-final)
INFO - Combining 55 fragments with filter {'created_date': datetime.date(2019, 11, 19)} into a single table
INFO - Saving 65035 rows and 36 columns (29.62 MB to dev/interactions-final)
INFO - Combining 63 fragments with filter {'created_date': datetime.date(2019, 11, 20)} into a single table
INFO - Saving 63613 rows and 36 columns (30.76 MB to dev/interactions-final)