157
votes

For C++, we can use OpenMP to do parallel programming; however, OpenMP will not work for Python. What should I do if I want to parallelize some parts of my Python program?

The structure of the code can be thought of as:

solve1(A)
solve2(B)

where solve1 and solve2 are two independent functions. How can I run this kind of code in parallel instead of in sequence in order to reduce the running time? The code is:

def solve(Q, G, n):
    i = 0
    tol = 10 ** -4

    while i < 1000:
        inneropt, partition, x = setinner(Q, G, n)
        outeropt = setouter(Q, G, n)

        if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
            break

        node1 = partition[0]
        node2 = partition[1]

        G = updateGraph(G, node1, node2)

        if i == 999:
            print("Maximum iteration reached")
        i += 1

    print(inneropt)

where setinner and setouter are two independent functions. That's the part I want to parallelize...

Take a look at multiprocessing. Note: Python's threads are not suitable for CPU-bound tasks, only for I/O-bound ones. – 9000
@9000 +100 internets for mentioning the CPU-bound vs I/O-bound distinction. – Hyperboreus
@9000 Actually, threads are not suitable at all for CPU-bound tasks as far as I know! Processes are the way to go for real CPU-bound tasks. – Omar Al-Ithawi
@OmarIthawi: Why? Threads work fine if you have many CPU cores (as is usual now). Then your process can run several threads loading all these cores in parallel and sharing common data between them implicitly (that is, without an explicit shared memory area or inter-process messaging). – 9000
@user2134774: Well, yes, my second comment makes little sense. Probably only C extensions that release the GIL can benefit from that; e.g. parts of NumPy and Pandas do that. In other cases it is wrong (but I cannot edit it now). – 9000

6 Answers

183
votes

You can use the multiprocessing module. For this case I might use a processing pool:

from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A])    # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B])    # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)

This will spawn processes that can do generic work for you. Since we did not pass the processes argument, Pool() will spawn one worker process for each CPU core on your machine, and each core can execute one process at a time.

If you want to map a list to a single function you would do this:

args = [A, B]
results = pool.map(solve1, args)

Don't use threads here, because the GIL serializes operations on Python objects, so CPU-bound threads won't actually run in parallel.
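
Applied to the loop in the question, a minimal sketch might look like the following. It assumes setinner, setouter, and updateGraph are module-level functions and that Q, G, and n can be pickled, since the arguments are copied to the worker processes on every call:

from multiprocessing import Pool

def solve(Q, G, n):
    tol = 10 ** -4
    pool = Pool()  # one worker process per CPU core by default
    try:
        for i in range(1000):
            # submit both independent computations at the same time
            inner_res = pool.apply_async(setinner, (Q, G, n))
            outer_res = pool.apply_async(setouter, (Q, G, n))

            inneropt, partition, x = inner_res.get()  # wait for setinner
            outeropt = outer_res.get()                # wait for setouter

            if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
                break

            G = updateGraph(G, partition[0], partition[1])
        else:
            print("Maximum iteration reached")
        print(inneropt)
    finally:
        pool.close()
        pool.join()

On platforms that use the spawn start method (Windows, recent macOS), make sure solve() is called from under an if __name__ == "__main__": guard.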

36
votes

This can be done very elegantly with Ray.

To parallelize your example, you'd need to define your functions with the @ray.remote decorator, and then invoke them with .remote.

import ray

ray.init()

# Define the functions.

@ray.remote
def solve1(a):
    return 1

@ray.remote
def solve2(b):
    return 2

# Start two tasks in the background.
x_id = solve1.remote(0)
y_id = solve2.remote(1)

# Block until the tasks are done and get the results.
x, y = ray.get([x_id, y_id])

There are a number of advantages of this over the multiprocessing module.

  1. The same code will run on a multicore machine as well as a cluster of machines.
  2. Processes share data efficiently through shared memory and zero-copy serialization.
  3. Error messages are propagated nicely.
  4. These function calls can be composed together, e.g.,

    @ray.remote
    def f(x):
        return x + 1
    
    x_id = f.remote(1)
    y_id = f.remote(x_id)
    z_id = f.remote(y_id)
    ray.get(z_id)  # returns 4
    
  5. In addition to invoking functions remotely, classes can be instantiated remotely as actors; see the sketch after this list.
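
As an illustration of point 5, here is a minimal actor sketch. The Counter class is purely hypothetical (not from the question or the example above), and it reuses the ray.init() session started earlier:

@ray.remote
class Counter:
    """A remote actor: its state lives in a dedicated worker process."""
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

counter = Counter.remote()                            # instantiate the actor remotely
ids = [counter.increment.remote() for _ in range(3)]  # method calls return object refs
print(ray.get(ids))                                   # [1, 2, 3]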

Note that Ray is a framework I've been helping develop.

5
votes

CPython uses the Global Interpreter Lock (GIL), which makes parallel programming a bit more interesting than it is in C++.

This topic has several useful examples and descriptions of the challenge:

Python Global Interpreter Lock (GIL) workaround on multi-core systems using taskset on Linux?
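
If you want to see the effect of the GIL for yourself, a small self-contained comparison along these lines can help (the busy function is just hypothetical CPU-bound busy work, not part of the question):

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def busy(n):
    # Pure-Python arithmetic holds the GIL the whole time.
    total = 0
    for i in range(n):
        total += i * i
    return total

def timed(executor_cls, label):
    start = time.perf_counter()
    with executor_cls(max_workers=4) as ex:
        list(ex.map(busy, [5_000_000] * 4))
    print(label, time.perf_counter() - start)

if __name__ == "__main__":
    timed(ThreadPoolExecutor, "threads:")     # serialized by the GIL
    timed(ProcessPoolExecutor, "processes:")  # runs on multiple cores

On a multi-core machine the process-based version should finish noticeably faster, while the thread-based one takes roughly as long as running the calls sequentially.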

5
votes

The solution, as others have said, is to use multiple processes. Which framework is more appropriate, however, depends on many factors. In addition to the ones already mentioned, there are also charm4py and mpi4py (I am the developer of charm4py).

There is a more efficient way to implement the above example than using the worker pool abstraction. The main loop sends the same parameters (including the complete graph G) over and over to workers in each of the 1000 iterations. Since at least one worker will reside on a different process, this involves copying and sending the arguments to the other process(es). This could be very costly depending on the size of the objects. Instead, it makes sense to have workers store state and simply send the updated information.

For example, in charm4py this can be done like this:

class Worker(Chare):

    def __init__(self, Q, G, n):
        self.G = G
        ...

    def setinner(self, node1, node2):
        self.updateGraph(node1, node2)
        ...


def solve(Q, G, n):
    # create 2 workers, each on a different process, passing the initial state
    worker_a = Chare(Worker, onPE=0, args=[Q, G, n])
    worker_b = Chare(Worker, onPE=1, args=[Q, G, n])
    while i < 1000:
        result_a = worker_a.setinner(node1, node2, ret=True)  # execute setinner on worker A
        result_b = worker_b.setouter(node1, node2, ret=True)  # execute setouter on worker B

        inneropt, partition, x = result_a.get()  # wait for result from worker A
        outeropt = result_b.get()  # wait for result from worker B
        ...

Note that for this example we really only need one worker: the main loop could execute one of the functions and have the worker execute the other. But the code above helps to illustrate a couple of things:

  1. Worker A runs in process 0 (same as the main loop). While result_a.get() is blocked waiting on the result, worker A does the computation in the same process.
  2. Arguments are automatically passed by reference to worker A, since it is in the same process (there is no copying involved).
3
votes

In some cases, it's possible to automatically parallelize loops using Numba, though it only works with a small subset of Python:

from numba import njit, prange

@njit(parallel=True)
def prange_test(A):
    s = 0
    # Without "parallel=True" in the jit-decorator
    # the prange statement is equivalent to range
    for i in prange(A.shape[0]):
        s += A[i]
    return s
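
A quick usage sketch, assuming NumPy is installed alongside Numba (the array here is arbitrary example data):

import numpy as np

A = np.arange(10_000_000, dtype=np.float64)
print(prange_test(A))  # the loop iterations and the sum reduction run across threads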

Unfortunately, it seems that Numba only works with NumPy arrays, not with other Python objects. In theory, it might also be possible to compile Python to C++ and then automatically parallelize it using the Intel C++ compiler, though I haven't tried this yet.

2
votes

You can use the joblib library for parallel computation and multiprocessing.

from joblib import Parallel, delayed

You can simply create a function foo that you want to run in parallel and, based on the following piece of code, implement parallel processing:

output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in input)

where num_cores can be obtained from the multiprocessing library as follows:

import multiprocessing

num_cores = multiprocessing.cpu_count()

If you have a function with more than one input argument, and you just want to iterate over one of the arguments with a list, you can use the partial function from the functools library as follows:

from joblib import Parallel, delayed
import multiprocessing
from functools import partial
def foo(arg1, arg2, arg3, arg4):
    '''
    body of the function
    '''
    return output
input = [11,32,44,55,23,0,100,...] # arbitrary list
num_cores = multiprocessing.cpu_count()
foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4)
# arg1 is being fetched from input list
output = Parallel(n_jobs=num_cores)(delayed(foo_)(i) for i in input)
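
To run two different functions in parallel, as in the solve1(A) / solve2(B) structure from the question, you can also pass an explicit list of delayed calls. This is only a sketch; solve1, solve2, A, and B below are placeholder stand-ins:

from joblib import Parallel, delayed

def solve1(A):
    return sum(x * x for x in A)   # placeholder CPU-bound work

def solve2(B):
    return max(B)                  # placeholder CPU-bound work

A = list(range(1_000_000))
B = list(range(500))
answer1, answer2 = Parallel(n_jobs=2)(
    [delayed(solve1)(A), delayed(solve2)(B)]
)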

You can find a complete explanation of Python and R multiprocessing with a couple of examples here.