I'm currently using Dask in the following way...
- there is a list of files on S3 in the following format:
<day1>/filetype1.gz
<day1>/filetype2.gz
<day2>/filetype1.gz
<day2>/filetype2.gz
...etc
My code:

- reads all files of filetype1, builds up a dataframe, and sets the index, e.g.:
  df1 = ddf.read_csv('<day1>/filetype1.gz', blocksize=None, compression='gzip').set_index(index_col)
- reads all files of filetype2 and builds up a big dataframe in the same way,
- merges the two dataframes together via:
  merged_df = ddf.merge(df1, df2, how='inner', left_index=True, right_index=True)
- writes the results out to S3 via:
  merged_df.to_csv(<s3_output_location>)
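
For concreteness, here is a minimal sketch of the whole pipeline as it stands; the bucket names, output location, and the 'id' index column are placeholders rather than my real values:

    import dask.dataframe as ddf

    index_col = 'id'  # placeholder for the column the two file types share

    # blocksize=None because gzipped files can't be split, so each file
    # becomes a single partition
    df1 = ddf.read_csv('s3://my-bucket/*/filetype1.gz',
                       blocksize=None, compression='gzip').set_index(index_col)
    df2 = ddf.read_csv('s3://my-bucket/*/filetype2.gz',
                       blocksize=None, compression='gzip').set_index(index_col)

    # join on the shared index across all days at once
    merged_df = ddf.merge(df1, df2, how='inner',
                          left_index=True, right_index=True)

    # writes one CSV per partition; the MemoryError shows up here
    merged_df.to_csv('s3://my-output-bucket/merged-*.csv')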
Note: the goal here really is to merge within a particular day (that is, merge filetype1 and filetype2 for a given day), repeat for every day, and store the union of all those joins; a sketch of that per-day version is included below for reference. But it seemed like doing the join one day at a time would not leverage parallelism, and that letting Dask manage one larger join would be more performant. I thought Dask would manage the larger join in a memory-aware way based on the following line from the docs (https://docs.dask.org/en/latest/dataframe-joins.html):
"If enough memory can not be found then Dask will have to read and write data to disk, which may cause other performance costs."
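
For reference, the per-day version I decided against would look roughly like this; the day prefixes, bucket names, and the 'id' index column are again placeholders:

    import dask.dataframe as ddf

    index_col = 'id'          # placeholder for the shared join column
    days = ['day1', 'day2']   # placeholder list of day prefixes

    for day in days:
        df1 = ddf.read_csv(f's3://my-bucket/{day}/filetype1.gz',
                           blocksize=None, compression='gzip').set_index(index_col)
        df2 = ddf.read_csv(f's3://my-bucket/{day}/filetype2.gz',
                           blocksize=None, compression='gzip').set_index(index_col)
        merged = ddf.merge(df1, df2, how='inner',
                           left_index=True, right_index=True)
        # one output prefix per day; the union of all the joins is just the
        # collection of these per-day outputs
        merged.to_csv(f's3://my-output-bucket/{day}/merged-*.csv')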
I see that a MemoryError happens in the call to to_csv. I'm guessing this is because to_csv calls compute, which tries to compute the full result of the join and then store that result. The full file contents certainly cannot fit in memory, but I thought (hoped) that Dask would run the computations and store the resulting dataframe in a memory-aware way. Any guidance or suggestions on how I should be using Dask to accomplish what I am trying to do? Thanks in advance.