I'm developing a CLI that interacts with a web service. When run, it will try to establish communication with it, send requests, receive and process replies and then terminate. I'm using coroutines in various parts of my code and asyncio to drive them. What I'd like is to be able to perform all these steps and then have all coroutines cleanly terminate at the end (i.e. in a way that doesn't cause asyncio to complain). Unfortunately, I'm finding asyncio a lot more difficult to use and understand than asynchronicity in other languages like C#.

I define a class that handles all direct communication with the web service over websocket connections:

import asyncio
import logging

import websockets


class CommunicationService(object):
    def __init__(self):
        self._ws = None
        self._listen_task = None
        self._loop = asyncio.get_event_loop()

    ...

    async def _listen_for_responses(self):
        logging.debug('Listening for responses')
        while True:
            message = await self._ws.recv()
            self.__received_response.on_next(message)

    def establish_communication(self, hostname: str, port: int) -> None:
        websocket_address = 'ws://{}:{}/'.format(hostname, port)

        self._ws = self._loop.run_until_complete(websockets.connect(websocket_address))
        self._listen_task = asyncio.ensure_future(self._listen_for_responses())

    def send_request(self, request: str) -> asyncio.Future:
        return asyncio.ensure_future(self._ws.send(request))

    def stop(self):
        if self._listen_task:
            self._loop.call_soon_threadsafe(self._listen_task.cancel)

        if self._ws and self._ws.open:
            self._ws.close()

This class makes use of the websockets and RxPY libraries. When establishing communication, instances of this class will create an indefinitely-running task that will await responses from the web service and publish them on an RxPY subject.

I run CommunicationService.establish_communication in the main CLI method:

def cli(context, hostname, port):
    log_level = context.meta['click_log.core.logger']['level']
    _initialize_logging(log_level)

    # Create a new event loop for processing commands asynchronously on.
    loop = asyncio.new_event_loop()
    loop.set_debug(log_level == logging.DEBUG)
    asyncio.set_event_loop(loop)

    # Establish communication with TestCube Web Service.
    context.comms = CommunicationService()
    context.comms.establish_communication(hostname, port)
    ...

Depending on the supplied CLI arguments, this may invoke a subcommand callback, which I implement as a coroutine function.

I then register a function to handle the results of the invoked subcommands, which will either be None or a coroutine object:

@cli.resultcallback()
@click.pass_context
def _handle_command_task(context, task: Coroutine, **_) -> None:
    if task:
        loop = asyncio.get_event_loop()
        result = loop.run_until_complete(task)
        context.comms.stop()
        loop.close()
        if result:
            print(result, end='')

My program works, but I get the following output (when running the CLI at INFO log level):

$ testcube relays.0.enabled false
2016-09-06 12:33:51,157 [INFO    ] testcube.comms - Establishing connection to TestCube Web Service @ 127.0.0.1:36364
2016-09-06 12:33:51,219 [ERROR   ] asyncio - Task was destroyed but it is pending!
task: <Task pending coro=<CommunicationService._listen_for_responses() running at c:\users\davidfallah\pycharmprojects\testcube\testcube\comms.py:34> wait_for=<Future pending cb=[Task._wakeup()]>>
2016-09-06 12:33:51,219 [ERROR   ] asyncio - Task was destroyed but it is pending!
task: <Task pending coro=<WebSocketCommonProtocol.run() running at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\websockets\protocol.py:413> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\tasks.py:414]>
2016-09-06 12:33:51,219 [ERROR   ] asyncio - Task was destroyed but it is pending!
task: <Task pending coro=<Queue.get() running at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\queues.py:168> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\tasks.py:414]>
Exception ignored in: <generator object Queue.get at 0x03643600>
Traceback (most recent call last):
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\queues.py", line 170, in get
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\futures.py", line 227, in cancel
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\futures.py", line 242, in _schedule_callbacks
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 497, in call_soon
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 506, in _call_soon
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 334, in _check_closed
RuntimeError: Event loop is closed

If I change CommunicationService.stop() to directly cancel the response-listening task (as opposed to scheduling it)...

self._listen_task.cancel()
#self._loop.call_soon_threadsafe(self._listen_task.cancel)

I get the following output instead:

...
task: <Task pending coro=<CommunicationService._listen_for_responses() running at c:\users\davidfallah\pycharmprojects\testcube\testcube\comms.py:34> wait_for=<Future cancelled>>
...

Here wait_for is <Future cancelled> (as opposed to wait_for=<Future pending cb=[Task._wakeup()]>). I don't understand how I can call Task.cancel() and see the future reported as cancelled while the task itself is still pending. Do I need to do something special with the task, e.g. wrap the code in try...except asyncio.CancelledError?
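
To make the "cancelled future, pending task" state concrete, here is a minimal sketch that reproduces it without websockets. The listen_forever coroutine is a hypothetical stand-in for _listen_for_responses, and the sleep is only an assumption standing in for a blocking recv(). It illustrates that Task.cancel() is a request: the task only finishes once the loop gets to run it again.

```python
import asyncio


async def listen_forever():
    # Hypothetical stand-in for the response listener: blocks on an await.
    while True:
        await asyncio.sleep(3600)


async def main():
    task = asyncio.ensure_future(listen_forever())
    await asyncio.sleep(0)  # give the task a chance to start and block

    task.cancel()  # only requests cancellation
    done_right_after_cancel = task.done()

    try:
        await task  # lets the loop deliver CancelledError into the task
    except asyncio.CancelledError:
        pass
    return done_right_after_cancel, task.cancelled()


loop = asyncio.new_event_loop()
try:
    done_right_after_cancel, cancelled_after_await = loop.run_until_complete(main())
finally:
    loop.close()

print(done_right_after_cancel, cancelled_after_await)
```

The first value should come out False and the second True: right after cancel() the task is still pending, and it only becomes cancelled after the loop has run it once more. Closing the loop in between is what appears to produce "Task was destroyed but it is pending!".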

If it's useful at all, this is the DEBUG-level output of the same command:

$ testcube -v DEBUG relays.0.enabled false
2016-09-06 12:48:10,145 [DEBUG   ] asyncio - Using selector: SelectSelector
2016-09-06 12:48:10,147 [DEBUG   ] Rx - CurrentThreadScheduler.schedule(state=None)
2016-09-06 12:48:10,147 [INFO    ] testcube.comms - Establishing connection to TestCube Web Service @ 127.0.0.1:36364
2016-09-06 12:48:10,153 [DEBUG   ] asyncio - connect <socket.socket fd=608, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6> to ('127.0.0.1', 36364)
2016-09-06 12:48:10,156 [DEBUG   ] asyncio - poll took 0.000 ms: 1 events
2016-09-06 12:48:10,163 [DEBUG   ] asyncio - <socket.socket fd=608, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 56647), raddr=('127.0.0.1', 36364)> connected to 127.0.0.1:36364: (<_SelectorSocketTransport fd=608 read=polling write=<idle, bufsize=0>>, <websockets.client.WebSocketClientProtocol object at 0x03623BF0>)
2016-09-06 12:48:10,198 [DEBUG   ] asyncio - poll took 31.000 ms: 1 events
2016-09-06 12:48:10,202 [DEBUG   ] root - Connected using websocket address: ws://127.0.0.1:36364/
2016-09-06 12:48:10,202 [DEBUG   ] Rx - CurrentThreadScheduler.schedule(state=None)
2016-09-06 12:48:10,203 [DEBUG   ] testcube.components.core - Using write handler
2016-09-06 12:48:10,203 [DEBUG   ] root - Listening for responses
2016-09-06 12:48:10,205 [DEBUG   ] testcube.comms - Sending request: {"op": "replace", "value": false, "path": "testcube.relays[0].enabled"}
2016-09-06 12:48:10,208 [DEBUG   ] websockets.protocol - client >> Frame(fin=True, opcode=1, data=b'{"op": "replace", "value": false, "path": "testcube.relays[0].enabled"}')
2016-09-06 12:48:10,209 [DEBUG   ] asyncio - Close <_WindowsSelectorEventLoop running=False closed=False debug=True>
2016-09-06 12:48:10,222 [ERROR   ] asyncio - Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "C:\Users\davidfallah\AppData\Local\Programs\Python\Python35-32\Scripts\testcube-script.py", line 9, in <module>
    load_entry_point('testcube', 'console_scripts', 'testcube')()
  File "c:\users\davidfallah\pycharmprojects\testcube\testcube\testcube.py", line 198, in main
    cli(default_map=_get_default_settings())
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 716, in __call__
    return self.main(*args, **kwargs)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 696, in main
    rv = self.invoke(ctx)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 1057, in invoke
    Command.invoke(self, ctx)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 889, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 534, in invoke
    return callback(*args, **kwargs)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\decorators.py", line 17, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "c:\users\davidfallah\pycharmprojects\testcube\testcube\testcube.py", line 168, in cli
    context.comms.establish_communication(hostname, port)
  File "c:\users\davidfallah\pycharmprojects\testcube\testcube\comms.py", line 48, in establish_communication
    self._listen_task = asyncio.ensure_future(self._listen_for_responses())
task: <Task pending coro=<CommunicationService._listen_for_responses() running at c:\users\davidfallah\pycharmprojects\testcube\testcube\comms.py:34> wait_for=<Future cancelled created at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py:252> created at c:\users\davidfallah\pycharmprojects\testcube\testcube\comms.py:48>
2016-09-06 12:48:10,223 [ERROR   ] asyncio - Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "C:\Users\davidfallah\AppData\Local\Programs\Python\Python35-32\Scripts\testcube-script.py", line 9, in <module>
    load_entry_point('testcube', 'console_scripts', 'testcube')()
  File "c:\users\davidfallah\pycharmprojects\testcube\testcube\testcube.py", line 198, in main
    cli(default_map=_get_default_settings())
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 716, in __call__
    return self.main(*args, **kwargs)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 696, in main
    rv = self.invoke(ctx)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 1057, in invoke
    Command.invoke(self, ctx)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 889, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 534, in invoke
    return callback(*args, **kwargs)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\decorators.py", line 17, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "c:\users\davidfallah\pycharmprojects\testcube\testcube\testcube.py", line 168, in cli
    context.comms.establish_communication(hostname, port)
  File "c:\users\davidfallah\pycharmprojects\testcube\testcube\comms.py", line 47, in establish_communication
    self._ws = self._loop.run_until_complete(websockets.connect(websocket_address))
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 375, in run_until_complete
    self.run_forever()
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 345, in run_forever
    self._run_once()
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 1304, in _run_once
    handle._run()
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\events.py", line 125, in _run
    self._callback(*self._args)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\streams.py", line 238, in connection_made
    self._stream_writer)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\websockets\protocol.py", line 633, in client_connected
    self.worker_task = asyncio_ensure_future(self.run(), loop=self.loop)
task: <Task pending coro=<WebSocketCommonProtocol.run() running at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\websockets\protocol.py:413> wait_for=<Future pending cb=[Task._wakeup()] created at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py:252> cb=[_wait.<locals>._on_completion() at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\tasks.py:414] created at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\websockets\protocol.py:633>
2016-09-06 12:48:10,223 [ERROR   ] asyncio - Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "C:\Users\davidfallah\AppData\Local\Programs\Python\Python35-32\Scripts\testcube-script.py", line 9, in <module>
    load_entry_point('testcube', 'console_scripts', 'testcube')()
  File "c:\users\davidfallah\pycharmprojects\testcube\testcube\testcube.py", line 198, in main
    cli(default_map=_get_default_settings())
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 716, in __call__
    return self.main(*args, **kwargs)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 696, in main
    rv = self.invoke(ctx)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 1060, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 1025, in _process_result
    **ctx.params)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\core.py", line 534, in invoke
    return callback(*args, **kwargs)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\click\decorators.py", line 17, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "c:\users\davidfallah\pycharmprojects\testcube\testcube\testcube.py", line 190, in _handle_command_task
    result = loop.run_until_complete(task)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 375, in run_until_complete
    self.run_forever()
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 345, in run_forever
    self._run_once()
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 1304, in _run_once
    handle._run()
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\events.py", line 125, in _run
    self._callback(*self._args)
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\tasks.py", line 239, in _step
    result = coro.send(None)
  File "c:\users\davidfallah\pycharmprojects\testcube\testcube\comms.py", line 34, in _listen_for_responses
    message = await self._ws.recv()
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\websockets\protocol.py", line 280, in recv
    self.messages.get(), loop=self.loop)
task: <Task pending coro=<Queue.get() running at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\queues.py:168> wait_for=<Future pending cb=[Task._wakeup()] created at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py:252> cb=[_wait.<locals>._on_completion() at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\tasks.py:414] created at c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\site-packages\websockets\protocol.py:280>
Exception ignored in: <generator object Queue.get at 0x03641240>
Traceback (most recent call last):
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\queues.py", line 170, in get
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\futures.py", line 227, in cancel
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\futures.py", line 242, in _schedule_callbacks
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 497, in call_soon
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 506, in _call_soon
  File "c:\users\davidfallah\appdata\local\programs\python\python35-32\lib\asyncio\base_events.py", line 334, in _check_closed
RuntimeError: Event loop is closed

1 Answer

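I figured it out. Task.cancel() only requests cancellation, so the event loop has to run again before the task (and the websocket close) can actually finish. I need to define CommunicationService.stop() as follows: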

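def stop(self):
    if self._listen_task is None or self._ws is None:
        return

    # cancel() only requests cancellation; the task finishes once the loop runs again.
    self._listen_task.cancel()
    # Wrap close() in a task: asyncio.wait() rejects bare coroutines on Python 3.11+.
    close_task = asyncio.ensure_future(self._ws.close(), loop=self._loop)
    self._loop.run_until_complete(asyncio.wait([self._listen_task, close_task]))

    self._listen_task = None
    self._ws = None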
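
For anyone who wants to try the pattern without a server, here is a self-contained sketch of the same stop() logic. FakeConnection and Service are hypothetical stand-ins I made up for illustration (they are not part of the websockets API), and the short sleep stands in for the real subcommand.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a websocket connection (not the websockets API)."""

    def __init__(self):
        self.closed = False

    async def recv(self):
        await asyncio.sleep(3600)  # pretend no message ever arrives

    async def close(self):
        self.closed = True


class Service:
    def __init__(self, loop):
        self._loop = loop
        self._ws = FakeConnection()
        self._listen_task = loop.create_task(self._listen())

    async def _listen(self):
        while True:
            await self._ws.recv()

    def stop(self):
        if self._listen_task is None or self._ws is None:
            return
        self._listen_task.cancel()
        close_task = self._loop.create_task(self._ws.close())
        # Run the loop until the cancellation and the close have both finished.
        self._loop.run_until_complete(asyncio.wait([self._listen_task, close_task]))
        self._listen_task = None
        self._ws = None


loop = asyncio.new_event_loop()
service = Service(loop)
connection, listen_task = service._ws, service._listen_task
loop.run_until_complete(asyncio.sleep(0.01))  # stand-in for the real command
service.stop()
loop.close()
print(listen_task.cancelled(), connection.closed)
```

Because stop() drives the loop until both tasks are done, nothing is left pending by the time loop.close() runs, so asyncio should have nothing to complain about.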

As documentation for anyone else that might end up struggling with related issues, the complete cleanup code is now:

@cli.resultcallback()
@click.pass_context
def _handle_command_task(context, task: Coroutine, **_) -> None:
    if task:
        loop = asyncio.get_event_loop()
        result = loop.run_until_complete(task)
        context.comms.stop()
        loop.close()
        if result:
            print(result, end='')
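
An alternative structure, offered only as a sketch under assumptions: keep the listener's lifetime inside a single coroutine, so the cleanup happens in a finally block while the loop is still running. The listen, command and run_command names are hypothetical and only illustrate the shape; they are not taken from my actual code.

```python
import asyncio


async def listen():
    # Hypothetical listener; the real one would await self._ws.recv().
    while True:
        await asyncio.sleep(3600)


async def command():
    # Hypothetical subcommand coroutine.
    await asyncio.sleep(0.01)
    return "ok"


async def run_command(command_coro):
    listener = asyncio.ensure_future(listen())
    try:
        return await command_coro
    finally:
        # Cancel, then wait for the cancellation to be processed
        # before the loop is allowed to stop.
        listener.cancel()
        try:
            await listener
        except asyncio.CancelledError:
            pass


loop = asyncio.new_event_loop()
try:
    result = loop.run_until_complete(run_command(command()))
finally:
    loop.close()

print(result)
```

On Python 3.7 or later, asyncio.run(run_command(command())) could replace the manual loop creation and close.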