
I have a two-dimensional function and I want to evaluate it on a grid of points, but the two loops over rows and columns are very slow, so I want to use multiprocessing to speed the code up. I have written the following code to do the two loops:

import numpy as np
from multiprocessing import Pool

#Grid points
ra = np.linspace(25.1446, 25.7329, 1000)
dec = np.linspace(-10.477, -9.889, 1000)
#The 2D function
def like2d(x,y): 
    stuff=[RaDec, beta, rho_c_over_sigma_c, zhalo, rho_crit]
    m=3e14
    c=7.455
    param=[x, y, m, c]
    return reduced_shear( param, stuff, observed_g, g_err)

pool = Pool(processes=12)

def data_stream(a, b):
    for i, av in enumerate(a):
        for j, bv in enumerate(b):
            yield (i, j), (av, bv)

def myfunc(args):
    return args[0], like2d(*args[1])

counter,likelihood = pool.map(myfunc, data_stream(ra, dec))

But I got the following error message:

Process PoolWorker-1:

Traceback (most recent call last):
  File "/user/anaconda/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/user/anaconda/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/user/anaconda/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    task = get()
  File "/user/anaconda/lib/python2.7/multiprocessing/queues.py", line 376, in get
    return recv()
AttributeError: 'module' object has no attribute 'myfunc'
(the same traceback is repeated for PoolWorker-2, PoolWorker-3, PoolWorker-4, ...)

Everything is defined, so I do not understand why this error is raised. Can anybody point out what might be wrong?

Another approach I tried, which runs the loops with multiprocessing and saves the results in a 2D array:

import ctypes
import multiprocessing
import time

import numpy as np

#Grid points
ra = np.linspace(25.1446, 25.7329, 1000)
dec = np.linspace(-10.477, -9.889, 1000)

#The 2D function
def like2d(x,y):
    stuff=[RaDec, beta, rho_c_over_sigma_c, zhalo, rho_crit]
    m=3e14
    c=7.455
    param=[x, y, m, c]
    return reduced_shear( param, stuff, observed_g, g_err)


shared_array_base = multiprocessing.Array(ctypes.c_double, ra.shape[0]*dec.shape[0])
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
shared_array = shared_array.reshape( ra.shape[0],dec.shape[0])

# Parallel processing
def my_func(i, def_param=shared_array):
    shared_array[i,:] = np.array([float(like2d(ra[j],dec[i])) for j in range(ra.shape[0])])

print("processing to estimate likelihood in 2D grids...")
start = time.time()
pool = multiprocessing.Pool(processes=12)
pool.map(my_func, range(dec.shape[0]))
print(shared_array)
end = time.time()
print(end - start)
dano: As an aside, if you have fewer than 12 CPU cores on your machine, you're probably only going to hurt performance by creating 12 processes to do CPU-bound work. Your CPUs can only actually run num_cpus processes at once, so the rest just waste memory and force the OS to context-switch excessively. If you do have 12 or more cores, lucky you!

Dalek: @dano I have access to a cluster and I can fork even more!

1 Answer


You have to create the Pool after defining the worker function (myfunc). Creating the Pool forks your worker processes right at that point, and the only things defined in the children are the functions defined above the Pool creation. Also, map returns a list of tuples (one for each object yielded by data_stream), not a single tuple. So you need this:

import numpy as np
from multiprocessing import Pool

#Grid points
ra = np.linspace(25.1446, 25.7329, 1000)
dec = np.linspace(-10.477, -9.889, 1000)
#The 2D function
def like2d(x,y): 
    stuff=[RaDec, beta, rho_c_over_sigma_c, zhalo, rho_crit]
    m=3e14
    c=7.455
    param=[x, y, m, c]
    return reduced_shear( param, stuff, observed_g, g_err)


def data_stream(a, b):
    for i, av in enumerate(a):
        for j, bv in enumerate(b):
            yield (i, j), (av, bv)

def myfunc(args):
    return args[0], like2d(*args[1])

if __name__ == "__main__":    
    pool = Pool(processes=12)
    results = pool.map(myfunc, data_stream(ra, dec))  # results is a list of tuples.
    for counter,likelihood in results:
        print("counter: {}, likelihood: {}".format(counter, likelihood))

I added the if __name__ == "__main__": guard, which isn't necessary on POSIX platforms, but would be necessary on Windows (which doesn't support os.fork()).