I'm trying to implement the http/2 stack in my own app server which I've built from scratch using asyncio. As per my understanding, asyncio maintains a "tasks" queue internally which is used by the event loop to run tasks. Now, to implement stream prioritization I need to be able to run the high priority tasks for a longer duration than the low priority tasks (by tasks I'm thinking, the coroutine returned by a call to application(scope, receive, send) as per the ASGI spec.) I'm not able to find a way to prioritize this internal queue used by asyncio.
I even thought about capturing the event dicts I get as an argument to the send callable in application(scope, receive, send) but the asgi spec says "Protocol servers must flush any data passed to them into the send buffer before returning from a send call". What is meant by "send buffer" here? Is it the OS/Kernel send buffer?
Am I thinking about stream prioritization in the wrong sense? What would be a good approach to implement this?
class Worker(object):
def get_asgi_event_dict(self, frame):
event_dict = {
"type": "http",
"asgi": {"version": "2.0", "spec_version": "2.1"},
"http_version": "2",
"method": frame.get_method(),
"scheme": "https",
"path": frame.get_path(),
"query_string": "",
"headers": [
[k.encode("utf-8"), v.encode("utf-8")] for k, v in frame.headers.items()
],
}
return event_dict
async def handle_request(self):
try:
while True:
self.request_data = self.client_connection.recv(4096)
self.frame = self.parse_request(self.request_data)
if isinstance(self.frame, HeadersFrame):
if self.frame.end_stream:
current_stream = Stream(
self.connection_settings,
self.header_encoder,
self.header_decoder,
self.client_connection,
)
current_stream.stream_id = self.frame.stream_id
asgi_scope = self.get_asgi_event_dict(self.frame)
current_stream.asgi_app = self.application(asgi_scope)
# The next line puts the coroutine obtained from a call to
# current_stream.asgi_app on the "tasks" queue and hence
# out of my control to prioritize.
await current_stream.asgi_app(
self.trigger_asgi_application, current_stream.send_response
)
else:
self.asgi_scope = self.get_asgi_event_dict(self.frame)
except Exception:
print("Error occurred in handle_request")
print((traceback.format_exc()))
class Stream(object):
async def send_response(self, event):
# converting the event dict into http2 frames and sending them to the client.