3
votes

I am writing a program that accepts RPC requests over AMQP for executing network requests (CoAP). When processing RPC requests, the aioamqp callback generates tasks that are responsible for network IO. These tasks can be considered background tasks, that will run indefinitely for streaming network responses over AMQP (in this case one RPC requests triggers a RPC response and data streaming).

I noticed that in my original code the network task would be destroyed after seemingly random time intervals (before it was finished), asyncio would then print the following warning "Task was destroyed but it is pending". This issue is similar to the one described here: https://bugs.python.org/issue21163.

For now I have circumvented the issue by storing a hard reference in a module-level list, which prevents the GC from destroying the task object. However, I was wondering if there is a better work around? Ideally I would want to call await task in the RPC callback, but I noticed that this prevents any further AMQP operations from completing -> e.g. creating a new amqp channel stalls and receiving rpc requests over amqp also stalls. I am unsure what is causing this stalling however (as the callback is itself a coroutine, I would expect waiting would not stall the entire aioamqp library).

I am posting the source below for the RPC client and server, both are based on the aioamqp/aiocoap examples. In the server, on_rpc_request is the amqp rpc callback and send_coap_obs_request is the networking coroutine that gets destroyed when the 'obs_tasks.append(task)' statement is removed.

client.py:

"""
    CoAP RPC client, based on aioamqp implementation of RPC examples from RabbitMQ tutorial
"""

import base64
import json
import uuid

import asyncio
import aioamqp


class CoAPRpcClient(object):
    def __init__(self):
        self.transport = None
        self.protocol = None
        self.channel = None
        self.callback_queue = None
        self.waiter = asyncio.Event()

    async def connect(self):
        """ an `__init__` method can't be a coroutine"""
        self.transport, self.protocol = await aioamqp.connect()
        self.channel = await self.protocol.channel()

        result = await self.channel.queue_declare(queue_name='', exclusive=True)
        self.callback_queue = result['queue']

        await self.channel.basic_consume(
            self.on_response,
            no_ack=True,
            queue_name=self.callback_queue,
        )

    async def on_response(self, channel, body, envelope, properties):
        if self.corr_id == properties.correlation_id:
            self.response = body

        self.waiter.set()

    async def call(self, n):
        if not self.protocol:
            await self.connect()
        self.response = None
        self.corr_id = str(uuid.uuid4())
        await self.channel.basic_publish(
            payload=str(n),
            exchange_name='',
            routing_key='coap_request_rpc_queue',
            properties={
                'reply_to': self.callback_queue,
                'correlation_id': self.corr_id,
            },
        )
        await self.waiter.wait()

        await self.protocol.close()
        return json.loads(self.response)


async def rpc_client():
    coap_rpc = CoAPRpcClient()

    request_dict = {}
    request_dict_json = json.dumps(request_dict)

    print(" [x] Send RPC coap_request({})".format(request_dict_json))
    response_dict = await coap_rpc.call(request_dict_json)
    print(" [.] Got {}".format(response_dict))


asyncio.get_event_loop().run_until_complete(rpc_client())

server.py:

"""
CoAP RPC server, based on aioamqp implementation of RPC examples from RabbitMQ tutorial
"""

import base64
import json
import sys

import logging
import warnings

import asyncio
import aioamqp
import aiocoap

amqp_protocol = None
coap_client_context = None
obs_tasks = []

AMQP_COAP_NOTIFICATIONS_EXCHANGE_NAME = 'topic_coap'
AMQP_COAP_NOTIFICATIONS_TOPIC_NAME = 'topic'
AMQP_COAP_NOTIFICATIONS_ROUTING_KEY = 'coap.response'

def create_response_dict(coap_request, coap_response):
    response_dict = {'request_uri': "", 'code': 0}
    response_dict['request_uri'] = coap_request.get_request_uri()
    response_dict['code'] = coap_response.code

    if len(coap_response.payload) > 0:
        response_dict['payload'] = base64.b64encode(coap_response.payload).decode('utf-8')

    return response_dict


async def handle_coap_response(amqp_envelope, amqp_properties, coap_request, coap_response):
    # create response dict:
    response_dict = create_response_dict(coap_request, coap_response)
    message = json.dumps(response_dict)

    # create new channel:
    global amqp_protocol
    amqp_channel = await amqp_protocol.channel()

    await amqp_channel.basic_publish(
        payload=message,
        exchange_name='',
        routing_key=amqp_properties.reply_to,
        properties={
            'correlation_id': amqp_properties.correlation_id,
        },
    )

    await amqp_channel.basic_client_ack(delivery_tag=amqp_envelope.delivery_tag)

    print(" [.] handle_coap_response() published response: {}".format(response_dict))


def incoming_observation(coap_request, coap_response):
    asyncio.async(handle_coap_notification(coap_request, coap_response))


async def handle_coap_notification(coap_request, coap_response):
    # create response dict:
    response_dict = create_response_dict(coap_request, coap_response)
    message = json.dumps(response_dict)

    # create new channel:
    global amqp_protocol
    amqp_channel = await amqp_protocol.channel()

    await amqp_channel.exchange(AMQP_COAP_NOTIFICATIONS_EXCHANGE_NAME, AMQP_COAP_NOTIFICATIONS_TOPIC_NAME)

    await amqp_channel.publish(message, exchange_name=AMQP_COAP_NOTIFICATIONS_EXCHANGE_NAME, routing_key=AMQP_COAP_NOTIFICATIONS_ROUTING_KEY)

    print(" [.] handle_coap_notification() published response: {}".format(response_dict))


