
I am using a Python 2.7 multiprocessing.Queue with a parent process that is the "producer" for the queue (a task "dispatcher") and a child process that is the consumer (a "worker" process). I need the child process to be non-"daemonic" so that, if an exception occurs in the parent process, the child can finish processing the queue items that were already dispatched. However, I found that with a non-daemonic child, a parent exception leads to a deadlock: on exit, the parent tries to join the child, while the child blocks forever in get() once the queue is empty, waiting for items that will never come. If, on the other hand, I make the child process "daemonic", it is terminated as soon as the parent exits on the exception, which interrupts the child's processing of existing queue items.

Example to demonstrate (using non-daemonic child process):

import multiprocessing, time

def childProcessFunction(sharedQueue):
    # Consumer loop
    while True:
        print "Child process: Consuming item.  We will deadlock here if the queue is empty and the parent process has terminated!"
        consumedItem=sharedQueue.get()

        print "Child process: Taking some time to process the consumed item..."
        time.sleep(5)
        print consumedItem # Do something with (print) the consumed item.
        time.sleep(5)
        print "Child process: Done processing the consumed item; re-looping to get another item..."

def parentProcessMain():
    sharedQueue=multiprocessing.Queue()

    childProcess=multiprocessing.Process(target=childProcessFunction, args=(sharedQueue,))

    # Don't want a daemon process because we don't want to interrupt the child's 
    # processing of existing items when the parent process terminates.
    childProcess.daemon=False

    print "Parent process: Starting child process..."
    childProcess.start()

    failAfter=3 # Simulate a failure after producing 3 items.

    numDispatched=0 # Number of items put onto the queue.

    # Producer loop
    while True:
        time.sleep(3)

        if failAfter==0:
            raise Exception("Parent process encountered a failure of some kind and is exiting uncleanly!  Now the parent process will try to join the child process, and the child process will deadlock once the queue is empty!")

        producedItem="This is item number: %d"%numDispatched
        print "Parent process: Enqueueing item number %d..."%numDispatched
        sharedQueue.put(producedItem)
        numDispatched+=1
        failAfter-=1
        time.sleep(3)

if __name__=="__main__":
    parentProcessMain()

Is there a way to allow the child process to finish processing whatever is in flight, plus whatever else remains on the queue, after the parent process hits the exception (the non-daemonic behavior), while preventing the child from waiting on the queue forever afterwards? Is there a way to make the queue "aware" that the producer has terminated, so that get() raises an exception rather than blocking indefinitely?

Note that, if the queue happens to be empty and the child process is waiting in a get() call when the parent gets the exception, I would need get() to terminate in that case (presumably with an exception).

1 Answer


Could you hold the exception until you know that the child process has completed? Perhaps via some form of synchronization, such as a mutex.