0
votes
from multiprocessing import Process, Queue

def worker_main(q):
    while True:
        message = q.get()


q = Queue()
worker = Process(target=worker_main, args=(q,))
worker.start()
q.close()
worker.join()

I expect the call to q.get in worker_main() to raise an exception and exit after q is closed. Instead, it hangs even after the queue is closed in the main process.

My use case seems to be slightly different than the common examples which show Queue.put in the worker process and Queue.get in the main process.

In a main process I'm producing tasks that need to be distributed to a pool of worker processes via a queue. However when the tasks are complete I close the queue to indicate to the worker processes it's time to exit.

Perhaps I do not understand the documentation, but I think it's clear that future calls to get should raise an exception after close.

get([block[, timeout]])

Remove and return an item from the queue. If optional args block is True (the default) and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the queue.Empty exception if no item was available within that time. Otherwise (block is False), return an item if one is immediately available, else raise the queue.Empty exception (timeout is ignored in that case).

Changed in version 3.8: If the queue is closed, ValueError is raised instead of OSError.
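For what it's worth, the documented timeout behavior does work as described: a blocking get with a timeout raises queue.Empty once the timeout expires, so a worker could poll with a timeout as a workaround (this is a minimal sketch of that behavior, not of the close semantics I'm asking about):

```python
from multiprocessing import Queue
import queue  # queue.Empty is the exception raised on timeout

q = Queue()
try:
    # Block for at most 0.1 seconds; no item ever arrives.
    q.get(timeout=0.1)
except queue.Empty:
    print("queue.Empty raised after timeout")
```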

1
Hmmm... future calls... But q.get() can be called before the queue is closed. That sounds like a bug, but maybe you can work around it by using timeouts? - Vincent Fourmond
The documentation also specifically says that close indicates the current process will not put any more items in the queue. worker_main is executed in a different process than the one that called q.close(). - chepner
I don't know if there is a standard way to close, but I send a well-known sentinel (None is nice) per known reader of the queue. If some processes had terminated there will be too many None's in the queue... but it hasn't been a problem for me. - tdelaney
close is also documented as "usually unnecessary for most code". Its job isn't to pass messages to other processes, but to help the current process manage process-local resources. - chepner
Queue.close closes the current process' queue handle. It doesn't close the queue in other processes. - MisterMiyagi

1 Answer

0
votes

A multiprocessing.Queue represents a handle on a shared buffer. Notably, when processes are connected with a queue, each process has its own copy of the handle.

Calling .close() only closes the current process' handle for reading and writing.

close()

Indicate that no more data will be put on this queue by the current process. […]


In order to gracefully close a queue for all subscribers, send a "close message". This is usually a well-defined sentinel object such as None.

from multiprocessing import Process, Queue

def worker_main(q):
    while (message := q.get()) is not None:
        print(message)


if __name__ == "__main__":
    q = Queue()
    q.put("Hello")
    worker = Process(target=worker_main, args=(q,))
    worker.start()
    q.put("World")
    q.put(None)  # close message
    q.close()
    worker.join()
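With a pool of several workers, each reader needs its own close message, since a sentinel is consumed by whichever worker happens to get it. A sketch of that pattern (one None per worker; names and the worker count are illustrative):

```python
from multiprocessing import Process, Queue

def worker_main(q):
    # Consume tasks until this worker receives its close message.
    while (message := q.get()) is not None:
        print(message)

if __name__ == "__main__":
    q = Queue()
    workers = [Process(target=worker_main, args=(q,)) for _ in range(3)]
    for w in workers:
        w.start()
    for task in ["a", "b", "c", "d"]:
        q.put(task)
    for _ in workers:
        q.put(None)  # one close message per reader
    q.close()
    for w in workers:
        w.join()
```

If a worker dies early, its sentinel is left in the queue unread, which is usually harmless.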