
I have a very large dataframe that I am resampling a large number of times, so I'd like to use dask to speed up the process. However, I'm running into challenges with the groupby-apply. An example dataframe would be

import numpy as np
import pandas as pd
import random

# four groups ('a' through 'd') of 100 rows each, keyed by sample_id
test_df = pd.DataFrame({'sample_id': np.array(['a', 'b', 'c', 'd']).repeat(100),
                        'param1': random.sample(range(1, 1000), 400)})
test_df.set_index('sample_id', inplace=True)

which I can normally groupby and resample using

N = 5; i = 1

test = test_df\
    .groupby(['sample_id'])\
    .apply(pd.DataFrame.sample, n=N, replace=False)\
    .reset_index(drop=True)
test['bootstrap'] = i
test['resample'] = N

I wrap this into a method that iterates over a gradient of N values i times; a simplified sketch of that wrapper follows below. The actual dataframe is very large, with a number of columns, and before anyone suggests it: this method is a little bit faster than an np.random.choice approach on the index, since the cost is all in the groupby. I've run the overall procedure through multiprocessing, but I wanted to see if I could get a bit more speed out of a dask version of the same. The problem is that the documentation suggests that if you index and partition, then you get complete groups per partition, which is not proving true.
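For context, the wrapper looks roughly like this (the function and argument names here are illustrative, not my exact code):

# Illustrative wrapper: resample every group n_boot times across a grid of
# sample sizes, tagging each draw with its bootstrap iteration and N.
def bootstrap_samples(df, n_grid, n_boot):
    out = []
    for i in range(n_boot):
        for N in n_grid:
            test = df\
                .groupby(['sample_id'])\
                .apply(pd.DataFrame.sample, n=N, replace=False)\
                .reset_index(drop=True)
            test['bootstrap'] = i
            test['resample'] = N
            out.append(test)
    return pd.concat(out, ignore_index=True)

boot = bootstrap_samples(test_df, n_grid=[5, 10, 25], n_boot=100)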

import dask.dataframe as dd

df1 = dd.from_pandas(test_df, npartitions=8)
df1 = df1.persist()  # cache the partitions in memory
df1.divisions

creates

('a', 'b', 'c', 'd', 'd')

which unsurprisingly results in a failure

N = 5; i = 1

test = df1\
    .groupby(['sample_id'])\
    .apply(pd.DataFrame.sample, n=N, replace=False)\
    .reset_index(drop=True)
test['bootstrap'] = i
test['resample'] = N

ValueError: Metadata inference failed in groupby.apply(sample). You have supplied a custom function and Dask is unable to determine the type of output that that function returns. To resolve this please provide a meta= keyword. The docstring of the Dask function you ran should have more information. Original error is below: ValueError("Cannot take a larger sample than population when 'replace=False'")
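The second half of that error is telling: dask apparently infers meta by running the supplied function on a tiny dummy version of the frame (empty, or a couple of placeholder rows), and sampling n=5 without replacement from a frame that small reproduces the quoted ValueError:

# reproducing the inference failure on an empty frame
tiny = test_df.iloc[:0]
pd.DataFrame.sample(tiny, n=5, replace=False)  # ValueError: Cannot take a larger sample...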

I have dug all around the documentation on keywords, dask dataframes and partitions, and groupby aggregations, and I am simply missing the solution if it's there in the documents. Any advice on how to create a smarter set of partitions, and/or how to get the groupby with sample playing nicely with dask, would be deeply appreciated.


1 Answer


It's not quite clear to me what you are trying to achieve, or why you need to pass replace=False (which is the default), but the following code works for me. I just needed to add meta.

import dask.dataframe as dd

# keep sample_id as a column so it survives the groupby-apply
df1 = dd.from_pandas(test_df.reset_index(), npartitions=8)

N = 5
i = 1

# meta declares the output's column names and dtypes so dask can skip the
# failing inference step; param1 holds integers here, hence "i8"
test = df1\
    .groupby(['sample_id'])\
    .apply(lambda x: x.sample(n=N),
           meta={"sample_id": "object",
                 "param1": "i8"})\
    .reset_index(drop=True)
test['bootstrap'] = i
test['resample'] = N
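If you'd rather not spell out the dtypes by hand, meta can also be an empty pandas frame with the expected columns, e.g. built from the source frame itself; this should behave the same, and nothing is sampled from the zero-row frame during inference:

# meta as a zero-row frame with the right columns and dtypes
meta = test_df.reset_index().head(0)

test = df1\
    .groupby(['sample_id'])\
    .apply(lambda x: x.sample(n=N), meta=meta)\
    .reset_index(drop=True)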

If you then want to drop sample_id, you just need to add

test = test.drop("sample_id", axis=1)
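Keep in mind that dask is lazy, so nothing is actually sampled until you materialize the result:

result = test.compute()  # triggers the per-group sampling and returns pandas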