I'm trying to provide a synchronous shutdown function that can gracefully stop an asyncio application when it receives a SIGTERM signal, when a KeyboardInterrupt or SystemExit exception is raised, or when the function is called directly because of a bad startup state. I have to shut down various tasks that each have their own way of shutting down:

  • an aiohttp AppRunner, currently stopped via its shutdown method, which returns a coroutine that needs to be awaited
  • an asyncio APScheduler, currently stopped via its shutdown method, which calls call_soon_threadsafe on the current event loop
  • a simple async loop that runs forever, currently stopped by cancelling its task
  • an aiohttp ClientSession, which is closed via the close method on the session

I want to stop the message processor first so that any new incoming messages are ignored, then stop the scheduler while allowing any tasks that are currently running to complete. Those tasks depend on the aiohttp ClientSession, so the session has to be closed only after they finish.

Here's an abbreviated version of the current code, with some comments to clarify the logic:

message_processor_future = loop.create_task(message_processor())

def sig_term_handler(_, __):
    logging.info("SIGTERM received, shutting down server...")
    shutdown_server(
        http_runner=http_runner,
        scheduler=scheduler,
        message_processor_future=message_processor_future
    )
signal.signal(signal.SIGTERM, sig_term_handler)

try:
    loop.run_until_complete(message_processor_future)
except (KeyboardInterrupt, SystemExit) as e:
    logging.info("{} received".format(e.__class__.__name__))
    shutdown_server(
        http_runner=http_runner,
        scheduler=scheduler,
        message_processor_future=message_processor_future
    )

async def message_processor():
    while True:
        try:
            ...  # message handling code (elided)
        except asyncio.CancelledError:
            logging.info("Cancelling message processing...")
            return

def shutdown_server(
    http_runner: AppRunner = None,
    scheduler: AsyncIOScheduler = None,
    message_processor_future: asyncio.Task = None
):
    loop = asyncio.get_event_loop()
    # Try to shut down the message processor as early as possible so we don't get any new messages
    if message_processor_future:
        logging.info("Cancelling message processor...")
        message_processor_future.cancel()
    # Shutdown apscheduler early to make sure we don't schedule any new tasks
    if scheduler:
        logging.info("Shutting down scheduler...")
        scheduler.shutdown()
    # if the server is running then kill it (this doesn't really have any requirements as it's fairly separate from the application)
    if http_runner:
        logging.info("Shutting down http server...")
        loop.run_until_complete(http_runner.cleanup())

    # asyncio.all_tasks replaces asyncio.Task.all_tasks, which was removed in Python 3.9
    logging.info(
        f"Waiting for {len(asyncio.all_tasks(loop))} tasks to complete..."
    )
    # wait for any tasks spawned by apscheduler to finish and the message processor to die if it's still running
    loop.run_until_complete(
        asyncio.wait(asyncio.all_tasks(loop), timeout=10)
    )

    logging.info("Closing ingest api client...")
    from collector.tasks.ap_associations import api_client
    # Kill the client session as the tasks that use ClientSession have completed
    loop.run_until_complete(api_client.session.close())

    logging.info("Shutting down process...")
    exit(0)

When I stop the application via KeyboardInterrupt or SystemExit, it cleans up without any issues. I believe this is because the loop has stopped running by then, so calls to loop.run_until_complete are safe and synchronous. When SIGTERM is received, however, the loop is still running, so I get this exception:

[2019-06-03 14:52:26,985] [    INFO] --- Shutting down http server...
[2019-06-03 14:52:26,985] [   ERROR] --- Exception in callback Loop._read_from_self
handle: <Handle Loop._read_from_self>
Traceback (most recent call last):
  File "uvloop/cbhandles.pyx", line 67, in uvloop.loop.Handle._run
  File "uvloop/loop.pyx", line 324, in uvloop.loop.Loop._read_from_self
  File "uvloop/loop.pyx", line 329, in uvloop.loop.Loop._invoke_signals
  File "uvloop/loop.pyx", line 304, in uvloop.loop.Loop._ceval_process_signals
  File "/opt/collector/collector/__main__.py", line 144, in sig_term_handler
    message_processor_future=message_processor_future
  File "/opt/collector/collector/__main__.py", line 192, in shutdown_server
    loop.run_until_complete(http_runner.cleanup())
  File "uvloop/loop.pyx", line 1440, in uvloop.loop.Loop.run_until_complete
  File "uvloop/loop.pyx", line 1433, in uvloop.loop.Loop.run_until_complete
  File "uvloop/loop.pyx", line 1342, in uvloop.loop.Loop.run_forever
  File "uvloop/loop.pyx", line 445, in uvloop.loop.Loop._run
RuntimeError: this event loop is already running.

This makes sense, but I'm not sure how to set up the shutdown method to handle this state. I tried using the add_done_callback method, but that didn't work either: the application gets stuck in the while loop waiting for all the tasks to complete or be cancelled.

def shutdown_server(
    http_runner: AppRunner = None,
    scheduler: AsyncIOScheduler = None,
    message_processor_future: asyncio.Task = None
):
    loop = asyncio.get_event_loop()
    if loop.is_running():
        task_runner = loop.create_task
    else:
        task_runner = loop.run_until_complete

    if message_processor_future:
        logging.info("Cancelling message processor...")
        message_processor_future.cancel()

    if scheduler:
        logging.info("Shutting down scheduler...")
        scheduler.shutdown()

    if http_runner:
        logging.info("Shutting down http server...")
        task_runner(http_runner.shutdown())

    # asyncio.all_tasks replaces asyncio.Task.all_tasks, which was removed in Python 3.9
    logging.info(
        f"Waiting for {len(asyncio.all_tasks(loop))} tasks to complete..."
    )

    # add_done_callback passes the finished future to the callback,
    # so accept (and ignore) one optional argument
    def finish_shutdown(_future=None):
        task_runner(http_runner.cleanup())
        logging.info("Closing ingest api client...")
        from collector.tasks.ap_associations import api_client
        task_runner(api_client.session.close())

        logging.info("Shutting down process...")
        exit(0)

    if loop.is_running():
        all_tasks_complete = loop.create_task(asyncio.wait(
            asyncio.all_tasks(loop), timeout=10
        ))
        all_tasks_complete.add_done_callback(finish_shutdown)
        # This busy-wait runs on the loop's own thread, so the loop never
        # gets a chance to run the task above; this is where it gets stuck
        while not all_tasks_complete.done() and not all_tasks_complete.cancelled():
            pass
    else:
        loop.run_until_complete(asyncio.wait(
            asyncio.all_tasks(loop), timeout=10
        ))
        finish_shutdown()
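
The RuntimeError in the traceback can be reproduced in isolation. The following is a minimal sketch that uses the standard asyncio loop rather than uvloop, and a placeholder future (an assumption for illustration) in place of http_runner.cleanup(). It calls run_until_complete from code that is already executing inside the loop, which is the situation the SIGTERM handler ends up in:

```python
import asyncio


async def main() -> str:
    # We are inside a running loop here, just as the SIGTERM handler is.
    loop = asyncio.get_running_loop()
    # Placeholder awaitable standing in for http_runner.cleanup().
    pending = loop.create_future()
    try:
        # A loop cannot be re-entered while it is running.
        loop.run_until_complete(pending)
    except RuntimeError as exc:
        return str(exc)
    finally:
        pending.cancel()
    return "no error"


message = asyncio.run(main())
print(message)
```

The same restriction explains why the busy-wait variant hangs: the loop can only make progress once the currently executing callback returns control to it.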

1 Answer

I realized you can just call sys.exit in the signal handler: the loop will receive a SystemExit exception, and execution continues through the rest of the except clause with a stopped loop.

i.e.

signal.signal(signal.SIGTERM, lambda _, __: sys.exit(0))

This lets me refactor the code to be much cleaner, and I can also force the tasks to handle their own exceptions with this pattern:

try:
    loop.run_forever()
except (KeyboardInterrupt, SystemExit) as e:
    logging.info(f"{e.__class__.__name__} received")
except Exception as e:
    exception_manager.handle_exception(e)
finally:
    shutdown(http_server_manager, scheduler)
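
The shutdown function itself is not shown above. As a rough, self-contained sketch of how the pattern might fit together (not the actual implementation), the example below uses hypothetical stand-ins: FakeSession plays the role of the aiohttp ClientSession, scheduled_job plays the role of a job started by the scheduler, and the shutdown signature differs from the one in the answer. It assumes a Unix platform, since it sends SIGTERM to its own process to trigger the handler. Because the loop has already stopped by the time the finally block runs, run_until_complete can be used safely:

```python
import asyncio
import os
import signal
import sys


class FakeSession:
    """Hypothetical stand-in for the aiohttp ClientSession."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


async def message_processor(log: list) -> None:
    try:
        while True:
            await asyncio.sleep(3600)  # placeholder for waiting on messages
    except asyncio.CancelledError:
        log.append("processor cancelled")
        raise


async def scheduled_job(session: FakeSession, log: list) -> None:
    # Placeholder for an in-flight job that still needs the session.
    await asyncio.sleep(0.05)
    log.append(f"job finished, session closed: {session.closed}")


def shutdown(loop, processor, session, log, timeout=10.0) -> None:
    # 1. Stop accepting new messages.
    processor.cancel()
    # 2. Let in-flight tasks finish (bounded by the timeout).
    pending = asyncio.all_tasks(loop)
    if pending:
        loop.run_until_complete(asyncio.wait(pending, timeout=timeout))
    # 3. Only now close the session those tasks depended on.
    loop.run_until_complete(session.close())
    log.append("session closed")
    loop.close()


def main() -> list:
    log = []
    loop = asyncio.new_event_loop()
    session = FakeSession()
    processor = loop.create_task(message_processor(log))
    loop.create_task(scheduled_job(session, log))

    previous = signal.signal(signal.SIGTERM, lambda _signum, _frame: sys.exit(0))
    # Simulate an external SIGTERM shortly after startup (Unix only).
    loop.call_later(0.01, os.kill, os.getpid(), signal.SIGTERM)
    try:
        loop.run_forever()
    except (KeyboardInterrupt, SystemExit) as exc:
        log.append(f"{exc.__class__.__name__} received")
    finally:
        # Restore whatever handler was installed before this demo.
        if previous is not None:
            signal.signal(signal.SIGTERM, previous)
        shutdown(loop, processor, session, log)
    return log


log = main()
print(log)
```

In this sketch the in-flight job completes while the session is still open, and the session is closed last, which matches the ordering described in the question.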