Is there any way to control the scheduling priority among all coroutines that are ready to run?

Specifically, I have several coroutines handling streaming I/O from the network into several queues, and a second set of coroutines that ingest the data from the queues into a data structure. These ingestion coroutines signal a third set of coroutines that analyze that data structure whenever new data is ingested.

Data arrival from the network is an infinite stream with a non-deterministic message rate. I want the analysis step to run as soon as new data arrives, but not before all pending data is processed. The problem I see is that depending on the order of scheduling, an analysis coroutine could run before a reader coroutine that also had data ready, so the analysis coroutine can't even check the ingestion queues for pending data because it may not have been read off the network yet, even though those reader coroutines were ready to run.

One solution might be to structure the coroutines into priority groups so that the reader coroutines would always be scheduled before the analysis coroutines if they were both able to run, but I didn't see a way to do this.

Is there a feature of asyncio that can accomplish this prioritization? Or perhaps I'm asking the wrong question and I can restructure the coroutines such that this can't happen (but I don't see it).

-- edit --

Basically, I have N coroutines that look something like this:

while True:
  data = await socket.get()
  ingestData(data)
  self.event.notify()

So the problem I'm running into is that there's no way for me to know that any of the other N-1 sockets have data ready while executing this coroutine so I can't know whether or not I should notify the event. If I could prioritize these coroutines above the analysis coroutine (which is awaiting self.event.wait()) then I could be sure none of them were runnable when the analysis coroutine is scheduled.

"there's no way for me to know that any of the other N-1 sockets have data ready while executing this coroutine" Couldn't you have a counter that maintains whether any of the higher-priority coroutines are running, and have the lower-priority coroutine await for the counter to drop to zero? That is essentially what the code in my answer is doing, and I don't see why it wouldn't work. Can you provide a bit more sample code for the analysis and pending data processing coroutines? -- user4815162342

The problem is that the higher-priority coroutines are not running but they are runnable, by which I mean if multiple sockets receive data simultaneously, then await socket.get() will execute for one until it's complete and then asyncio will look for the next task to start, but it has two to choose from (socket.get() and self.event.wait()). I want to be certain that the next task it chooses to start is always any task waiting on socket.get() and only start the one waiting on self.event.wait() once there are no other runnable tasks. -- djmarcin

I think I now understand the problem better, thanks. I've updated the answer with a different solution. -- user4815162342

1 Answer

asyncio doesn't support explicitly specifying coroutine priorities, but it is straightforward to achieve the same effect with the tools provided by the library. Given the example in your question:

async def process_pending(self):
    while True:
        data = await socket.get()
        ingestData(data)
        self.event.notify()

You could await the sockets directly using asyncio.wait, and then you would know which sockets are actionable, and only notify the analyzers after all have been processed. For example:

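import asyncio

def _read_task(self, socket):
    # Wrap one pending read in a task and remember which socket it belongs to.
    loop = asyncio.get_event_loop()
    task = loop.create_task(socket.get())
    task.__process_socket = socket
    return task

async def process_pending_all(self):
    # Keep exactly one outstanding read per socket.
    tasks = {self._read_task(socket) for socket in self.sockets}
    while True:
        # Wakes when at least one read finishes; `done` holds every read
        # that has completed by the time this coroutine resumes.
        done, not_done = await asyncio.wait(
            tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            ingestData(task.result())
            # Re-arm the read for the socket that just delivered data.
            not_done.add(self._read_task(task.__process_socket))
        tasks = not_done
        # Notify the analyzers only after the whole batch has been ingested.
        self.event.notify()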
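
To try the idea in isolation, here is a minimal self-contained sketch. It rests on several assumptions that are not in the question: asyncio.Queue objects stand in for the sockets, a plain list stands in for the shared data structure, an asyncio.Event (with set() rather than notify()) stands in for self.event, and the names Pipeline, analyze_once and demo are hypothetical. It also tracks the task-to-source mapping in a dict instead of setting an attribute on the task.

```python
import asyncio


class Pipeline:
    def __init__(self, sources):
        self.sources = sources        # asyncio.Queue objects standing in for sockets
        self.ingested = []            # stand-in for the shared data structure
        self.event = asyncio.Event()  # set once a whole batch has been ingested

    async def process_pending_all(self):
        # One outstanding read per source; the dict maps each task to its source.
        reads = {asyncio.ensure_future(s.get()): s for s in self.sources}
        try:
            while True:
                done, _ = await asyncio.wait(
                    set(reads), return_when=asyncio.FIRST_COMPLETED)
                for task in done:
                    source = reads.pop(task)
                    self.ingested.append(task.result())
                    # Re-arm the read for the source that just delivered data.
                    reads[asyncio.ensure_future(source.get())] = source
                # Wake the analyzer only after every completed read is ingested.
                self.event.set()
        finally:
            # Clean up outstanding reads when this coroutine is cancelled.
            for task in reads:
                task.cancel()

    async def analyze_once(self):
        # Hypothetical analyzer: wait for a batch, then snapshot the data.
        await self.event.wait()
        self.event.clear()
        return list(self.ingested)


async def demo():
    sources = [asyncio.Queue() for _ in range(3)]
    for i, queue in enumerate(sources):
        queue.put_nowait(f"msg-{i}")  # all three sources have data ready at once
    pipeline = Pipeline(sources)
    reader = asyncio.ensure_future(pipeline.process_pending_all())
    snapshot = await pipeline.analyze_once()
    reader.cancel()
    await asyncio.gather(reader, return_exceptions=True)
    return snapshot


if __name__ == "__main__":
    print(sorted(asyncio.run(demo())))
```

In this sketch the analyzer's first snapshot should already contain all three messages, because all three reads complete before the waiting coroutine resumes and builds `done`. The order within a batch is unspecified since `done` is a set, which is why the result is sorted before printing.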