4
votes

I am trying to implement an online recursive algorithm that is highly parallelizable, but my Python implementation does not behave as I want. I have two 2D matrices, and I want to update every column recursively each time a new observation arrives at time step t. My parallel code looks like this:

def apply_async(t):
    worker = mp.Pool(processes=4)
    for i in range(4):
        X[:, i, np.newaxis], b[:, i, np.newaxis] = worker.apply_async(OULtraining, args=(train[t, i], X[:, i, np.newaxis], b[:, i, np.newaxis])).get()

    worker.close()
    worker.join()




for t in range(p,T):
    count = 0 
    for l in range(p):
        for k in range(4):
            gn[count]=train[t-l-1,k]
            count+=1
    G = G*v +  gn @ gn.T
    Gt = (1/(t-p+1))*G

    if __name__ == '__main__':
        apply_async(t)

The two matrices are X and b. I want to write the results directly into the master process's memory, since each process recursively updates only one specific column of the matrices.

Why is this implementation slower than the sequential one?

Is there a way to reuse the processes at every time step rather than killing and recreating them? Could this be the reason it is slower?

1
Are any of the arguments to worker.apply_async() perhaps large data structures? - Hannu
What do you mean by large? X and b are large 2D matrices, but I pass only one column of each as an argument, so I do not think so. - Bekromoularo
Ok. I think I found your problem, see my answer below. You need to adapt the solution to your code and data structures but the examples should clarify where this goes wrong. - Hannu
Can you please fix the indentation? There are several errors there. Can you please give an example of the matrices you want to modify? Also, apply_async is already a method of Pool, check it out. Your function is recursively calling itself and generating a Pool every time. This doesn't look right at all. - alec_djinn
What I want to implement is recursive, as I said. My last question makes it clear that I am aware of this, which is why I ask "Is there any way to resume the process every time-step rather than killing them and create them again?". I want to stop it from recursively calling itself. Is there any way to have the same processes update the same column of the matrix, instead of killing and recreating them again and again? - Bekromoularo

1 Answer

3
votes

The reason is that your program is, in practice, sequential. The following snippet is identical to yours from a parallelism standpoint:

from multiprocessing import Pool
from time import sleep

def gwork(qq):
    print(qq)
    sleep(1)  # emulate one second of processing
    return 42

p = Pool(processes=4)

for q in range(1, 10):
    # .get() blocks until this task has finished
    p.apply_async(gwork, args=(q,)).get()
p.close()
p.join()

Run this and you will see the numbers 1-9 appear at a rate of exactly one per second. Why? The reason is your .get(): every call to apply_async is immediately followed by get(), which blocks until the result is available. The loop submits one task, waits a second (emulating the processing delay), receives the result, and only then submits the next task to the pool. As a result, nothing executes in parallel.
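To see the blocking directly, you can inspect the result handle that apply_async returns. This is only a minimal sketch: slow_task, probe and the 0.5 second delay are illustrative names and values, not part of your code.

```python
from multiprocessing import Pool
from time import sleep


def slow_task(x):
    # Hypothetical stand-in for real work: pause, then return a value.
    sleep(0.5)
    return x * 2


def probe():
    with Pool(processes=2) as pool:
        handle = pool.apply_async(slow_task, args=(21,))
        ready_before = handle.ready()  # apply_async returned at once; the task is still running
        value = handle.get()           # blocks here until the worker has finished
        ready_after = handle.ready()
    return ready_before, value, ready_after


if __name__ == "__main__":
    print(probe())
```

The submission itself returns immediately; the waiting happens in get(), which is why calling it inside the submission loop serializes the tasks.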

Try replacing the pool management part with this:

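results = []
for q in range(1, 10):
    # submit without waiting; keep the result handle for later
    res = p.apply_async(gwork, args=(q,))
    results.append(res)
p.close()
p.join()
# all tasks have been submitted, so collect the results now
for r in results:
    print(r.get())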

You can now see parallelism at work: four tasks are processed simultaneously. The submission loop no longer blocks, because get() has been moved out of it and the results are collected only after every task has been submitted.
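If you want to measure the difference rather than watch the output, a rough timing comparison of the two patterns could look like the sketch below. The helper names (run_blocking, run_deferred, timed) and the shortened 0.2 second delay are assumptions made for illustration.

```python
from multiprocessing import Pool
from time import perf_counter, sleep


def gwork(qq):
    # Same role as gwork above, with a shorter delay.
    sleep(0.2)
    return qq


def run_blocking(pool, items):
    # get() inside the loop: each task finishes before the next is submitted.
    return [pool.apply_async(gwork, args=(q,)).get() for q in items]


def run_deferred(pool, items):
    # Submit everything first, then collect the results in submission order.
    handles = [pool.apply_async(gwork, args=(q,)) for q in items]
    return [h.get() for h in handles]


def timed(func, pool, items):
    # Return the function's output together with its wall-clock duration.
    start = perf_counter()
    out = func(pool, items)
    return out, perf_counter() - start


if __name__ == "__main__":
    with Pool(processes=4) as pool:
        for func in (run_blocking, run_deferred):
            out, seconds = timed(func, pool, range(1, 9))
            print(func.__name__, out, round(seconds, 2))
```

Both functions return the same values; only the elapsed time should differ, with the deferred version finishing in roughly the time of two rounds of four tasks.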

NB: If the arguments to your worker or its return values are large data structures, you will lose some performance. In practice Python passes them through queues, which means they are pickled and sent between processes. Transmitting a lot of data this way is relatively slow compared to getting an in-memory copy of a data structure when a subprocess is forked.
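On your question about restarting the processes at every time step: a pool can be created once and reused for all steps, so the workers are not killed and recreated. The sketch below is one possible arrangement, under stated assumptions: update_column is a hypothetical stand-in for OULtraining, plain lists stand in for the NumPy columns, and the update rule is a placeholder. Each column is still pickled to a worker and back on every step, so the note above about large arguments applies.

```python
from multiprocessing import Pool


def update_column(observation, x_col, b_col):
    # Hypothetical stand-in for OULtraining: returns the updated columns.
    new_x = [x + observation for x in x_col]
    new_b = [b + 1 for b in b_col]
    return new_x, new_b


def run(train, x_cols, b_cols, processes=4):
    # Create the pool once; the same worker processes serve every time step.
    with Pool(processes=processes) as pool:
        for row in train:
            # Submit one task per column without waiting in between.
            handles = [
                pool.apply_async(update_column, args=(row[i], x_cols[i], b_cols[i]))
                for i in range(len(x_cols))
            ]
            # Collect after all submissions; the next step needs these results.
            for i, handle in enumerate(handles):
                x_cols[i], b_cols[i] = handle.get()
    return x_cols, b_cols


if __name__ == "__main__":
    train = [[1, 2, 3, 4], [10, 20, 30, 40]]  # two time steps, four series
    x_cols = [[0, 0] for _ in range(4)]        # four columns of length 2
    b_cols = [[0, 0] for _ in range(4)]
    print(run(train, x_cols, b_cols))
```

Whether this beats the sequential version depends on how expensive each column update is compared with the cost of transferring the columns to the workers and back.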