1
votes

I have a TCP server running and have a handler function which needs to take the contents of the request, add it to an asyncio queue and reply with an OK status.

On the background I have an async coroutine running that detects when a new item is added and performs some processing.

How do I put items in the asyncio queue from the handler function, which isn't and can't be an async coroutine?

I am running a DICOM server pynetdicom which listens on port 104 for incoming TCP requests (DICOM C-STORE specifically).

I need to save the contents of the request to a queue and return a a 0x0000 response so that the listener is available to the network.

This is modeled by a producer-consumer pattern.

I have tried to define a consumer co-routine consume_dicom() that is currently stuck in await queue.get() since I can't properly define the producer.

The producer needs to simply invoke queue.put(produce_item) but this happens inside a handle_store(event) function which is not part of the event_loop but is called every time a request is received by the server.

import asyncio

from pynetdicom import (
    AE, evt,
    StoragePresentationContexts
)

class PacsServer():
    def __init__(self, par, listen=True):
        # Initialize other stuff...

        # Initialize DICOM server
        ae = AE(ae_title='DICOM-NODE')
        ae.supported_contexts = StoragePresentationContexts

        # When a C-STORE request comes, it will be passed to self.handle_store
        handlers = [(evt.EVT_C_STORE, self.handle_store)]

        # Define queue
        loop = asyncio.get_event_loop()
        self.queue = asyncio.Queue(loop=loop)

        # Define consumer
        loop.create_task(self.consume_dicom(self.queue))

        # Start server in the background with specified handlers
        self.scp = ae.start_server(('', 104), block=False, evt_handlers=handlers)

        # Start async loop
        self.loop.run_forever()



    def handle_store(self, event):
        # Request handling
        ds = event.dataset

        # Here I want to add to the queue but this is not an async method
        await queue.put(ds)

        return 0x0000


    async def consume_dicom(self, queue):
        while True:
            print(f"AWAITING FROM QUEUE")
            ds = await queue.get()

            do_some_processing(ds)

I would like to find a way to add items to the queue and return the OK status in the handle_store() function.

1
Try changing await queue.put(ds) to self.loop.call_soon_threadsafe(queue.put_nowait, ds)user4815162342
@user4815162342 Thanks a lot! It worked perfectly and with no errors of any kind, this is exactly what I was looking for!Guillermo

1 Answers

1
votes

Since handle_store is running in a different thread, it needs to tell the event loop to enqueue the item. This is done with call_soon_threadsafe:

self.loop.call_soon_threadsafe(queue.put_nowait, ds)

Note that you need to call queue.put_nowait instead of queue.put because the former is a function rather than a coroutine. The function will always succeed for unbounded queues (the default), otherwise it will raise an exception if the queue is full.