The code pulls data via an (async) HTTP request and then builds a large list of dictionaries in which one of the values is randomly generated:

import asyncio
import random
import string
import time

from concurrent.futures import ProcessPoolExecutor
from itertools import cycle

from httpx import AsyncClient

URL = 'http://localhost:8080'
COUNT = 1_000_000


def rand_str(length=10):
    return ''.join(random.choice(string.ascii_uppercase) for i in range(length))


def parser(data, count):
    items = []

    for _, item in zip(range(count), cycle(data)):
        item['instance'] = rand_str()
        items.append(item)

    return items


async def parser_coro(data, count):
    items = []

    for _, item in zip(range(count), cycle(data)):
        item['instance'] = rand_str()
        items.append(item)

    return items


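async def run_in_executor(func, pool, *args, **kwargs):
    # loop.run_in_executor() takes positional arguments only,
    # so bind args and kwargs to func with functools.partial.
    from functools import partial
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pool, partial(func, *args, **kwargs))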


async def main():
    async with AsyncClient(base_url=URL) as client:
        r = await client.get('/api/alerts/')
        data = r.json()

    # Case 1
    t1 = time.perf_counter()
    parser(data, COUNT)
    t2 = time.perf_counter()
    print(f'Case 1 - sync: {t2 - t1:.3f}s')
    
    # Case 2
    t1 = time.perf_counter()
    await parser_coro(data, COUNT)
    t2 = time.perf_counter()
    print(f'Case 2 - coro (no await): {t2 - t1:.3f}s')

    # Case 3
    t1 = time.perf_counter()
    await run_in_executor(parser, None, data, COUNT)
    t2 = time.perf_counter()
    print(f'Case 3 - thread executor: {t2 - t1:.3f}s')

    # Case 4
    t1 = time.perf_counter()
    with ProcessPoolExecutor() as executor:
        await run_in_executor(parser, executor, data, COUNT)
    t2 = time.perf_counter()
    print(f'Case 4 - process executor: {t2 - t1:.3f}s')


if __name__ == '__main__':
    asyncio.run(main(), debug=True)

Test:

$ python test.py 
Case 1 - sync: 6.593s
Case 2 - coro (no await): 6.565s
Executing <Task pending name='Task-1' coro=<main() running at test.py:63> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/futures.py:360, <TaskWakeupMethWrapper object at 0x7efff962a1f0>()] created at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py:422> cb=[_run_until_complete_cb() at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py:184] created at /root/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py:591> took 13.176 seconds
Case 3 - thread executor: 6.675s
Case 4 - process executor: 6.726s

Question:

Should I run the parser function in the executor so it's not blocking the main thread while the list is generated, or won't it help in this case? Is this actually a CPU-bound or I/O-bound workload in this case? I guess there isn't any I/O, but is building a list a CPU-intensive task, so that the workload is CPU-bound?

The answer is yes, you should run it in the executor, and yes, it's a CPU-bound function. If you don't run it in the executor, no other coroutine will be able to run while parser() is executing. It doesn't make a difference now, when you only have one coroutine running, but it will make a difference when you actually try to run things in parallel. If you can get away with ProcessPoolExecutor, that's even better, because it will allow you to make use of multiple cores. - user4815162342
@user4815162342 thanks for the comment. Can I run this in the ThreadPoolExecutor so it's not blocking, or must it be a ProcessPoolExecutor as it's a CPU-bound function? - HTF
You can run it in ThreadPoolExecutor, sure. It's just that if you have a bunch of those running in parallel, they will all share the same CPU core. (But they won't block other coroutines because they will run off the event-loop thread.) - user4815162342

1 Answer

Should I run the parser function in the executor so it's not blocking the main thread while the list is generated, or won't it help in this case?

Yes, you should. Despite the global interpreter lock (GIL), using a separate thread will help because Python periodically switches execution between threads, so control passes from the parser to the asyncio thread without the parser being aware of it. Using a thread will thus prevent the event loop from being blocked for 6 seconds, or however long it takes to run the function.
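As a rough illustration of the point above, here is a minimal sketch in which a ticker coroutine keeps getting scheduled while a CPU-bound function runs in the default thread executor. The names busy, ticker and demo are hypothetical stand-ins and are not part of the question's code; busy plays the role of parser.

```python
import asyncio
import time


def busy(n):
    # Hypothetical stand-in for parser(): pure-Python, CPU-bound work.
    total = 0
    for i in range(n):
        total += i * i
    return total


async def ticker(ticks, interval=0.01):
    # Records a timestamp every time the event loop gets to run this coroutine.
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(interval)


async def demo(n=2_000_000):
    ticks = []
    task = asyncio.create_task(ticker(ticks))
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default ThreadPoolExecutor.
    result = await loop.run_in_executor(None, busy, n)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return result, len(ticks)


if __name__ == '__main__':
    result, tick_count = asyncio.run(demo())
    print(f'result={result}, ticks recorded while busy() ran: {tick_count}')
```

The exact number of ticks depends on the machine and on how the interpreter switches between threads. If busy(n) were called directly inside demo() instead, the ticker would not get to run until the call returned.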

Note that the parser_coro variant is no different from the parser variant without an executor, because it doesn't await anything. await parser_coro(...) will halt the event loop just like an executor-less call to parser(...).
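To make this concrete, the following small sketch shows that a coroutine which never awaits runs to completion before any other scheduled task gets a turn. The names no_await_coro, other and demo are hypothetical and only serve the illustration.

```python
import asyncio


async def no_await_coro(log):
    # Hypothetical stand-in for parser_coro(): declared async, but never awaits,
    # so it never hands control back to the event loop.
    for i in range(3):
        log.append(f'work {i}')


async def other(log):
    log.append('other')


async def demo():
    log = []
    task = asyncio.create_task(other(log))  # scheduled, but not started yet
    await no_await_coro(log)                # runs start to finish without yielding
    await task                              # only now does other() get to run
    return log


if __name__ == '__main__':
    print(asyncio.run(demo()))  # -> ['work 0', 'work 1', 'work 2', 'other']
```

Even though other() was scheduled first, it only runs after no_await_coro() has finished, which is the same behaviour a plain synchronous call would produce.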

Is this actually a CPU-bound or I/O-bound workload in this case?

I can't comment on the rest of the workload, but the function as written is definitely CPU-bound.

Can I run this in the ThreadPoolExecutor so it's not blocking, or must it be a ProcessPoolExecutor as it's a CPU-bound function?

You can run it in ThreadPoolExecutor, sure. It's just that if you have a bunch of those running in parallel, the GIL means they will effectively all share the same CPU core. (But they won't block other coroutines because they will run off the event-loop thread.)
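For several such jobs at once, one possible pattern is to submit them all to a pool and gather the results. This is only a sketch: busy and run_jobs are hypothetical names, and busy stands in for parser.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def busy(n):
    # Hypothetical stand-in for parser(): pure-Python, CPU-bound work.
    return sum(i * i for i in range(n))


async def run_jobs(pool, sizes):
    # Submit one job per size to the given executor and wait for all of them.
    # The event loop stays free to run other coroutines in the meantime.
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(pool, busy, n) for n in sizes]
    return await asyncio.gather(*futures)


async def main():
    with ThreadPoolExecutor(max_workers=4) as pool:
        results = await run_jobs(pool, [100_000] * 4)
    print(results)


if __name__ == '__main__':
    asyncio.run(main())
```

run_jobs accepts any concurrent.futures executor, so a ProcessPoolExecutor can be passed instead to spread the jobs over multiple cores. In that case the function and its arguments must be picklable, and the results are copied back from the worker processes, which adds overhead for large lists.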