async def send_coap_obs_request(amqp_envelope, amqp_properties, request_dict, coap_request):
    observation_is_over = asyncio.Future()
    try:
        global coap_client_context
        requester = coap_client_context.request(coap_request)
        requester.observation.register_errback(observation_is_over.set_result)
        requester.observation.register_callback(lambda data, coap_request=coap_request: incoming_observation(coap_request, data))

        try:
            print(" [..] Sending CoAP obs request: {}".format(request_dict))
            coap_response = await requester.response
        except socket.gaierror as  e:
            print("Name resolution error:", e, file=sys.stderr)
            return
        except OSError as e:
            print("Error:", e, file=sys.stderr)
            return

        if coap_response.code.is_successful():
            print(" [..] Received CoAP response: {}".format(coap_response))
            await handle_coap_response(amqp_envelope, amqp_properties, coap_request, coap_response)
        else:
            print(coap_response.code, file=sys.stderr)
            if coap_response.payload:
                print(coap_response.payload.decode('utf-8'), file=sys.stderr)
            sys.exit(1)

        exit_reason = await observation_is_over
        print("Observation is over: %r"%(exit_reason,), file=sys.stderr)

    finally:
        if not requester.response.done():
            requester.response.cancel()
        if not requester.observation.cancelled:
            requester.observation.cancel()


async def on_rpc_request(amqp_channel, amqp_body, amqp_envelope, amqp_properties):
    print(" [.] on_rpc_request(): received RPC request: {}".format(amqp_body))

    request_dict = {} # hardcoded to vdna.be for SO example
    aiocoap_code = aiocoap.GET
    aiocoap_uri = "coap://vdna.be/obs"
    aiocoap_payload = ""

    # as we are ready to send the CoAP request, ack the client already indicating we have received the RPC request
    await amqp_channel.basic_client_ack(delivery_tag=amqp_envelope.delivery_tag)

    coap_request = aiocoap.Message(code=aiocoap_code, uri=aiocoap_uri, payload=aiocoap_payload)
    coap_request.opt.observe = 0

    task = asyncio.ensure_future(send_coap_obs_request(amqp_envelope, amqp_properties, request_dict, coap_request))
    # we have to keep a hard ref to this task, otherwise the python garbage collector destroyes the task before it is completed. See https://bugs.python.org/issue21163
    # this is apparent from the "Task was destroyed but it is pending" exception thrown after random (lengthy) time intervals, probably the time interval is related to when the gc is triggered
    # await task # this does not seem to work, as it prevents new amqp operations from executing (e.g. amqp channels do not get created)
    # we are actually not interested in waiting for the task anyway, so instead just keep a hard ref to the task in the obs_tasks list
    obs_tasks.append(task) # TODO: when do we remove the task from the list?


async def amqp_connect():
    try:
        (transport, protocol) = await aioamqp.connect('localhost', 5672)
        print(" [x] Connected to AMQP broker")
        return (transport, protocol)
    except aioamqp.AmqpClosedConnection as ex:
        print("closed connections: {}".format(ex))
        raise ex


async def main():
    """Open AMQP connection to broker, subscribe to coap_request_rpc_queue and setup aiocoap client context """

    try:
        global amqp_protocol
        (amqp_transport, amqp_protocol) = await amqp_connect()

        channel = await amqp_protocol.channel()

        await channel.queue_declare(queue_name='coap_request_rpc_queue')
        await channel.basic_qos(prefetch_count=10, prefetch_size=0, connection_global=False)
        await channel.basic_consume(on_rpc_request, queue_name='coap_request_rpc_queue')

        print(" [x] Awaiting CoAP request RPC requests")
    except aioamqp.AmqpClosedConnection as ex:
        print("amqp_connect: closed connections: {}".format(ex))
        exit()

    global coap_client_context
    coap_client_context = await aiocoap.Context.create_client_context()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()

    loop.set_debug(True)

    asyncio.async(main())
    loop.run_forever()
1
I ran into a similar (exact?) issue while creating a background task that listened for entries into an asyncio.Queue, and after hours of debugging finally found the bug? mentioned here: bugs.python.org/issue21163 Ultimately my solution was to create a hard reference to the asyncio.Queue object as you mentioned, but I was surprised at how confusing this issue was.Charles Naccio

1 Answers

1
votes

When a task is scheduled, it's _step callback is scheduled in the loop. That callback maintains a reference to the task through self. I have not checked the code, but I have high confidence that the loop maintains a reference to its callbacks. However, when a task awaits some awaitable or future, the _step callback is not scheduled. In that case, the task adds a done callback that retains a reference to the task, but the loop does not retain references to tasks waiting for futures.

So long as something retains a reference to the future that the task is waiting on, all is well. However, if nothing retains a hard reference to the future, then the future can get garbage collected, and when that happens the task can get garbage collected.

So, I'd look for things that your task calls where the future the task is waiting on might not be referenced. In general the future needs to be referenced so someone can set its result eventually, so it is very likely a bug if you have unreferenced futures.