
Given a list of tasks that are I/O- and CPU-bound: when waiting for them using asyncio.wait with a timeout, the timeout triggers very late. Is it possible to prioritize the pending timeout?

This example defines workers that simulate one second of CPU-bound and one second of I/O-bound work per iteration. It runs several of them in parallel and waits for 10 seconds.

import asyncio
import random
import time

async def worker():
    for _ in range(200):
        # approx one second of cpu bound operations
        for _ in range(10000000):
            random.random()

        # one second of io bound operations
        await asyncio.sleep(1)

async def main(num_tasks):
    tasks = []
    for _ in range(num_tasks):
        tasks.append(asyncio.create_task(worker()))

    time_start = time.monotonic()
    done, pending = await asyncio.wait(tasks, timeout=10)
    print(f'with {num_tasks} tasks waited for {time.monotonic() - time_start:.3}s')

    for task in pending:
        task.cancel()

for num_tasks in range(1, 11):
    asyncio.run(main(num_tasks))

With an increasing number of tasks, the timeout becomes less and less accurate:

with 1 tasks waited for 10.0s
with 2 tasks waited for 10.9s
with 3 tasks waited for 11.5s
with 4 tasks waited for 12.0s
with 5 tasks waited for 13.2s
with 6 tasks waited for 14.2s
with 7 tasks waited for 15.6s
with 8 tasks waited for 16.8s
with 9 tasks waited for 17.9s
with 10 tasks waited for 18.6s

It seems to me that the main coroutine, whose timeout should be triggered, is not scheduled immediately when one of the workers suspends itself via asyncio.sleep. Instead, multiple workers are scheduled before the pending timeout is processed; the underlying callback scheduled with loop.call_later seems to be treated like any other ready callback.
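The effect can be reproduced in isolation. The following sketch (a hypothetical minimal example, not from the question) schedules a callback with loop.call_later and then blocks the loop thread without awaiting; the timer callback can only fire once the loop regains control, so it runs far later than its scheduled delay:

```python
import asyncio
import time

async def main():
    loop = asyncio.get_running_loop()
    fired = loop.create_future()
    start = time.monotonic()
    # ask the loop to run a callback after 0.1 seconds
    loop.call_later(0.1, fired.set_result, None)
    # block the loop thread for ~0.5 seconds without awaiting anything
    deadline = time.monotonic() + 0.5
    while time.monotonic() < deadline:
        pass
    # only now can the loop process the overdue timer callback
    await fired
    return time.monotonic() - start

elapsed = asyncio.run(main())
print(f'callback fired after {elapsed:.2f}s instead of the scheduled 0.10s')
```

This mirrors what happens in the question's program: each worker's CPU-bound loop plays the role of the busy-wait, delaying the timeout callback installed by asyncio.wait.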

Is it possible to tell the event loop to prioritize the main task or any pending callback?


1 Answer


The event loop doesn't support the kind of priorities you are after. asyncio expects every operation carried out inside the event loop (coroutines and callbacks) to be "quick". Exactly how quick is a matter of interpretation, but they need to be fast enough not to affect the latency of the program.

To support CPU-bound execution, there is loop.run_in_executor, which submits the given operation to a thread pool and suspends the current coroutine until it is done, allowing other coroutines (and the pending timeout callback) to make progress. In your case the worker would look like this:

import asyncio
import random

def cpu_bound():
    # approx one second of cpu bound operations
    for _ in range(10000000):
        random.random()

async def worker():
    loop = asyncio.get_running_loop()
    for _ in range(200):
        # run the blocking function in the default thread pool,
        # keeping the event loop free while it executes
        await loop.run_in_executor(None, cpu_bound)
        await asyncio.sleep(1)

Modified like that, your program waits for ~10s regardless of the number of tasks.

In Python 3.9 and later, you can also use asyncio.to_thread, which wraps the same mechanism in a more convenient call.
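As a sketch of the asyncio.to_thread variant (with iteration counts and durations scaled down from the question's example so it runs quickly; the exact numbers are illustrative, not from the original), the timeout passed to asyncio.wait now fires close to its nominal value even with several CPU-bound workers:

```python
import asyncio
import random
import time

def cpu_bound(n):
    # CPU-bound busy loop; n controls how long it spins
    for _ in range(n):
        random.random()

async def worker(n):
    for _ in range(3):
        # to_thread runs the blocking call in the default thread pool,
        # so the event loop stays responsive while it executes
        await asyncio.to_thread(cpu_bound, n)
        await asyncio.sleep(0.3)

async def main():
    # each worker takes at least 0.9s, so the 0.5s timeout always fires
    tasks = [asyncio.create_task(worker(1_000_000)) for _ in range(4)]
    start = time.monotonic()
    done, pending = await asyncio.wait(tasks, timeout=0.5)
    elapsed = time.monotonic() - start
    for task in pending:
        task.cancel()
    return elapsed

elapsed = asyncio.run(main())
print(f'waited {elapsed:.2f}s')
```

Note that threads share the GIL, so the CPU-bound work itself does not run faster this way; the gain is only that the event loop is no longer blocked, which is exactly what the timeout needs.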