I have a large input csv file (several GB) that I import into Dask with a blocksize of 5e6. The input csv contains two columns: "ID" and "Text".

ddf1 = dd.read_csv('REL_Input.csv', names=['ID', 'Text'], blocksize=5e6)

I need to add a third column to ddf1, "Hash", by parsing the existing "Text" column for a string between "Hash=" and ";". In Pandas, I can simply do this:

ddf1['Hash'] = ddf1['Text'].str.extract(r'Hash=(.*?);')

When I do this in Dask, I get an error saying "column assignment doesn't support dask.dataframe.core.DataFrame". I tried to use assign but had no luck.

I also need to read multiple large csv files (each several GB in size) from a directory and concatenate them into another Dask dataframe, ddf2. Each of these csv files has hundreds of columns, but I only need two: "Hash" and "Name". Here is the code to create ddf2:

ddf2 = dd.concat([dd.read_csv(f, usecols=['Hash', 'Name'], blocksize=5e6) for f in glob.glob('*.tsv')], ignore_index=True, axis=0)

Then, I need to merge the two dataframes on the "Hash" columns, something like this:

ddf3 = ddf1[['ID', 'ddf1_Hash']].merge(ddf2[['ddf2_Hash', 'Name']], left_on='ddf1_Hash', right_on='ddf2_Hash', how='left') 

Finally, I need to export ddf3 as a csv:

ddf3.to_csv('Output.csv')

From what I've found, it seems I could create the column for ddf1 and perform the merge by converting both ddf1 and ddf2 to pandas dataframes using compute. However, that's not an option for me due to the sheer size of these dataframes. I also tried the chunked approach in pandas, but that fails with an out-of-memory error.

Is there a good way to tackle this problem? I'm still learning Python so any help would be appreciated.

UPDATE:

I am able to create the third column and merge the two dataframes. However, the issue now is that I can't export the merged dataframe as a csv.


1 Answer

  1. Running regex on a string column. The following snippet uses assign:
import dask.dataframe as dd
import pandas as pd

# this step is just to setup a minimum reproducible example
df = pd.DataFrame(list("abcdefghi"), columns=['A'])
ddf = dd.from_pandas(df, npartitions=3)

# this uses assign to extract the relevant content;
# expand=False makes str.extract return a Series rather than a one-column
# DataFrame, which is what triggers the "column assignment doesn't support
# ... DataFrame" error
ddf = ddf.assign(check_c=lambda x: x['A'].str.extract(r'([a-z])', expand=False))

# you can see that the computation was done correctly
ddf.compute()
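Applied to the column names from your question (a sketch, assuming ddf1 was read as shown in your post), the same pattern would look roughly like this:

# pull the text between "Hash=" and ";" into a new 'Hash' column;
# expand=False keeps the result a Series so the assignment is valid
ddf1 = ddf1.assign(Hash=lambda x: x['Text'].str.extract(r'Hash=(.*?);', expand=False))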
  2. Concatenating csv files. Do the csv files have the same structure/columns? If so, you can just use dd.read_csv("path_to_csv_files/*.csv"), but if the files have different structures, then your approach is correct:
ddf2 = dd.concat([dd.read_csv(f, usecols=['Hash', 'Name'], blocksize=5e6) for f in glob.glob('*.tsv')], ignore_index=True, axis=0)
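If the files do share the same columns, a single glob call is simpler and still only loads the two columns you need (a sketch, reusing the path placeholder from above and the blocksize from your question):

# one read_csv call over the whole glob; only 'Hash' and 'Name' are parsed
ddf2 = dd.read_csv('path_to_csv_files/*.csv', usecols=['Hash', 'Name'], blocksize=5e6)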
  3. Merging the dataframes. This is going to be an expensive operation; here are a couple of options to potentially reduce its cost:
  • if either of the dataframes fits into memory, it would help to run .compute() to get a pandas dataframe before the merge;
  • setting the merge key as the index on one or both dataframes:
ddf1 = ddf1.set_index('Hash')
ddf2 = ddf2.set_index('Hash')
ddf3 = ddf1.merge(ddf2, left_index=True, right_index=True)
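Applied to your frames, the index-based merge might look roughly like this (a sketch; it assumes ddf1 carries the new 'Hash' column from step 1 and ddf2 has 'Hash' and 'Name'):

# set_index shuffles the data once so the join itself can align partitions cheaply
ddf1_idx = ddf1[['ID', 'Hash']].set_index('Hash')
ddf2_idx = ddf2.set_index('Hash')
ddf3 = ddf1_idx.merge(ddf2_idx, left_index=True, right_index=True, how='left')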
  4. Saving to csv. By default, dask will save each partition to its own csv file, so your path needs to contain an asterisk, e.g.:
ddf3.to_csv('Output_*.csv', index=False)

Other options are possible (explicit paths, a custom name function, see https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.to_csv).
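For instance, a custom name function receives the partition index and returns the string that replaces the asterisk (a sketch; the zero-padded pattern is just an illustration):

# writes Output_part-0000.csv, Output_part-0001.csv, ...
ddf3.to_csv('Output_*.csv', name_function=lambda i: f'part-{i:04d}')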

If you need a single file, you can use

ddf3.to_csv('Output.csv', index=False, single_file=True)

However, this option is not supported on all systems, so you might want to check that it works using a small sample first (see documentation).