
I'm currently using Dask in the following way...

  • there is a list of files on S3 in the following format:

<day1>/filetype1.gz

<day1>/filetype2.gz

<day2>/filetype1.gz

<day2>/filetype2.gz

...etc

  • my code reads all files of filetype1, builds up a dataframe, and sets the index (e.g., df1 = ddf.read_csv('<day1>/filetype1.gz', blocksize=None, compression='gzip').set_index(index_col))

  • reads through all files of filetype2 and builds up a big dataframe (similar to above).

  • merges the two dataframes together via merged_df = ddf.merge(df1, df2, how='inner', left_index=True, right_index=True).

  • writes the results out to S3 via merged_df.to_csv(<s3_output_location>); a sketch of the full pipeline follows this list.
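
Put together, the pipeline looks roughly like this. It is only a minimal sketch of the steps described above: the bucket name, the globbed paths, and index_col are placeholders, and dask.dataframe is assumed to be imported as ddf as in the snippets above.

import dask.dataframe as ddf

# Read every day's filetype1 and filetype2 files into two large dataframes,
# indexed on the shared join column (index_col is a placeholder).
df1 = ddf.read_csv('s3://<bucket>/*/filetype1.gz', blocksize=None,
                   compression='gzip').set_index(index_col)
df2 = ddf.read_csv('s3://<bucket>/*/filetype2.gz', blocksize=None,
                   compression='gzip').set_index(index_col)

# Join on the shared index and write the result back to S3.
merged_df = ddf.merge(df1, df2, how='inner',
                      left_index=True, right_index=True)
merged_df.to_csv('s3://<bucket>/<s3_output_location>')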

Note: The goal here really is to merge within a particular day (that is, merge filetype1 and filetype2 for a given day), repeat for every day, and store the union of all those joins. However, it seemed like doing the join one day at a time would not leverage parallelism, and that letting Dask manage one large join would be more performant. I thought Dask would manage the larger join in a memory-aware way, based on the following line from the docs (https://docs.dask.org/en/latest/dataframe-joins.html):

If enough memory can not be found then Dask will have to read and write data to disk, which may cause other performance costs.

I see that a MemoryError happens in the call to to_csv. I'm guessing this is because to_csv calls compute, which tries to compute the full result of the join, then tries to store that result. The full file contents certainly cannot fit in memory, but I thought (hoped) that Dask would run the computations and store the resulting Dataframe in a memory-aware way. Any guidance or suggestions on how I should be using Dask to accomplish what I am trying to do? Thanks in advance.


1 Answer


I see that a MemoryError happens in the call to to_csv. I'm guessing this is because to_csv calls compute, which tries to compute the full result of the join, then tries to store that result. The full file contents certainly cannot fit in memory, but I thought (hoped) that Dask would run the computations and store the resulting Dataframe in a memory-aware way

In general Dask does chunk things up and operate in the way that you expect. Doing distributed joins in a low-memory way is hard, but generally doable. I don't know how to help more here without more information, which I appreciate is hard to deliver concisely on Stack Overflow. My usual recommendation is to watch the dashboard closely.
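
For example, here is a minimal sketch of getting at the dashboard, assuming you are using (or can switch to) the distributed scheduler; Client() with no arguments starts a local cluster:

from dask.distributed import Client

client = Client()             # starts a local cluster plus the diagnostic dashboard
print(client.dashboard_link)  # open this URL in a browser to watch memory use and task progress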

Note: The goal here really is to merge within a particular day (that is, merge filetype1 and filetype2 for a given day), repeat for every day, and store the union of all those joins, but it seemed like doing the join one day at a time would not leverage parallelism, and that letting Dask manage a larger join would be more performant

In general, your intuition is correct that giving more work to Dask at once is good. However, in this case it looks like you know something about your data that Dask doesn't: each file only interacts with one other file (its same-day counterpart). In general, joins have to be done in a way where any row of one dataset may interact with any row of the other, so Dask's algorithms have to be pretty general here, which can be expensive.

In your situation I would use pandas together with Dask delayed: each day's merge becomes its own small pandas task, and you hand all of those tasks to Dask at once so they run in parallel.

import dask
import pandas as pd

# One small, lazy pandas pipeline per day: read the two file types,
# merge them, and write that day's result straight back out.
lazy_results = []
for fn in filenames:
    left = dask.delayed(pd.read_csv)(fn + "type-1.csv.gz")
    right = dask.delayed(pd.read_csv)(fn + "type-2.csv.gz")
    merged = left.merge(right)
    out = merged.to_csv(...)  # fill in the per-day output location
    lazy_results.append(out)

dask.compute(*lazy_results)
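
Because each per-day merge happens entirely inside pandas on a single worker, Dask never has to shuffle rows between two large dataframes; it just schedules the independent per-day tasks in parallel, and each task writes its own output as soon as it finishes.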