
I'm trying to implement the HTTP/2 stack in my own app server, which I've built from scratch using asyncio. As I understand it, asyncio maintains an internal "tasks" queue that the event loop uses to run tasks. To implement stream prioritization, I need to be able to run high-priority tasks for longer than low-priority tasks (by tasks I mean the coroutine returned by a call to application(scope, receive, send), as per the ASGI spec). I can't find a way to prioritize this internal queue used by asyncio.
I even thought about capturing the event dicts I get as an argument to the send callable in application(scope, receive, send), but the ASGI spec says "Protocol servers must flush any data passed to them into the send buffer before returning from a send call". What is meant by "send buffer" here? Is it the OS/kernel send buffer?

Am I thinking about stream prioritization in the wrong sense? What would be a good approach to implement this?

```python
import asyncio
import traceback

# HeadersFrame, parse_request() and the rest of Stream come from other parts
# of my server and are not shown here.


class Worker(object):
  def get_asgi_event_dict(self, frame):
    # Builds the ASGI HTTP connection scope for one HTTP/2 request.
    event_dict = {
      "type": "http",
      "asgi": {"version": "2.0", "spec_version": "2.1"},
      "http_version": "2",
      "method": frame.get_method(),
      "scheme": "https",
      "path": frame.get_path(),
      "query_string": b"",  # ASGI requires a byte string here
      "headers": [
        [k.encode("utf-8"), v.encode("utf-8")] for k, v in frame.headers.items()
      ],
    }
    return event_dict

  async def handle_request(self):
    loop = asyncio.get_running_loop()
    try:
      while True:
        # A blocking recv() would stall the whole event loop; sock_recv()
        # requires the socket to be in non-blocking mode.
        self.request_data = await loop.sock_recv(self.client_connection, 4096)
        if not self.request_data:
          break  # client closed the connection
        self.frame = self.parse_request(self.request_data)
        if isinstance(self.frame, HeadersFrame):
          if self.frame.end_stream:
            current_stream = Stream(
              self.connection_settings,
              self.header_encoder,
              self.header_decoder,
              self.client_connection,
            )
            current_stream.stream_id = self.frame.stream_id
            asgi_scope = self.get_asgi_event_dict(self.frame)
            current_stream.asgi_app = self.application(asgi_scope)
            # create_task() puts the application coroutine on asyncio's
            # internal ready queue, which is out of my control to prioritize.
            # (A plain `await` here would run only one stream at a time.)
            current_stream.task = asyncio.create_task(
              current_stream.asgi_app(
                self.trigger_asgi_application, current_stream.send_response
              )
            )
        else:
          self.asgi_scope = self.get_asgi_event_dict(self.frame)
    except Exception:
      print("Error occurred in handle_request")
      print(traceback.format_exc())


class Stream(object):
  async def send_response(self, event):
    # Converting the event dict into HTTP/2 frames and sending them to the client.
    ...
```
It would be helpful if you posted a working and a failing unit test, showing correct operation of a pair of equal-priority tasks and incorrect sequencing of high/low-priority tasks. – J_H
@J_H I don't have a working example because I'm confused about how to go about implementing it, but I'll post the code I have right now for reference. – Akshay Takkar

1 Answer


HTTP stream priorities operate at a different level from the asyncio run queue.

Asyncio primitives are inherently non-blocking. When asyncio runs its task queue, what the kernel ends up seeing is a series of instructions about what to do, such as "start connecting to A", "start writing X to B", and "continue writing Y to C". The order of these instructions within an iteration of the event loop is irrelevant, since the OS will execute them asynchronously anyway.
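To see that asyncio itself offers no priority knob, here is a small sketch (assuming CPython's default event loop): `asyncio.create_task()` takes no priority argument, and newly created tasks take their first step in the order they were created, regardless of what label we give them.

```python
import asyncio


async def handle(label, order):
    # No await before this line, so each task records itself on its first step.
    order.append(label)


async def main():
    order = []
    # Each new task is appended to the loop's FIFO ready queue.
    tasks = [asyncio.create_task(handle(label, order))
             for label in ("low", "high", "medium")]
    await asyncio.gather(*tasks)
    return order


print(asyncio.run(main()))  # ['low', 'high', 'medium']
```

Since the ready queue is first-in, first-out, any prioritization has to happen in your own code rather than in the event loop.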

HTTP/2 stream priorities play a role when multiplexing many streams over a single TCP connection, and they should be implemented in the layer that actually speaks HTTP/2, which in your case is the server code that turns ASGI send events into frames. For example, if you are an HTTP server, the client may have requested the page and multiple images, all over a single TCP connection. When the socket is ready for writing, you get to choose which image to send (or continue sending), and that's where stream priorities come into play.
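The "choose which stream to send when the socket is writable" step can be sketched as a small scheduler. This is a minimal sketch under assumptions, not the answer author's code: it swaps in a simplified start-time fair queueing scheme that uses the RFC 7540 weight range (1 to 256, default 16) instead of the full priority dependency tree, and `FrameScheduler`, `add_stream`, `enqueue` and `next_chunk` are hypothetical names.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class _StreamState:
    weight: int                                   # 1..256, higher = larger bandwidth share
    pending: deque = field(default_factory=deque)
    vtime: float = 0.0                            # virtual start time of the next chunk


class FrameScheduler:
    """Hypothetical scheduler: each next_chunk() call picks the stream whose data
    should be written next, giving streams bandwidth roughly in proportion to
    their weights (simplified start-time fair queueing)."""

    def __init__(self):
        self._streams = {}
        self._now = 0.0                           # virtual clock

    def add_stream(self, stream_id, weight=16):
        self._streams[stream_id] = _StreamState(weight)

    def enqueue(self, stream_id, chunk):
        state = self._streams[stream_id]
        if not state.pending:
            # A stream that was idle must not "bank" credit from the past.
            state.vtime = max(state.vtime, self._now)
        state.pending.append(chunk)

    def next_chunk(self):
        """Return (stream_id, chunk) to write next, or None if nothing is queued."""
        ready = [(s.vtime, sid) for sid, s in self._streams.items() if s.pending]
        if not ready:
            return None
        vtime, sid = min(ready)                   # ties go to the lower stream id
        state = self._streams[sid]
        chunk = state.pending.popleft()
        self._now = vtime
        # Heavier streams advance their virtual time more slowly, so they win more often.
        state.vtime += len(chunk) / state.weight
        return sid, chunk


# Usage: stream 1 (weight 32) should get about twice the bandwidth of stream 3 (weight 16).
sched = FrameScheduler()
sched.add_stream(1, weight=32)
sched.add_stream(3, weight=16)
for _ in range(4):
    sched.enqueue(1, b"x" * 100)
    sched.enqueue(3, b"y" * 100)

order = []
while (item := sched.next_chunk()) is not None:
    order.append(item[0])
print(order)  # [1, 3, 1, 1, 3, 1, 3, 3]
```

While both streams have data queued, stream 1 gets four of the first six writes; once it runs dry, stream 3 gets the full connection.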

It's not about asyncio running tasks in a specific order; it's about your server using asyncio primitives (such as its writes to the socket) in the correct order.
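To tie this back to the "send buffer" question: one possible structure (a sketch under assumptions, not the only reading of the ASGI spec) is to have each stream's ASGI `send()` only append response bodies to a server-side per-stream buffer and return, while a single writer coroutine per connection picks the most urgent stream every time it writes. This assumes such a buffer counts as the "send buffer" the spec mentions. `PrioritizedConnectionWriter`, `open_stream`, `make_send` and the urgency values (lower is more urgent, loosely modeled on RFC 9218) are hypothetical; a real server would wrap each chunk in a DATA frame and `await` a `StreamWriter.drain()` between writes.

```python
import asyncio
from collections import deque


class PrioritizedConnectionWriter:
    """Hypothetical sketch: send() only buffers per-stream data; one writer
    coroutine decides which stream's data reaches the connection next."""

    def __init__(self, write):
        self._write = write          # assumed sink: write(stream_id, data)
        self._queues = {}            # stream_id -> deque of bytes
        self._urgency = {}           # stream_id -> int, lower = more urgent
        self._wakeup = asyncio.Event()
        self._closed = False

    def open_stream(self, stream_id, urgency):
        self._queues[stream_id] = deque()
        self._urgency[stream_id] = urgency

    def make_send(self, stream_id):
        """Return an ASGI-style send callable bound to one stream."""
        async def send(event):
            # (assumption) send() returns once the body is in this server-side buffer.
            if event["type"] == "http.response.body" and event.get("body"):
                self._queues[stream_id].append(event["body"])
                self._wakeup.set()
        return send

    def close(self):
        self._closed = True
        self._wakeup.set()

    async def run(self):
        while True:
            await self._wakeup.wait()
            self._wakeup.clear()
            # Drain everything buffered, re-picking the most urgent stream each time.
            while any(self._queues.values()):
                sid = min((s for s, q in self._queues.items() if q),
                          key=lambda s: (self._urgency[s], s))
                self._write(sid, self._queues[sid].popleft())
                await asyncio.sleep(0)   # a real server would await writer.drain() here
            if self._closed:
                return


async def demo():
    written = []
    conn = PrioritizedConnectionWriter(lambda sid, data: written.append((sid, data)))
    conn.open_stream(1, urgency=5)   # e.g. an image
    conn.open_stream(3, urgency=0)   # e.g. the HTML page
    writer_task = asyncio.create_task(conn.run())
    send_img, send_page = conn.make_send(1), conn.make_send(3)
    await send_img({"type": "http.response.body", "body": b"img-1", "more_body": True})
    await send_img({"type": "http.response.body", "body": b"img-2", "more_body": False})
    await send_page({"type": "http.response.body", "body": b"page", "more_body": False})
    conn.close()
    await writer_task
    return written


print(asyncio.run(demo()))  # [(3, b'page'), (1, b'img-1'), (1, b'img-2')]
```

The page body goes out first even though the application sent it last, because the ordering decision is made at write time rather than by the order in which asyncio ran the application tasks.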