2
votes

I'm attempting to unit test an asynchronous socket server and using pytest-asyncio to make pytest compatible with the async code base. The server, once started, is always up to send a reply via a while loop, and probably spends most of its time awaiting an incoming message in client_loop(). The problem is there's no way to cancel this task before the unit test framework terminates the event loop and this warning gets issued:

Task was destroyed but it is pending!

task: < Task pending coro=< Server.new_client() done, defined at /[...path...]/server.py:16> wait_for=< Future pending cb=[< TaskWakeupMethWrapper object at 0x106d7cbe8>()]>>

The only task it appears I have access to is the task created by asyncio.create_task(), which seems to not be the same task. That task looks like this:

task: < Task pending coro=< start_server() running at /usr/local/Cellar/python/[...different path...]/streams.py:86>>

So calling task.cancel(); await task.wait_cancelled() on this task is having no effect.

How can this unit test be written to cleanly start and start the server for each test and not cut off tasks which may still be running?

Here's the example:

test_server.py

import pytest
import asyncio

@pytest.fixture
async def server(event_loop):
    from server import Server
    the_server = Server()
    await the_server.start()
    yield the_server
    the_server.stop()

@pytest.mark.asyncio
async def test_connect(server):
    loop = asyncio.get_event_loop()
    reader, writer = await asyncio.open_connection('0.0.0.0', 8888, loop = loop)
    writer.write(b'something')
    await reader.read(100)
    writer.write(b'something else')
    await reader.read(100)
    assert 1

server.py

import asyncio

class Server():
    async def start(self):
        loop = asyncio.get_event_loop()
        coro = asyncio.start_server(self.new_client, '0.0.0.0', 8888, loop = loop)
        task = loop.create_task(coro)
        print('\n')
        print(task)
        self.server = await task

    def stop(self):
        self.server.close()

    async def new_client(self, reader, writer):
        await self.client_loop(reader, writer)

    async def client_loop(self, reader, writer):
        while True:
            await reader.read(100)
            writer.write(b'reply')

If you want to run this example just run pip3 install pytest-asyncio and pytest can pick up this plugin.

2

2 Answers

4
votes

You have to await self.server.wait_closed() after calling self.server.close().

Your fixture should thus look like this:

@pytest.fixture
async def server(event_loop):
    from server import Server
    the_server = Server()
    await the_server.start()
    yield the_server
    await the_server.stop()

And the stop method of your Server should look like this:

    async def stop(self):
        self.server.close()
        await self.server.wait_closed()

See the documentation for details.

1
votes

The asyncio.Server.stop() method does not completely stop the server. It merely stops accepting new connections. Any connections created before close will continue to execute until finished.

According to the documentation (emphasis mine):

Stop serving: close listening sockets and set the sockets attribute to None.

The sockets that represent existing incoming client connections are left open.

The server is closed asynchronously, use the wait_closed() coroutine to wait until the server is closed.

In this example, all connections are sent to the infinite client_loop method.

A better solution is to create a collection of Tasks in new_client() responsible for executing client_loop() logic instead of awaiting the method directly. With this approach, all open Tasks can be terminated cleanly in the stop() method.