To summarize: how can I perform groupby operations in parallel for a limited number of groups at a time, writing the result of each group's apply function to disk?
My problem: I'm trying to build a supervised dataset for regression models from data on many clients, separated by year. I have to build different models for the same clients, with different inputs X and labels Y, so my idea is to create a single X dataframe and a single Y dataframe holding all variables at once, and to slice each one according to the task. For example, X could hold salary, age and sex, but model 1 would use only age and sex, while model 2 would use only salary.
As clients are not present every year, I can only use clients that are present from one period to the next. Instead of selecting the intersection of clients for each pair of contiguous years, I'm trying to concatenate all the data and perform groupby operations by client ID (and then filter by year sequence, for example using the rows where the difference between periods is 1). The problem with using Dask for this task is that the distributed workers run low on memory (even after increasing the limit to 30 GB each). Note that for each group I'm creating a new dataframe, so I'm not reducing the calculation to a single number per group, which is why the operation is memory intensive.
What I'm currently doing is performing a groupby operation, then iterating over the groupby object and writing to disk sequentially, for example:
x_file = open('X.csv', 'w')
for name, group in concatenated_data.groupby('ID'):
    data_x = my_func(group)  # In my real code, my_func returns x and y dataframes
    data_x.to_csv(x_file, header=None)
x_file.close()
This writes the data sequentially, applying my_func, which selects the x and y for each group.
What I want is to perform the operation for a controlled number of groups at a time (let's say 3), writing the result of each group to disk (maybe with data_x.to_csv(x_file, single_file=True)).
Of course I can do the same with a Dask dataframe and iterate over the groupby object using get_group(), but I don't believe it will run in parallel while also keeping memory in check.
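To make the "a few groups at a time" idea concrete, here is a minimal sketch using only pandas and the standard library. It assumes the concatenated data already fits in memory and that only the per-group results need to be bounded. `process_group`, `run_in_batches` and `max_in_flight` are hypothetical names, and `process_group` is just a stand-in for my_func. Threads only speed things up if the per-group work releases the GIL; a process pool would be the usual alternative.

```python
import io
from concurrent.futures import ThreadPoolExecutor
from itertools import islice

import pandas as pd


def process_group(group):
    # Hypothetical stand-in for my_func: returns a new dataframe per group.
    return group.assign(total=group["value"].cumsum())


def run_in_batches(df, key, out_file, max_in_flight=3):
    """Apply process_group to at most max_in_flight groups at a time."""
    groups = (g for _, g in df.groupby(key))
    with ThreadPoolExecutor(max_workers=max_in_flight) as pool:
        while True:
            batch = list(islice(groups, max_in_flight))
            if not batch:
                break
            # pool.map preserves order, and only this thread writes,
            # so there are no concurrent writes to out_file.
            for result in pool.map(process_group, batch):
                result.to_csv(out_file, header=False)


example = pd.DataFrame({"ID": [1, 1, 2, 3, 3, 4], "value": [1, 2, 3, 4, 5, 6]})
buffer = io.StringIO()  # stands in for open('X.csv', 'w')
run_in_batches(example, "ID", buffer, max_in_flight=3)
```

Each batch is fully written before the next one starts, so at most `max_in_flight` result dataframes are alive at once.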
EDIT: Example
import pandas as pd

# Let's say I have 3 csv files:
data = ['./data_2016', './data_2017', './data_2018']  # Each file contains millions of rows (1 per client ID) and about 85 columns
# and certain variables
x_vars = ['x1', 'x2', 'x3']  # x variables
y_vars = ['y1', 'y2', 'x1']  # note that some variables can be in both x and y (like using today's salary to predict tomorrow's salary)
data = [pd.read_csv(x) for x in data]

def func1(df_):
    # do some preprocessing stuff
    return df_

data = [func1(df_) for df_ in data]  # Some preprocessing and adding some columns (for example adding a column for year)
concatenated_data = pd.concat(data, axis=0)  # Big dataframe, all clients from 2016-2018, stacked row-wise

def my_func(df_):  # function applied above
    df_ = df_.sort_values('year')  # order by year
    df_['Diff'] = df_.year.diff()  # difference between consecutive years
    df_['shifted'] = df_.Diff.shift(-1)  # difference to the following year
    # For example, *client z* may be present in 2016 and 2018, so his year difference is 2.
    # I can't use *client z*'s x_vars to predict y (only a single-period-ahead regression)
    x = df_.loc[df_['shifted'] == 1, x_vars]  # select only contiguous years
    y = df_.loc[df_['Diff'] == 1, y_vars]  # the same, but a year ahead of x
    return (x, y)

# ... Iteration over groupby object
Instead of using groupby() to reduce, I'm expanding the single, big dataframe into x and y dataframes, in which y holds information one period ahead of x.
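If my_func really only needs the year differences (an assumption about the real code), the same selection can be sketched without a per-group apply, by computing the difference and shift within each ID on the whole dataframe. The toy data and its column values below are made up for illustration.

```python
import pandas as pd

# Toy data: client 1 appears in 2016-2018, client 2 skips 2017, client 3 appears once.
toy = pd.DataFrame({
    "ID":   [1, 1, 1, 2, 2, 3],
    "year": [2016, 2017, 2018, 2016, 2018, 2017],
    "x1":   [10, 11, 12, 20, 22, 30],
    "y1":   [0.1, 0.2, 0.3, 0.4, 0.5, 0.6],
})

toy = toy.sort_values(["ID", "year"]).reset_index(drop=True)

# Year gap to the previous row of the same client (NaN on a client's first row).
toy["Diff"] = toy.groupby("ID")["year"].diff()
# Year gap to the next row of the same client (NaN on a client's last row).
toy["shifted"] = toy.groupby("ID")["Diff"].shift(-1)

x = toy.loc[toy["shifted"] == 1, ["ID", "year", "x1"]]  # rows with a contiguous next year
y = toy.loc[toy["Diff"] == 1, ["ID", "year", "y1"]]     # the matching rows one year later
```

Here only client 1 contributes rows: x holds its 2016 and 2017 rows, and y holds its 2017 and 2018 rows, in the same order.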
As you can see, using a Dask dataframe groupby (omitted for simplicity) would parallelize the my_func operation, but as I understand it, it would also wait until all nodes have completed their operations, thus depleting my memory. What I would like is to perform my_func for certain groups (ideally as many as memory can hold), finish them, save to disk (without problems related to parallel saving) and finally proceed to the next batch of groups.
Maybe I can use some Dask delayed objects, but I don't think it will make good use of my memory if I set the batches manually.
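One way to form such batches without ever holding the whole concatenated data in memory is to stream the yearly files in chunks and route each row to a bucket file chosen by client ID, so that all rows of a client end up in the same bucket. The sketch below assumes integer IDs in a column named ID. `split_into_buckets`, `n_buckets` and the file naming are hypothetical choices, and the demo files are tiny made-up data.

```python
import os
import tempfile

import pandas as pd


def split_into_buckets(csv_paths, bucket_dir, n_buckets=4, chunksize=100_000):
    """Stream each CSV in chunks and append rows to bucket files keyed by ID."""
    for path in csv_paths:
        for chunk in pd.read_csv(path, chunksize=chunksize):
            # Assumes integer IDs: every row of a client lands in the same bucket.
            for bucket, part in chunk.groupby(chunk["ID"] % n_buckets):
                target = os.path.join(bucket_dir, f"bucket_{bucket}.csv")
                part.to_csv(target, mode="a", index=False,
                            header=not os.path.exists(target))


with tempfile.TemporaryDirectory() as tmp:
    paths = []
    for year in (2016, 2017):
        p = os.path.join(tmp, f"data_{year}.csv")
        pd.DataFrame({"ID": [1, 2, 3, 4, 5], "year": year}).to_csv(p, index=False)
        paths.append(p)
    bucket_dir = os.path.join(tmp, "buckets")
    os.makedirs(bucket_dir)
    split_into_buckets(paths, bucket_dir, n_buckets=2, chunksize=2)
    # Each bucket can now be loaded and processed on its own.
    buckets = {name: pd.read_csv(os.path.join(bucket_dir, name))
               for name in sorted(os.listdir(bucket_dir))}
```

Each bucket can then be read, grouped by ID and passed through my_func independently, one bucket (or a few) at a time, with `n_buckets` chosen so that a bucket fits comfortably in memory.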
df.head(10).to_dict()? Then if you go for dask you can have some problems with shift given that your data will be scattered in partitions. - rpanai