2
votes

I'm trying to implement asynchronous multiprocessing with callbacks in Python. I have read material about the multiprocessing module, process/thread pools, and the asyncio module, but I could not get to the point. I have a coroutine and a callback function like

async def func(x):
    value = await (other coroutine)
    return x

def callback(x):
   print(x)

and I want to submit func(1), func(2), func(3), ..., func(100) to a pool of a given number of worker processes, say 5. Then I want the callback to be invoked whenever any one of the return values arrives at the parent process, not only after every return value has arrived.

The map/map_async/starmap/starmap_async methods in multiprocessing.Pool wait until all of the results from the worker processes have arrived at the parent process. I have studied the basic elements of asynchronous programming with the asyncio module, but it still remains elusive to me how to accomplish the above task: asynchronous multiprocessing programming with callbacks.

Could anybody give me a lucid example code for this task?

Thanks in advance.


1 Answer

-2
votes

I think this does what you want, in a hacky way:

import asyncio
import random
from concurrent.futures import ProcessPoolExecutor


async def do_sth(future, n):
    # simulate some work, then deliver the result
    await asyncio.sleep(random.randint(1, 5))
    future.set_result('{} is done!'.format(n))


def got_result(fut):
    # fires as soon as the corresponding future completes
    print(fut.result())


if __name__ == '__main__':
    # note: this executor is created but never used, so everything
    # below actually runs in a single process on the event loop
    executor = ProcessPoolExecutor(5)

    tasks = []
    loop = asyncio.get_event_loop()
    for n in range(1, 101):
        future = asyncio.Future()
        tasks.append(
            asyncio.ensure_future(do_sth(future, n))
        )
        future.add_done_callback(got_result)
    try:
        loop.run_until_complete(asyncio.gather(*tasks))
    finally:
        loop.close()

But I suggest you think about what you actually gain from combining multiprocessing with asyncio callbacks; I think what you actually want may be a queue and multiple processes.