I have a simple Python program that I want to do three things:

  • Serve an HTTP document
  • Serve Websockets
  • Interact with the Websocket data

I am trying to learn asyncio. The issue is that I can't figure out how to access, from the main event loop, data that was acquired inside another function.

For example, my code below uses two worker threads plus the main thread: one thread runs the HTTP server and the other runs the Websocket server.

What I want is to print, in the main thread, the data captured in the websocket receiving thread.

The only way I know to do this is to use Queues to pass data between threads, at which point I do not even know what the advantage of using asyncio is.

Similarly, it feels weird to pass the event loop to the serve_websocket function.

Can anyone please explain how to architect this to get data from the Websocket function into the main function?

I want a way to do this without using the threading library at all, which seems possible. In an async project I would want to react to websocket events in a different function from the one where they are received.

NOTE: I know there are other libraries for websockets and HTTP serving with asyncio, but this is an example to help me understand how to structure projects using this paradigm.

Thanks

#!/usr/bin/env python

import json
import socketserver
import threading
import http.server
import asyncio
import time

import websockets

SERVER_ADDRESS = '127.0.0.1'
HTTP_PORT = 8087
WEBSOCKET_PORT = 5678


def serve_http():
    http_handler = http.server.SimpleHTTPRequestHandler
    with socketserver.TCPServer(("", HTTP_PORT), http_handler) as httpd:
        print(f'HTTP server listening on port {HTTP_PORT}')
        httpd.serve_forever()


def serve_websocket(server, event_loop):
    print(f'Websocket server listening on port {WEBSOCKET_PORT}')
    event_loop.run_until_complete(server)
    event_loop.run_forever()


async def ws_callback(websocket, path):
    while True:
        data = await websocket.recv()
        # How do I access parsed_data in the main function below
        parsed_data = json.loads(data)
        await websocket.send(data)


def main():
    event_loop = asyncio.get_event_loop()
    ws_server = websockets.serve(ws_callback, SERVER_ADDRESS, WEBSOCKET_PORT)

    threading.Thread(target=serve_http, daemon=True).start()
    threading.Thread(target=serve_websocket, args=(ws_server, event_loop), daemon=True).start()

    try:
        while True:
            # Keep alive - this is where I want to access the data from ws_callback
            # i.e.
            # print(data.values)
            time.sleep(.01)

    except KeyboardInterrupt:
        print('Exit called')


if __name__ == '__main__':
    main()

1 Answer

I believe you should not mix asyncio and multithreading unless you have a specific need to. In your case, use only asyncio tools.

In this case, you have no problem sharing data between coroutines, because they all run on the same thread using cooperative multitasking.
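
As a minimal sketch of that point (the names `latest`, `producer` and `consumer` are made up for illustration and do not come from the question), two coroutines can read and write one ordinary dict with no lock or thread-safe queue, because control only changes hands at an `await`:

```python
import asyncio

latest = {}  # plain module-level state, shared by both coroutines


async def producer():
    # Hypothetical stand-in for a websocket handler: stores three fake messages.
    for i in range(3):
        latest["value"] = i
        await asyncio.sleep(0.01)  # yield control to other coroutines


async def consumer(seen):
    # Hypothetical stand-in for the main loop: polls the shared dict.
    while latest.get("value") != 2:
        await asyncio.sleep(0.005)
    seen.append(latest["value"])


async def demo():
    seen = []
    await asyncio.gather(producer(), consumer(seen))
    return seen


result = asyncio.run(demo())
print(result)
```

Both coroutines run on the same thread, so neither can be interrupted halfway through an assignment.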

Your code can be rewritten as:

#!/usr/bin/env python

import asyncio
import json

import websockets

SERVER_ADDRESS = '127.0.0.1'
HTTP_PORT = 8087
WEBSOCKET_PORT = 5678
parsed_data = {}


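async def handle_http(reader, writer):
    # Placeholder handler: echoes the first 100 bytes back to the client
    # instead of building a real HTTP response.
    data = await reader.read(100)
    writer.write(data)
    await writer.drain()
    writer.close()
    await writer.wait_closed()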


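async def ws_callback(websocket, path=None):
    # path defaults to None so the handler also works with websockets
    # releases that call it with the connection only.
    global parsed_data
    while True:
        data = await websocket.recv()
        # Store the latest message where main() can read it
        parsed_data = json.loads(data)
        await websocket.send(data)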


async def main():
    ws_server = await websockets.serve(ws_callback, SERVER_ADDRESS, WEBSOCKET_PORT)
    print(f'Websocket server listening on port {WEBSOCKET_PORT}')
    http_server = await asyncio.start_server(
        handle_http, SERVER_ADDRESS, HTTP_PORT)
    print(f'HTTP server listening on port {HTTP_PORT}')

    while True:
        if parsed_data:
            print(parsed_data.values())
        await asyncio.sleep(0.1)


if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C surfaces from asyncio.run(), not reliably inside the coroutine
        print('Exit called')
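
One limitation of polling a global is that the loop in main() prints the same message repeatedly and can miss messages that arrive between polls. A possible alternative, sketched here under assumptions, is to hand messages over with `asyncio.Queue`. The `fake_ws_callback` coroutine stands in for the real websocket handler so that the sketch runs without the websockets package; it, `consume` and the `None` sentinel are hypothetical and only there for the demo.

```python
import asyncio
import json


async def fake_ws_callback(queue):
    # Stand-in for ws_callback: in the real handler these strings
    # would come from `await websocket.recv()`.
    for raw in ('{"a": 1}', '{"b": 2}'):
        await queue.put(json.loads(raw))
    await queue.put(None)  # sentinel: no more messages (demo only)


async def consume(queue):
    # Stand-in for the loop in main(): wakes up only when data arrives.
    received = []
    while True:
        item = await queue.get()
        if item is None:
            break
        received.append(item)
    return received


async def demo():
    queue = asyncio.Queue()  # created inside the running event loop
    _, received = await asyncio.gather(fake_ws_callback(queue), consume(queue))
    return received


messages = asyncio.run(demo())
print(messages)
```

In the program above, this would roughly mean that ws_callback calls `await queue.put(parsed_data)` and main() awaits `queue.get()` instead of sleeping, so each message is handled exactly once and still without any threads.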