36
votes

Here's the program:

#!/usr/bin/python

import multiprocessing

def dummy_func(r):
    pass

def worker():
    pass

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=16)
    for index in range(0,100000):
        pool.apply_async(worker, callback=dummy_func)

    # clean up
    pool.close()
    pool.join()

I found that memory usage (both VIRT and RES) keeps growing until close()/join(). Is there any way to avoid this? I tried maxtasksperchild with 2.7, but it didn't help either.

I have a more complicated program that calls apply_async() ~6M times, and at the ~1.5M point RES is already over 6G. To rule out other factors, I simplified the program to the version above.

EDIT:

It turned out that this version works better. Thanks for everyone's input:

#!/usr/bin/python

import multiprocessing

ready_list = []
def dummy_func(index):
    global ready_list
    ready_list.append(index)

def worker(index):
    return index

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=16)
    result = {}
    for index in range(0,1000000):
        result[index] = (pool.apply_async(worker, (index,), callback=dummy_func))
        for ready in ready_list:
            result[ready].wait()
            del result[ready]
        ready_list = []

    # clean up
    pool.close()
    pool.join()

I didn't put any lock there because I believe the main process is single-threaded (per the docs I read, the callback is more or less an event-driven thing).

I changed v1's index range to 1,000,000, the same as v2, and ran some tests. Oddly, v2 is even ~10% faster than v1 (33s vs 37s); maybe v1 was doing too much internal list maintenance. v2 is definitely the winner on memory usage: it never went over 300M (VIRT) and 50M (RES), while v1 used around 370M/120M (330M/85M at best). All numbers come from just 3~4 test runs, so treat them as a rough reference only.

5
Just speculating here, but queuing a million objects takes up space. Perhaps batching them will help. The docs are not definitive, but the example (search for Testing callback) shows the apply_async result being waited on, even when there are callbacks. The wait may be needed to clear a result queue. - tdelaney
So multiprocessing.pool may not be the right tool for me, since the callback does not actually do any cleanup. Is it possible to do the cleanup in the callback? The problem is that I cannot wait right after the apply_async() call, because in the real world worker() takes ~0.1 seconds per request (several HTTP requests). - C.B.
Wild guess: apply_async creates an AsyncResult instance. The Pool probably keeps a reference to these objects, since they must be able to return the result when the computation has finished, but in your loop you are simply throwing them away. You should probably call get() or wait() on the async results at some point, maybe using the callback argument of apply_async. - Bakuriu
I think there's a race condition in the EDIT version when you overwrite ready_list. There's a thread which handles the results from the AsyncResults (docs.python.org/2/library/…), and that thread calls the callback. It may be faster simply because you are discarding results. Also, use time.sleep() with a small random delay to simulate work, and sprinkle sleeps in your code to catch race conditions. - Javier
maxtasksperchild seems to have fixed the memory leak caused by apply_async on 3.7. - laido yagamii

5 Answers

22
votes

I had memory issues recently because I was calling the multiprocessing function multiple times, so it kept spawning processes and leaving them in memory.

Here's the solution I'm using now:

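from contextlib import closing
from multiprocessing import Pool

def myParallelProcess(ahugearray):
    # closing() calls p.close() on exit, so the workers exit once the queued work is done
    with closing(Pool(15)) as p:
        # simple_matching is the per-item function (defined elsewhere); 100 is the chunksize
        res = p.imap_unordered(simple_matching, ahugearray, 100)
    return res

8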
votes

Simply create the pool within your loop and close it at the end of the loop with pool.close().
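
A minimal sketch of what this could look like, assuming the work can be split into batches. It is written for Python 3, and worker, run_in_batches, make_pool and the batch size are hypothetical names and values chosen for illustration, not part of the question's code:

```python
import multiprocessing


def worker(index):
    # Placeholder task; stands in for the real per-item work.
    return index


def run_in_batches(total, batch_size, make_pool):
    """Process range(total) in batches, using a fresh pool for each batch."""
    done = 0
    for start in range(0, total, batch_size):
        stop = min(start + batch_size, total)
        pool = make_pool()  # new pool for this batch only
        pending = [pool.apply_async(worker, (i,)) for i in range(start, stop)]
        pool.close()  # no more tasks will be submitted to this pool
        pool.join()   # wait until every task of the batch has finished
        done += sum(1 for r in pending if r.successful())
        # 'pool' and 'pending' are rebound on the next iteration, so the
        # previous batch's bookkeeping can be garbage collected.
    return done


if __name__ == '__main__':
    count = run_in_batches(1000, 100, lambda: multiprocessing.Pool(processes=4))
    print(count)
```

The trade-off is that worker processes are started and torn down once per batch, so very small batches may cost more in startup time than they save in memory.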

7
votes

Use map_async instead of apply_async to avoid excessive memory usage.

For your first example, change the following two lines:

for index in range(0,100000):
    pool.apply_async(worker, callback=dummy_func)

to

pool.map_async(worker, range(100000), callback=dummy_func)

It will finish in a blink, before you can see its memory usage in top. Change the list to a bigger one to see the difference. But note that map_async will first convert the iterable you pass to it into a list to calculate its length if it doesn't have a __len__ method. If you have an iterator over a huge number of elements, you can use itertools.islice to process them in smaller chunks.
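
A rough sketch of the itertools.islice idea, assuming it is acceptable to wait for each chunk before submitting the next one. It is written for Python 3; chunks, map_in_chunks and the chunk size are hypothetical helpers and values for illustration:

```python
import itertools
import multiprocessing


def worker(index):
    # Placeholder task; stands in for the real per-item work.
    return index * 2


def chunks(iterable, size):
    """Yield lists of at most `size` items from any iterable."""
    iterator = iter(iterable)
    while True:
        chunk = list(itertools.islice(iterator, size))
        if not chunk:
            return
        yield chunk


def map_in_chunks(pool, func, iterable, size):
    """Feed the pool one chunk at a time so at most `size` items are queued."""
    total = 0
    for chunk in chunks(iterable, size):
        # get() blocks until this chunk is finished before the next is submitted.
        results = pool.map_async(func, chunk).get()
        total += len(results)  # replace with the real result handling
    return total


if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    print(map_in_chunks(pool, worker, iter(range(10000)), 1000))
    pool.close()
    pool.join()
```

Because get() is called right away, each chunk behaves much like a plain pool.map call; the point is only that the main process never materializes more than one chunk of inputs and results at a time.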

I had a memory problem in a real-life program with much more data and finally found the culprit was apply_async.

P.S. In terms of memory usage, your two examples show no obvious difference.

6
votes

I have a very large 3d point cloud data set I'm processing. I tried using the multiprocessing module to speed up the processing, but I started getting out of memory errors. After some research and testing I determined that I was filling the queue of tasks to be processed much quicker than the subprocesses could empty it. I'm sure by chunking, or using map_async or something I could have adjusted the load, but I didn't want to make major changes to the surrounding logic.

The dumb solution I hit on is to check the pool._cache length intermittently, and if the cache is too large then wait for the queue to empty.

In my mainloop I already had a counter and a status ticker:

# Update status
count += 1
if count%10000 == 0:
    sys.stdout.write('.')
    if len(pool._cache) > 1e6:
        print("waiting for cache to clear...")
        last.wait() # Where last is assigned the latest ApplyResult

So after every 10k insertions into the pool, I check whether more than 1 million operations are queued (about 1G of memory used in the main process). When the queue is full, I just wait for the last inserted job to finish.

Now my program can run for hours without running out of memory. The main process just pauses occasionally while the workers continue processing the data.

BTW, the _cache member is documented in the multiprocessing module's pool example:

#
# Check there are no outstanding tasks
#

assert not pool._cache, 'cache = %r' % pool._cache

2
votes

I think this is similar to the question I posted, but I'm not sure you have the same delay. My problem was that I was producing results from the multiprocessing pool faster than I was consuming them, so they built up in memory. To avoid that, I used a semaphore to throttle the inputs into the pool so they didn't get too far ahead of the outputs I was consuming.
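
A minimal sketch of semaphore-based throttling, not the code from the linked question. It assumes Python 3 (error_callback is not available in 2.7), and submit_throttled, on_done, on_error and max_pending are hypothetical names for illustration:

```python
import multiprocessing
import threading


def worker(index):
    # Placeholder task; stands in for the real per-item work.
    return index


def submit_throttled(pool, func, items, max_pending):
    """Submit one task per item, keeping at most max_pending tasks in flight.

    The pool is closed and joined before returning.
    """
    slots = threading.BoundedSemaphore(max_pending)
    results = []

    def on_done(value):
        # Callbacks run in a helper thread of the submitting process,
        # so a threading semaphore is enough here.
        results.append(value)  # replace with the real result consumption
        slots.release()

    def on_error(exc):
        # Free the slot on failure too, or the loop below could block forever.
        slots.release()

    for item in items:
        slots.acquire()  # blocks while max_pending tasks are outstanding
        pool.apply_async(func, (item,), callback=on_done, error_callback=on_error)

    pool.close()
    pool.join()
    return results


if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    print(len(submit_throttled(pool, worker, range(10000), max_pending=100)))
```

Since the submitting loop blocks whenever max_pending tasks are outstanding, the number of queued tasks and unconsumed results stays bounded regardless of how many items are processed in total.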