77
votes

I'm having trouble with the multiprocessing module. I'm using a Pool of workers with its map method to load data from lots of files, and for each of them I analyze the data with a custom function. Each time a file has been processed I would like to have a counter updated so that I can keep track of how many files remain to be processed. Here is sample code:

import os
from multiprocessing import Pool

def analyze_data(args):
    # do something
    counter += 1
    print(counter)


if __name__ == '__main__':

    list_of_files = os.listdir(some_directory)

    global counter
    counter = 0

    p = Pool()
    p.map(analyze_data, list_of_files)

I can't find a solution for this.


5 Answers

80
votes

The problem is that the counter variable is not shared between your processes: each separate process is creating its own local instance and incrementing that.

See this section of the documentation for some techniques you can employ to share state between your processes. In your case you might want to share a Value instance between your workers.

Here's a working version of your example (with some dummy input data). Note that it uses global values, which I would really try to avoid in practice:

from multiprocessing import Pool, Value

counter = None

def init(args):
    ''' store the counter for later use '''
    global counter
    counter = args

def analyze_data(args):
    ''' increment the global counter, do something with the input '''
    global counter
    # += operation is not atomic, so we need to get a lock:
    with counter.get_lock():
        counter.value += 1
    print(counter.value)
    return args * 10

if __name__ == '__main__':
    #inputs = os.listdir(some_directory)

    #
    # initialize a cross-process counter and the input lists
    #
    counter = Value('i', 0)
    inputs = [1, 2, 3, 4]

    #
    # create the pool of workers, ensuring each one receives the counter 
    # as it starts. 
    #
    p = Pool(initializer=init, initargs=(counter,))
    i = p.map_async(analyze_data, inputs, chunksize=1)
    i.wait()
    print(i.get())
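
A note on the initializer/initargs pattern used here: the counter has to reach the workers by inheritance at the moment the pool processes are created. Passing the Value later as a map argument does not work, because synchronized objects cannot be pickled onto the pool's task queue (multiprocessing raises a RuntimeError telling you to share them through inheritance instead).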
41
votes

A Counter class without the race-condition bug:

import multiprocessing

class Counter(object):
    def __init__(self):
        self.val = multiprocessing.Value('i', 0)

    def increment(self, n=1):
        with self.val.get_lock():
            self.val.value += n

    @property
    def value(self):
        return self.val.value
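
To use it with a Pool, the instance still has to reach the workers by inheritance, e.g. through the pool initializer as in the accepted answer (it cannot be pickled through the map arguments). Here is a minimal usage sketch along those lines; the worker function process_file and the file names are just placeholders:

counter = None

def init_worker(shared_counter):
    # runs once in each worker process; stash the shared Counter
    global counter
    counter = shared_counter

def process_file(path):
    # placeholder for the real work, then bump the shared counter
    counter.increment()
    return path

if __name__ == '__main__':
    shared = Counter()  # the Counter class defined above
    with multiprocessing.Pool(initializer=init_worker, initargs=(shared,)) as pool:
        pool.map(process_file, ['a.dat', 'b.dat', 'c.dat'])
    print(shared.value)  # 3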
10
votes

An extremely simple example, changed from jkp's answer:

from multiprocessing import Pool, Value
from time import sleep

counter = Value('i', 0)

def f(x):
    global counter
    with counter.get_lock():
        counter.value += 1
    print("counter.value:", counter.value)
    sleep(1)
    return x

if __name__ == '__main__':
    with Pool(4) as p:
        r = p.map(f, range(1000*1000))
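
Note that this relies on the workers inheriting the module-level counter, which is what the fork start method (the default on Linux) does. With the spawn start method (the default on Windows and on recent macOS) each worker re-imports the module and gets its own fresh counter, so there the initializer/initargs pattern from the accepted answer is the safer choice.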
4
votes

A faster Counter class that avoids Value's built-in lock by pairing RawValue with an explicit Lock:

import multiprocessing

class Counter(object):
    def __init__(self, initval=0):
        self.val = multiprocessing.RawValue('i', initval)
        self.lock = multiprocessing.Lock()

    def increment(self):
        with self.lock:
            self.val.value += 1

    @property
    def value(self):
        return self.val.value
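
The difference is that Value wraps the shared ctypes object in a synchronized wrapper that carries its own recursive lock, while RawValue allocates just the raw shared memory. Since every increment here is already guarded by the explicit Lock, the wrapper's lock would be pure overhead.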

https://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing
https://docs.python.org/2/library/multiprocessing.html#multiprocessing.sharedctypes.Value
https://docs.python.org/2/library/multiprocessing.html#multiprocessing.sharedctypes.RawValue

0
votes

I'm working on a progress bar in PyQt5, so I use a thread and a pool together.

import threading
import multiprocessing as mp
from queue import Queue

def multi(x):
    return x * x

def pooler(q):
    with mp.Pool() as pool:
        count = 0
        for i in pool.imap_unordered(multi, range(100)):
            print(count, i)
            count += 1
            q.put(count)

def main():
    q = Queue()
    t = threading.Thread(target=pooler, args=(q,))
    t.start()
    print('start')
    process = 0
    while process < 100:
        process = q.get()
        print('p', process)

if __name__ == '__main__':
    main()

I put this in a QThread worker and it works with acceptable latency.