
I am multiprocessing my Python script with apply_async() as below:

import multiprocessing
import pandas as pd

def my_proc(df, id):
    # do something
    return df

df = pd.read_csv(myfile, sep='\t', header=0, dtype=object)
p = multiprocessing.Pool(50)
ids = df['id'].tolist()
result = {}
for i in range(len(ids)):
    result[ids[i]] = p.apply_async(my_proc, [df, ids[i]])

The problem I am encountering is that when the dataframe gets very large (200K rows with 75 columns), only one process runs at any given time while all the others are blocked, sleeping.

If I instead save the dataframe to a CSV file, pass the filename as a parameter, and have each process open and read the CSV itself, all the processes stay running, but performance becomes unacceptable because all 50 of them compete to read the same large CSV file.

Can anyone tell me how I can find out why and where these processes are blocked? Any suggestions for a more performant workaround?

Edits:

I am using a Linux server. I tried passing the df through a queue as shown below, but I get the same result. I also return None and reduced the process count to 3 to isolate the problem:

def my_proc(q, id):
    df = q.get()
    # do something
    return None

p = multiprocessing.Pool(3)
m = multiprocessing.Manager()
q = m.Queue()
df = pd.read_csv(report_file_dups, sep='\t', header=0, dtype=object)
q.put(df)
ids = df['id'].tolist()
result = {}
for i in range(len(ids)):
    result[ids[i]] = p.apply_async(my_proc, [q, ids[i]])

Am I using the queue as it was intended?

Are you on Windows? If you are, you need to put your main code in an if __name__ == "__main__": block; otherwise multiprocessing won't work correctly. - Dan D.
Have you tried using map with a chunksize set? This looks a bit like a question I responded to recently: stackoverflow.com/a/53797655/1358308 - Sam Mason
It might also just be related to sending so much data around. The parameters for each call get "pickled" independently, and the results are pickled before being sent back, i.e. you probably want to arrange things so that you're not pickling a large dataframe with every call - Sam Mason
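
A minimal sketch of what that last comment points at, assuming a Linux fork start method (as the edit above states): load the dataframe once, before the pool is created, so the forked workers inherit it instead of having it pickled into every apply_async call. The names my_proc, myfile, and the 'id' column come from the question; the pool size, the placeholder path, and the per-id work are illustrative assumptions, not the question's actual logic.

import multiprocessing
import pandas as pd

myfile = "data.tsv"  # placeholder path; stands in for the question's myfile variable
df = pd.read_csv(myfile, sep='\t', header=0, dtype=object)  # loaded once, before the pool forks

def my_proc(id):
    # df is inherited by the forked workers, so it is never pickled per call;
    # do the real per-id work here and return something small, not a dataframe
    return df.loc[df['id'] == id].shape[0]

if __name__ == "__main__":
    ids = df['id'].tolist()
    with multiprocessing.Pool(3) as p:
        results = {i: p.apply_async(my_proc, [i]) for i in ids}
        output = {i: r.get() for i, r in results.items()}
    print(len(output))

The key point is that each task only sends a small id value to the worker and gets a small result back, so nothing large has to be serialized for every call.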

1 Answer


What about reading the file line by line, feeding each line into a queue, and having the workers consume data from the queue?
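
A rough sketch of that idea, assuming the per-row work only needs the row itself: one reader feeds rows into a bounded queue, a few worker processes consume them, and only small results travel back. The file path, the process_row helper, the worker count, and the queue size are placeholders, not taken from the question.

import csv
import multiprocessing

def process_row(row):
    # placeholder for the real per-row work
    return row['id']

def worker(in_q, out_q):
    # each worker consumes rows until it sees the None sentinel
    while True:
        row = in_q.get()
        if row is None:
            break
        out_q.put(process_row(row))

if __name__ == "__main__":
    n_workers = 3
    in_q = multiprocessing.Queue(maxsize=1000)  # bounded, so the reader cannot run far ahead of the workers
    out_q = multiprocessing.Queue()
    workers = [multiprocessing.Process(target=worker, args=(in_q, out_q))
               for _ in range(n_workers)]
    for w in workers:
        w.start()

    n_rows = 0
    with open("data.tsv", newline='') as f:  # placeholder path
        for row in csv.DictReader(f, delimiter='\t'):
            in_q.put(row)  # one row at a time, never the whole dataframe
            n_rows += 1

    for _ in workers:
        in_q.put(None)  # one sentinel per worker

    results = [out_q.get() for _ in range(n_rows)]  # drain the results before joining
    for w in workers:
        w.join()
    print(len(results))

Draining the result queue before joining matters: a process that has put items on a multiprocessing.Queue will not terminate until those items have been flushed, so joining the workers first can block.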