The issue in your example is that modifications to standard mutable structures within Manager.dict
will not be propagated. I'm first showing you how to fix it with manager, just to show you better options afterwards.
multiprocessing.Manager
is a bit heavy since it uses a separate Process just for the Manager
and working on a shared object needs using locks for data consistency. If you run this on one machine, there are better options with multiprocessing.Pool
, in case you don't have to run customized Process
classes and if you have to, multiprocessing.Process
together with multiprocessing.Queue
would be the common way of doing it.
The quoting parts are from the multiprocessing docs.
Manager
If standard (non-proxy) list or dict objects are contained in a referent, modifications to those mutable values will not be propagated through the manager because the proxy has no way of knowing when the values contained within are modified. However, storing a value in a container proxy (which triggers a setitem on the proxy object) does propagate through the manager and so to effectively modify such an item, one could re-assign the modified value to the container proxy...
In your case this would look like:
def worker(xrange, return_dict, lock):
"""worker function"""
for x in range(xrange[0], xrange[1]):
y = ((x+5)**2+x-40)
if y <= 0xf+1:
print('Condition met at: ', y, x)
with lock:
list_x = return_dict['x']
list_y = return_dict['y']
list_x.append(x)
list_y.append(y)
return_dict['x'] = list_x
return_dict['y'] = list_y
The lock
here would be a manager.Lock
instance you have to pass along as argument since the whole (now) locked operation is not by itself atomic. (Here
is an easier example with Manager
using Lock)
This approach is perhaps less convenient than employing nested Proxy Objects for most use cases but also demonstrates a level of control over the synchronization.
Since Python 3.6 proxy objects are nestable:
Changed in version 3.6: Shared objects are capable of being nested. For example, a shared container object such as a shared list can contain other shared objects which will all be managed and synchronized by the SyncManager.
Since Python 3.6 you can fill your manager.dict
before starting multiprocessing with manager.list
as values and then append directly in the worker without having to reassign.
return_dict['x'] = manager.list()
return_dict['y'] = manager.list()
EDIT:
Here is the full example with Manager
:
import time
import multiprocessing as mp
from multiprocessing import Manager, Process
from contextlib import contextmanager
# mp_util.py from first link in code-snippet for "Pool"
# section below
from mp_utils import calc_batch_sizes, build_batch_ranges
# def context_timer ... see code snippet in "Pool" section below
def worker(batch_range, return_dict, lock):
"""worker function"""
for x in batch_range:
y = ((x+5)**2+x-40)
if y <= 0xf+1:
print('Condition met at: ', y, x)
with lock:
return_dict['x'].append(x)
return_dict['y'].append(y)
if __name__ == '__main__':
N_WORKERS = mp.cpu_count()
X_MAX = 100000000
batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
batch_ranges = build_batch_ranges(batch_sizes)
print(batch_ranges)
with Manager() as manager:
lock = manager.Lock()
return_dict = manager.dict()
return_dict['x'] = manager.list()
return_dict['y'] = manager.list()
tasks = [(batch_range, return_dict, lock)
for batch_range in batch_ranges]
with context_timer():
pool = [Process(target=worker, args=args)
for args in tasks]
for p in pool:
p.start()
for p in pool:
p.join()
# Create standard container with data from manager before exiting
# the manager.
result = {k: list(v) for k, v in return_dict.items()}
print(result)
Pool
Most often a multiprocessing.Pool
will just do it. You have an additional challenge in your example since you want to distribute iteration over a range.
Your chunker
function doesn't manage to divide the range even so every process has about the same work to do:
chunker((0, 21), 4)
# Out: [[0, 4], [5, 9], [10, 14], [15, 21]] # 4, 4, 4, 6!
For the code below please grab the code snippet for mp_utils.py
from my answer here, it provides two functions to chunk ranges as even as possible.
With multiprocessing.Pool
your worker
function just has to return the result and Pool
will take care of transporting the result back over internal queues back to the parent process. The result
will be a list, so you will have to rearange your result again in a way you want it to have. Your example could then look like this:
import time
import multiprocessing as mp
from multiprocessing import Pool
from contextlib import contextmanager
from itertools import chain
from mp_utils import calc_batch_sizes, build_batch_ranges
@contextmanager
def context_timer():
start_time = time.perf_counter()
yield
end_time = time.perf_counter()
total_time = end_time-start_time
print(f'\nEach iteration took: {total_time / X_MAX:.4f} s')
print(f'Total time: {total_time:.4f} s\n')
def worker(batch_range):
"""worker function"""
result = []
for x in batch_range:
y = ((x+5)**2+x-40)
if y <= 0xf+1:
print('Condition met at: ', y, x)
result.append((x, y))
return result
if __name__ == '__main__':
N_WORKERS = mp.cpu_count()
X_MAX = 100000000
batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
batch_ranges = build_batch_ranges(batch_sizes)
print(batch_ranges)
with context_timer():
with Pool(N_WORKERS) as pool:
results = pool.map(worker, iterable=batch_ranges)
print(f'results: {results}')
x, y = zip(*chain.from_iterable(results)) # filter and sort results
print(f'results sorted: x: {x}, y: {y}')
Example Output:
[range(0, 12500000), range(12500000, 25000000), range(25000000, 37500000),
range(37500000, 50000000), range(50000000, 62500000), range(62500000, 75000000), range(75000000, 87500000), range(87500000, 100000000)]
Condition met at: -15 0
Condition met at: -3 1
Condition met at: 11 2
Each iteration took: 0.0000 s
Total time: 8.2408 s
results: [[(0, -15), (1, -3), (2, 11)], [], [], [], [], [], [], []]
results sorted: x: (0, 1, 2), y: (-15, -3, 11)
Process finished with exit code 0
If you had multiple arguments for your worker
you would build a "tasks"-list with argument-tuples and exchange pool.map(...)
with pool.starmap(...iterable=tasks)
. See docs for further details on that.
Process & Queue
If you can't use multiprocessing.Pool
for some reason, you have to take
care of inter-process communication (IPC) yourself, by passing a
multiprocessing.Queue
as argument to your worker-functions in the child-
processes and letting them enqueue their results to be send back to the
parent.
You will also have to build your Pool-like structure so you can iterate over it to start and join the processes and you have to get()
the results back from the queue. More about Queue.get
usage I've written up here.
A solution with this approach could look like this:
def worker(result_queue, batch_range):
"""worker function"""
result = []
for x in batch_range:
y = ((x+5)**2+x-40)
if y <= 0xf+1:
print('Condition met at: ', y, x)
result.append((x, y))
result_queue.put(result) # <--
if __name__ == '__main__':
N_WORKERS = mp.cpu_count()
X_MAX = 100000000
result_queue = mp.Queue() # <--
batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
batch_ranges = build_batch_ranges(batch_sizes)
print(batch_ranges)
with context_timer():
pool = [Process(target=worker, args=(result_queue, batch_range))
for batch_range in batch_ranges]
for p in pool:
p.start()
results = [result_queue.get() for _ in batch_ranges]
for p in pool:
p.join()
print(f'results: {results}')
x, y = zip(*chain.from_iterable(results)) # filter and sort results
print(f'results sorted: x: {x}, y: {y}')