
Is there a better way to accomplish my goals with the code below?

  1. I want to read data based on filters from Dataset A, which contains a lot of small fragments (there are many files in this dataset because I download data frequently)
  2. I want to consolidate the fragments by partition in a loop (I use a loop because I cannot fit all of the filtered data into memory at once, so I process the partitions one by one)
  3. I want to write this data to a new dataset (Dataset B) as a single consolidated file per partition, which gets read by our BI tool - unfortunately the newer dataset writer has no partition_filename_cb equivalent, so I need to use the legacy write_to_dataset for this - the file is generally named after the partition
  4. I would really like to clean up Dataset A. Over time, more and more files get added to the partitions since I am downloading data frequently and rows can be updated (some of these fragment files contain only one or two records)

Below is my current process. I use a ds.Scanner to apply my filters and select my columns from the original dataset (Dataset A):

import logging

import pyarrow as pa
import pyarrow.dataset as ds


def retrieve_fragments(dataset, filter_expression, columns):
    """Creates a dictionary mapping file fragments to their partition keys for a pyarrow dataset"""
    fragment_partitions = {}
    scanner = ds.Scanner.from_dataset(dataset, columns=columns, filter=filter_expression)
    fragments = scanner.get_fragments()
    for frag in fragments:
        # _get_partition_keys turns the partition expression into a dict such as {'created_date': datetime.date(2019, 11, 13)}
        keys = ds._get_partition_keys(frag.partition_expression)
        fragment_partitions[frag] = keys
    return fragment_partitions

Below I create small lists of all of the fragments that share the same partition expression / filter. I can then write each group to a new dataset as a consolidated file, and I assume I can also delete the individual fragment files once the consolidated version has been written?

fragments = retrieve_fragments(
    dataset=dataset, filter_expression=filter_expression, columns=read_columns
)
unique_filters = []
for fragment, filter_value in fragments.items():
    if filter_value not in unique_filters:
        unique_filters.append(filter_value)


# each chunk is a list of all of the fragments with the same partition_expression / filter,
# which we turn into a new dataset that we can then process or re-save into a consolidated file
for unique_filter in unique_filters:
    chunks = []
    for frag, filter_value in fragments.items():
        if filter_value == unique_filter:
            chunks.append(frag.path)
    logging.info(
        f"Combining {len(chunks)} fragments with filter {unique_filter} into a single table"
    )
    table = ds.dataset(chunks, partitioning=partitioning, filesystem=filesystem).to_table(columns=read_columns)


    # ignoring metadata due to some issues with columns having a boolean type even though they were never boolean
    df = table.to_pandas(ignore_metadata=True)
    # prepare_dataframe just sorts and drops duplicates on a unique constraint key
    df = prepare_dataframe(df)
    table = pa.Table.from_pandas(df=df, schema=dataset.schema, preserve_index=False)

    # write the table to Dataset B (using the legacy write_to_dataset with partition_filename_cb)
    # I believe I could now also write the table back to Dataset A as a consolidated parquet file and then
    # delete all of the fragment paths, which would leave only a single file in the partition "folder"
    # (see the sketch below)
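
Roughly what I have in mind for that write/cleanup step is sketched below (not my exact code: the paths and partition column are illustrative, newer pyarrow versions may require use_legacy_dataset=True for partition_filename_cb, and delete_file assumes a pyarrow FileSystem; fsspec filesystems use .rm() / .delete() instead):

import pyarrow.parquet as pq

# one consolidated file per partition, named after the partition value,
# e.g. created_date=2019-11-13/2019-11-13.parquet
pq.write_to_dataset(
    table,
    root_path="dev/interactions-final",  # Dataset B root - illustrative
    partition_cols=["created_date"],     # assumed partition column
    partition_filename_cb=lambda keys: f"{keys[0]}.parquet",
    filesystem=filesystem,
)

# the same call pointed at Dataset A's root would give a consolidated copy there too,
# after which the small fragment files that were just combined can be removed
for path in chunks:
    filesystem.delete_file(path)  # pyarrow FileSystem API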


The output of this would save a single file per partition into a new dataset (e.g. /dev/interactions-final/created_date=2019-11-13/2019-11-13.parquet):

INFO - Combining 78 fragments with filter {'created_date': datetime.date(2019, 11, 13)} into a single table
INFO - Saving 172657 rows and 36 columns (70.36 MB to dev/interactions-final)
INFO - Combining 57 fragments with filter {'created_date': datetime.date(2019, 11, 18)} into a single table
INFO - Saving 67036 rows and 36 columns (29.63 MB to dev/interactions-final)
INFO - Combining 55 fragments with filter {'created_date': datetime.date(2019, 11, 19)} into a single table
INFO - Saving 65035 rows and 36 columns (29.62 MB to dev/interactions-final)
INFO - Combining 63 fragments with filter {'created_date': datetime.date(2019, 11, 20)} into a single table
INFO - Saving 63613 rows and 36 columns (30.76 MB to dev/interactions-final)

1 Answer


Have you tried pyarrow.dataset.write_dataset? It will repartition the data and I believe it collects small fragments in the process.
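
For example, something along these lines could rewrite Dataset A into a handful of larger files per partition (untested sketch: the destination path and basename_template are illustrative, existing_data_behavior needs a newer pyarrow release, and partitioning is assumed to be a Partitioning object):

import pyarrow.dataset as ds

src = ds.dataset("dev/interactions", partitioning=partitioning, filesystem=filesystem)

# rewrite into a consolidated copy: each partition directory ends up with a small
# number of part-{i}.parquet files instead of many tiny fragments
ds.write_dataset(
    src,                                    # a Scanner with filters/columns applied also works
    base_dir="dev/interactions-compacted",  # illustrative destination
    format="parquet",
    partitioning=partitioning,
    basename_template="part-{i}.parquet",
    existing_data_behavior="overwrite_or_ignore",  # newer pyarrow only
    filesystem=filesystem,
)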