4 votes

Cython starter here. I am trying to speed up the calculation of a pairwise statistic (accumulated in several bins) by using multiple threads. In particular, I am using prange from cython.parallel, which internally uses OpenMP.

The following minimal example illustrates the problem (compilation via Jupyter notebook Cython magic).

Notebook setup:

%load_ext Cython
import numpy as np

Cython code:

%%cython --compile-args=-fopenmp --link-args=-fopenmp -a

from cython cimport boundscheck
import numpy as np
from cython.parallel cimport prange, parallel

@boundscheck(False)
def my_parallel_statistic(double[:] X, double[:,::1] bins, int num_threads):

    cdef: 
        int N = X.shape[0]
        int nbins = bins.shape[0]
        double Xij,Yij
        double[:] Z = np.zeros(nbins,dtype=np.float64)
        int i,j,b

    with nogil, parallel(num_threads=num_threads):
        for i in prange(N,schedule='static',chunksize=1):
            for j in range(i):
                #some pairwise quantities
                Xij = X[i]-X[j]
                Yij = 0.5*(X[i]+X[j])
                #check if in bin
                for b in range(nbins):
                    if (Xij < bins[b,0]) or (Xij > bins[b,1]):
                        continue
                    Z[b] += Xij*Yij

    return np.asarray(Z)

Mock data and bins:

X = np.random.rand(10000)
bin_edges = np.linspace(0.,1,11)
bins = np.array([bin_edges[:-1],bin_edges[1:]]).T
bins = bins.copy(order='C')

Timing via

%timeit my_parallel_statistic(X,bins,1)
%timeit my_parallel_statistic(X,bins,4)

yields

1 loop, best of 3: 728 ms per loop
1 loop, best of 3: 330 ms per loop

which is not perfect scaling, but that is not the main point of the question. (Do let me know, though, if you have suggestions beyond adding the usual decorators or fine-tuning the prange arguments.)

However, this calculation is apparently not thread-safe:

Z1 = my_parallel_statistic(X,bins,1)
Z4 = my_parallel_statistic(X,bins,4)
np.allclose(Z1,Z4)

reveals a significant difference between the two results (up to 20% in this example).

I strongly suspect that the problem is that multiple threads can do

Z[b] += Xij*Yij

at the same time. But what I don't know is how to fix this without sacrificing the speed-up.

In my actual use case, the calculation of Xij and Yij is more expensive, hence I would like to do it only once per pair. Pre-computing and storing Xij and Yij for all pairs and then simply looping over the bins is not a good option either, because N can get very large: a single 100,000 x 100,000 float64 array already takes about 80 GB, which I cannot hold in memory (this was actually the main motivation for rewriting it in Cython!).

System info (added following suggestion in comments):

CPU(s): 8
Model name: Intel(R) Core(TM) i7-4790K CPU @ 4.00GHz
OS: Red Hat Linux v6.8
Memory: 16 GB
Is the action in each thread truly independent of the action of any other? Does it matter which one runs first? If there is any kind of dependency this isn't a good candidate for parallel operations. - hpaulj
As long as each thread is creating its own Xij and Yij, they should be independent (but maybe this is the problem?) As far as the math is concerned, the Xij and Yij are computed independently for each pair (i,j), and therefore the contribution to the statistic Z as well. - user4319496
Thank you for including such an excellent minimal reproducible example in your question! Such a well researched and formulated question is too rare on SO. The only thing that you might have included is your CPU model and memory to comment on the performance, but that's not a main point of the question anyway. - Zulan
Good point, have added it - user4319496

2 Answers

5 votes

Yes, Z[b] += Xij*Yij is indeed a race condition.

There are a couple of options for making this update atomic or putting it into a critical section. Implementation issues with Cython aside, you would in any case get bad performance due to false sharing on the shared Z vector.

So the better alternative is to reserve a private array for each thread. There are again a couple of (non-)options. One could use a private malloc'd pointer (a rough sketch follows below), but I wanted to stick with np. Memoryview slices cannot be assigned as private variables. A two-dimensional (num_threads, nbins) array works, but for some reason generates very complicated, inefficient array indexing code. It works, but it is slower and does not scale.
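
For reference, a minimal, untested sketch of what the malloc'd-pointer variant might look like (the function name is made up; each thread merges its private buffer back into Z under the GIL once at the end):

%%cython --compile-args=-fopenmp --link-args=-fopenmp
from cython cimport boundscheck
from libc.stdlib cimport malloc, free
import numpy as np
from cython.parallel cimport prange, parallel

@boundscheck(False)
def my_parallel_statistic_malloc(double[:] X, double[:,::1] bins, int num_threads):

    cdef:
        int N = X.shape[0]
        int nbins = bins.shape[0]
        double Xij,Yij
        double[:] Z = np.zeros(nbins,dtype=np.float64)
        double *Z_local
        int i,j,b

    with nogil, parallel(num_threads=num_threads):
        # allocated inside the parallel block, hence private to each thread
        Z_local = <double *> malloc(nbins * sizeof(double))
        for b in range(nbins):
            Z_local[b] = 0.0
        for i in prange(N,schedule='static',chunksize=1):
            for j in range(i):
                Xij = X[i]-X[j]
                Yij = 0.5*(X[i]+X[j])
                for b in range(nbins):
                    if (Xij < bins[b,0]) or (Xij > bins[b,1]):
                        continue
                    Z_local[b] += Xij*Yij
        # merge the private buffer into the shared Z; the GIL serializes this,
        # but it only happens once per thread, not once per pair
        with gil:
            for b in range(nbins):
                Z[b] += Z_local[b]
        free(Z_local)

    return np.asarray(Z)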

A flat numpy array with manual "2D" indexing works well. You get a little extra performance by padding each thread's private part of the array to 64 bytes, a typical cache line size, which avoids false sharing between the cores. The per-thread parts are simply summed up serially outside of the parallel region.

%%cython --compile-args=-fopenmp --link-args=-fopenmp -a
from cython cimport boundscheck
import numpy as np
from cython.parallel cimport prange, parallel
cimport openmp

@boundscheck(False)
def my_parallel_statistic(double[:] X, double[:,::1] bins, int num_threads):

    cdef: 
        int N = X.shape[0]
        int nbins = bins.shape[0]
        double Xij,Yij
        # pad each thread's local data to 64 bytes (8 doubles) to avoid false sharing of cache lines
        int nbins_padded = (((nbins - 1) // 8) + 1) * 8
        double[:] Z_local = np.zeros(nbins_padded * num_threads,dtype=np.float64)
        double[:] Z = np.zeros(nbins)
        int i,j,b, bb, tid

    with nogil, parallel(num_threads=num_threads):
        tid = openmp.omp_get_thread_num()
        for i in prange(N,schedule='static',chunksize=1):
            for j in range(i):
                #some pairwise quantities
                Xij = X[i]-X[j]
                Yij = 0.5*(X[i]+X[j])
                #check if in bin
                for b in range(nbins):
                    if (Xij < bins[b,0]) or (Xij > bins[b,1]):
                        continue
                    Z_local[tid * nbins_padded + b] += Xij*Yij
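
    # sum the per-thread partial results serially, outside the parallel region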
    for tid in range(num_threads):
        for bb in range(nbins):
            Z[bb] += Z_local[tid * nbins_padded + bb]


    return np.asarray(Z)

This performs quite well on my 4-core machine, going from 720 ms to 191 ms, a speedup of about 3.8. The remaining gap may be due to turbo mode. I don't have access to a proper machine for testing right now.

1 vote

You are right that the access to Z is subject to a race condition.

You might be better off defining num_threads copies of Z, e.g. cdef double[:,::1] Z = np.zeros((num_threads, nbins), dtype=np.float64), letting each thread accumulate into its own row, and then performing a sum along axis 0 after the prange loop:

return np.sum(Z, axis=0)
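
A minimal, untested sketch of how that suggestion might look (the function name is made up; openmp.omp_get_thread_num() picks each thread's row):

%%cython --compile-args=-fopenmp --link-args=-fopenmp
from cython cimport boundscheck
import numpy as np
from cython.parallel cimport prange, parallel
cimport openmp

@boundscheck(False)
def my_parallel_statistic_rows(double[:] X, double[:,::1] bins, int num_threads):

    cdef:
        int N = X.shape[0]
        int nbins = bins.shape[0]
        double Xij,Yij
        # one row of partial sums per thread
        double[:,::1] Z = np.zeros((num_threads, nbins), dtype=np.float64)
        int i,j,b,tid

    with nogil, parallel(num_threads=num_threads):
        tid = openmp.omp_get_thread_num()
        for i in prange(N,schedule='static',chunksize=1):
            for j in range(i):
                Xij = X[i]-X[j]
                Yij = 0.5*(X[i]+X[j])
                for b in range(nbins):
                    if (Xij < bins[b,0]) or (Xij > bins[b,1]):
                        continue
                    # each thread only writes to its own row
                    Z[tid,b] += Xij*Yij

    # collapse the per-thread rows into the final statistic
    return np.sum(Z, axis=0)

As the other answer points out, this layout can generate inefficient indexing code and still suffers from false sharing unless each row is padded to a full cache line.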

Cython code can have a with gil statement in a parallel region, but it is only documented for error handling. You could have a look at the generated C code to see whether it would translate into an atomic OpenMP operation, but I doubt it.
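
Purely for illustration, an untested sketch of what a GIL-protected update would look like inside the question's original loop; it removes the race but serializes every single bin update, largely defeating the parallelization:

    with nogil, parallel(num_threads=num_threads):
        for i in prange(N,schedule='static',chunksize=1):
            for j in range(i):
                Xij = X[i]-X[j]
                Yij = 0.5*(X[i]+X[j])
                for b in range(nbins):
                    if (Xij < bins[b,0]) or (Xij > bins[b,1]):
                        continue
                    with gil:  # serializes the shared update; correct but slow
                        Z[b] += Xij*Yij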