Is there any way to control the scheduling priority among all coroutines that are ready to run?
Specifically, I have several coroutines handling streaming I/O from the network into several queues, a second set of coroutines that ingest the data from the queues into a data structure. These ingestion coroutines signal a third set of coroutines that analyze that data structure whenever new data is ingested.
Data arrival from the network is an infinite stream with a non-deterministic message rate. I want the analysis step to run as soon as new data arrives, but not before all pending data is processed. The problem I see is that depending on the order of scheduling, an analysis coroutine could run before a reader coroutine that also had data ready, so the analysis coroutine can't even check the ingestion queues for pending data because it may not have been read off the network yet, even though those reader coroutines were ready to run.
One solution might be to structure the coroutines into priority groups so that the reader coroutines would always be scheduled before the analysis coroutines if they were both able to run, but I didn't see a way to do this.
Is there a feature of asyncio that can accomplish this prioritization? Or perhaps I'm asking the wrong question and I can restructure the coroutines such that this can't happen (but I don't see it).
-- edit --
Basically I have a N coroutines that look something like this:
while True:
data = await socket.get()
ingestData(data)
self.event.notify()
So the problem I'm running into is that there's no way for me to know that any of the other N-1 sockets have data ready while executing this coroutine so I can't know whether or not I should notify the event. If I could prioritize these coroutines above the analysis coroutine (which is awaiting self.event.wait()) then I could be sure none of them were runnable when the analysis coroutine is scheduled.
await socket.get()
will execute for one until it's complete and then asyncio will look for the next task to start, but it has two to choose from (socket.get()
andself.event.wait()
). I want to be certain that the next task it chooses to start is always any task waiting onsocket.get()
and only start the one waiting onself.event.wait()
once there are no other runnable tasks. – djmarcin