0
votes

To summarize: How to perform groupby operations in parallel for a limited number of groups, but writing the result of each group apply function to disk?

My problem: I'm trying to create a supervised structure for regression models from information of a lot of clients separated into years. From the same clients I have to build different models, with different inputs X and labels Y, thus my idea is to create a single X and Y dataframe holding all variables at once, and slicing each one according to the task. For example, X could hold information from the salary, age or sex, but model 1 would use only age and sex, while model 2 only use salary.

As clients are not present every year, I can only use clients that are present from one period to the next one. Instead of selecting the intersection of clients for each pair of contigous years, I'm trying to concatenate the whole information and performing groupby operations by client ID (and then filtering by year sequence, for example using the rows where the difference of periods are 1). The problem of using Dask for this task is that distributed workers are running low on memory (even after increasing the limit to 30Gb each). Note that for each group I'm creating a new dataframe, so I'm not reducing calculation to a single number per group, thus the memory intensive operation.

What I'm currently doing is performing a groupby operation, then iterating over the groupby object and writing to disk sequentially: for example like:

x_file=open('X.csv', 'w')
for name, group in concatenated_data.groupby('ID'):
   data_x=my_func(group) # In my real code, my_func returns x and y dataframes
   data_x.to_csv(x_file, header=None)
x_file.close()

which write the data sequentially applying my_func which selects the x and y for each group.
What I want is to perform the operation for a controlled number of groups (lets say 3 at the time), and writing the result of each group to disk (maybe with data_x.to_csv(x_file, single_file=True)).
Of course I can do the same for a dask dataframe, and iterate over the groupbpy object using get_group(), but I don't believe it will run in parallel while also keeping the memory on check.

EDIT: Example

# Lets say I have 3 csv files:
data=['./data_2016', './data_2017', './data_2018'] # Each file contains millions of rows (1 per client ID) and like 85 columns
# and certains variables
x_vars=['x1', 'x2', 'x3'] # x variables
y_vars= ['y1', 'y2', 'x1'] # note than some variables can be among x and y (like using today's salary to predict tomorrows salary)
data=[pd.read_csv(x) for x in data]

def func1(df_):
   # do some preprocessing stuff
   return df_
data=map(func1, data) # Some preprocessing and adding some columns (for example adding column for year)

concatenated_data=pd.concat(data, axis=1) # Big file, all clients from 2016-2018

def my_func(df_): # function applied above
   # order by year
   df_['Diff']=df_.year.diff() # calculating the difference among years 
   df['shifted']=df.Diff.shift(-1) # calculate shift of difference
   # For exammple, *client z* may be on 2016 and 2018, thus his year difference is 2. 
   # I can't use *clien z* x_vars to predict y (only a single period ahead regression)
   x=df_.loc[df_['shifted']==1, x_vars] # select only contigous years
   y=df_.loc[df_['Diff']==1, y_vars] # the same, but a year ahead of x
   return (x, y)

# ... Iteration over groupby object

Instead of using groupby() to reduce, I'm expanding the single, big file into an x and y dataframes, on which y holds information a period ahead of x.
As you can see, using a dask dataframe groupby (omitted for simplicity) would parallelize my_func operation, but as I understand would also wait until all operations nodes are completed, thus depleting my memory. What I would like is to perform my_func for certain groups (ideally as most as memory could hold), finish them, save to disk (without problems related to paralell saving) and finally proceed to the next batch of groups.

Maybe I can use some dask delayed objects, but I don't think it will make good use of my memory if a set the batches manually.

1
Do you mind to provide a mcve? - rpanai
Here you are overwriting your input file, - rpanai
Please wait me a while. I cant post an example right now. Pd: i missed the opening file. It should be x_file=open('X.csv','w'), sorry about that - TJ_93
Do you mind to add a sample of your dataframe? Possibly as df.head(10).to_dict()? Then if you go for dask you can have some problems with shift given that your data will be scattered in partitions. - rpanai
Thanks for your support @rpanai! Sorrily, the information is heavily restricted, that's why I have to use company servers' that are not so fast. I'll try to recreate a fake one though (please wait me a little bit again) - TJ_93

1 Answers

1
votes

I'm not sure if this is what you are looking for

Generate data

import pandas as pd
import numpy as np
import dask.dataframe as dd
import os

n = 200
df = pd.DataFrame({"grp":np.random.choice(list("abcd"), n),
                   "x":np.random.randn(n),
                   "y":np.random.randn(n),
                   "z":np.random.randn(n)})

df.to_csv("file.csv", index=False)

# we will need later on
df.to_parquet("file.parquet", index=False)

Pandas solution

# we save our files on a given folder
fldr = "output1"
os.makedirs(fldr, exist_ok=True)

# we read the columns we need only
cols2read = ["grp", "x", "y"]

df = pd.read_csv("file.csv")
df = df[cols2read]

def write_file(x, fldr):
    name = x["grp"].iloc[0]
    x.to_csv(f"{fldr}/{name}.csv", index=False)

df.groupby("grp")\
  .apply(lambda x: write_file(x, fldr))

Dask solution

This is basically the same but we need to add meta to our apply and the compute

# we save our files on a given folder
fldr = "output2"
os.makedirs(fldr, exist_ok=True)

# we read the columns we need only
cols2read = ["grp", "x", "y"]

df = pd.read_csv("file.csv")
df = df[cols2read]

def write_file(x, fldr):
    name = x["grp"].iloc[0]
    x.to_csv(f"{fldr}/{name}.csv", index=False)

df.groupby("grp")\
  .apply(lambda x: write_file(x, fldr), meta='f8')\
  .compute()

Working with parquet

Here I suggest you to work with parquet as it's going to be ways more efficient

cols2read = ["grp", "x", "y"]
df = dd.read_parquet("file.parquet",
                     columns=cols2read)

df.to_parquet("output3/",
              partition_on="grp")

Inside output3 you can find several folders called grp=a and so on. And each off them could eventually contain several files. but you can read all of them with pd.read_parquet("output3/grp=a)