I would like to create either a Thread or a Process which runs forever in a while True loop.

I need to send and receive data to the worker in the form of queues, either a multiprocessing.Queue() or a collections.deque(). I prefer to use collections.deque() as it is significantly faster.

I also need to be able to kill the worker eventually, since it runs in a while True loop. Here is some test code I've put together to try to understand the differences between Threads, Processes, Queues, and deques:

import time
from multiprocessing import Process, Queue
from threading import Thread
from collections import deque

class ThreadingTest(Thread):

    def __init__(self, q):
        super(ThreadingTest, self).__init__()
        self.q = q
        self.toRun = False

    def run(self):
        print("Started Thread")
        self.toRun = True
        while self.toRun:
            if type(self.q) == type(deque()):
                if self.q:
                    i = self.q.popleft()
                    print("Thread deque: " + str(i))
            elif type(self.q) == type(Queue()):
                if not self.q.empty():
                    i = self.q.get_nowait()
                    print("Thread Queue: " + str(i))

    def stop(self):
        print("Trying to stop Thread")
        self.toRun = False
        while self.is_alive():  # isAlive() was removed in Python 3.9
            time.sleep(0.1)
        print("Stopped Thread")

class ProcessTest(Process):

    def __init__(self, q):
        super(ProcessTest, self).__init__()
        self.q = q
        self.toRun = False
        self.ctr = 0

    def run(self):
        print("Started Process")
        self.toRun = True
        while self.toRun:
            if type(self.q) == type(deque()):
                if self.q:
                    i = self.q.popleft()
                    print("Process deque: " + str(i))
            elif type(self.q) == type(Queue()):
                if not self.q.empty():
                    i = self.q.get_nowait()
                    print("Process Queue: " + str(i))

    def stop(self):
        print("Trying to stop Process")
        self.toRun = False
        while self.is_alive():
            time.sleep(0.1)
        print("Stopped Process")

if __name__ == '__main__':
    q = Queue()
    t1 = ProcessTest(q)
    t1.start()

    for i in range(10):
        if type(q) == type(deque()):
            q.append(i)
        elif type(q) == type(Queue()):
            q.put_nowait(i)
        time.sleep(1)
    t1.stop()
    t1.join()

    if type(q) == type(deque()):
        print(q)
    elif type(q) == type(Queue()):
        while q.qsize() > 0:
            print(str(q.get_nowait()))

As you can see, t1 can either be ThreadingTest, or ProcessTest. Also, the queue passed to it can either be a multiprocessing.Queue or a collections.deque.

ThreadingTest works with a Queue or deque(). It also kills run() properly when the stop() method is called.

Started Thread
Thread deque: 0
Thread deque: 1
Thread deque: 2
Thread deque: 3
Thread deque: 4
Thread deque: 5
Thread deque: 6
Thread deque: 7
Thread deque: 8
Thread deque: 9
Trying to stop Thread
Stopped Thread
deque([])

ProcessTest is only able to read from the queue if it is of type multiprocessing.Queue. It doesn't work with collections.deque. Furthermore, I am unable to kill the process using stop().

Process Queue: 0
Process Queue: 1
Process Queue: 2
Process Queue: 3
Process Queue: 4
Process Queue: 5
Process Queue: 6
Process Queue: 7
Process Queue: 8
Process Queue: 9
Trying to stop Process

I'm trying to figure out why. Also, what would be the best way to use a deque with a process? And how would I go about killing the process using some sort of stop() method?

I can kill the process in the following way: create another instance variable, self.killQ = Queue(); loop on while self.killQ.empty() instead of while toRun; and in the stop() method put a dummy object on the Queue, like self.killQ.put_nowait(0). But why won't an instance variable like toRun work? Does it have something to do with the assignment self.toRun = False in the stop() method? - dev-null
Another observation with the Process: I can use a deque within the process to pass stuff around, and it works fine. I just can't use a deque to communicate between processes (main and ProcessTest in this example). - dev-null
As an aside, it's generally bad practice to check whether a Queue is empty before consuming from it. It's susceptible to race conditions: if multiple threads are reading from a Queue, it could become empty between the time you check it and the time you actually try to get from it. In this case, I think you're better off just calling q.get() directly. That way you're not constantly looping, and therefore burning CPU in the child, while you wait for something to arrive in the Queue. - dano
I would also remove the while self.is_alive(): time.sleep(0.1) loop in stop(), since you're calling join() on the process/thread right afterwards. You might as well just call join(). - dano

1 Answer

You can't use a collections.deque to pass data between two multiprocessing.Process instances, because collections.deque is not process-aware. multiprocessing.Queue writes its contents to a pipe (a multiprocessing.Pipe) internally, which means that data can be enqueued in one process and retrieved in another. collections.deque doesn't have that kind of plumbing, so it won't work. Each process ends up with its own copy of the deque, so when you write to it in one process, the instance in the other process isn't affected at all.
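
The separation is easy to observe. The following is a minimal sketch, not part of the original code: report_len and deque_is_not_shared are hypothetical names, and the start_method parameter is only there so a specific start method can be forced. The parent appends to the deque after the child has started, and the child reports how many items its own copy holds.

```python
import multiprocessing as mp
from collections import deque


def report_len(d, ready, result_q):
    """Runs in the child: wait for the parent's signal, then report len(d)."""
    ready.wait()
    result_q.put(len(d))


def deque_is_not_shared(start_method=None):
    # start_method=None means "use the platform's default start method".
    ctx = mp.get_context(start_method)
    d = deque()
    ready = ctx.Event()
    result_q = ctx.Queue()

    p = ctx.Process(target=report_len, args=(d, ready, result_q))
    p.start()                 # the child now owns a separate copy of d

    d.extend([1, 2, 3])       # changes the parent's deque only
    ready.set()               # let the child look at its copy

    child_len = result_q.get()
    p.join()
    return len(d), child_len


if __name__ == "__main__":
    print(deque_is_not_shared())  # (parent length, child length)
```

The parent sees three items while the child still sees an empty deque. The only value that crosses the process boundary is the length sent back through the multiprocessing Queue.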

Your stop() method has a similar problem. Calling it sets toRun to False on the parent's copy of the ProcessTest object, but the child is running with its own copy, whose toRun stays True. The best way to end the child is to send a sentinel through the Queue. When the child receives the sentinel, it breaks out of the infinite loop:

def run(self):
    print("Started Process")
    self.toRun = True
    while self.toRun:
        if type(self.q) == type(deque()):
            if self.q:
                i = self.q.popleft()
                print("Process deque: " + str(i))
        elif type(self.q) == type(Queue()):
            if not self.q.empty():
                i = self.q.get_nowait()
                if i is None:  
                    break  # Got sentinel, so break
                print("Process Queue: " + str(i))

def stop(self):
    print("Trying to stop Process")
    self.q.put(None)  # Send sentinel
    while self.is_alive():
        time.sleep(0.1)
    print("Stopped Process")
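
The sentinel approach combines naturally with the suggestion in the comments to call q.get() directly rather than polling empty(). The following is a rough sketch under that assumption; worker, run_worker, the second results queue, and the doubling "work" are all illustrative and not taken from the original code.

```python
import multiprocessing as mp


def worker(in_q, out_q):
    """Consume items until the None sentinel arrives."""
    while True:
        item = in_q.get()      # blocks until data arrives, no busy loop
        if item is None:       # sentinel: leave the loop
            break
        out_q.put(item * 2)    # stand-in for real work


def run_worker(items, start_method=None):
    # start_method=None means "use the platform's default start method".
    ctx = mp.get_context(start_method)
    in_q, out_q = ctx.Queue(), ctx.Queue()
    p = ctx.Process(target=worker, args=(in_q, out_q))
    p.start()

    for item in items:
        in_q.put(item)
    in_q.put(None)             # ask the worker to exit

    # Drain the results before join() so the child is never blocked on a
    # full pipe while the parent waits for it to exit.
    results = [out_q.get() for _ in items]
    p.join()
    return results, p.exitcode


if __name__ == "__main__":
    print(run_worker([1, 2, 3]))
```

Because the worker blocks in get(), it uses no CPU while idle, and stop() reduces to putting the sentinel on the queue and calling join().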

Edit:

If you actually do need deque semantics between two processes, you can use a custom manager (a SyncManager with deque registered on it) to create a shared deque in a Manager process. Each of your Process instances then gets a Proxy to it:

import time
from multiprocessing import Process
from multiprocessing.managers import SyncManager
from collections import deque

SyncManager.register('deque', deque)

def Manager():
    m = SyncManager()
    m.start()
    return m

class ProcessTest(Process):
    def __init__(self, q):
        super(ProcessTest, self).__init__()
        self.q = q
        self.ctr = 0

    def run(self):
        print("Started Process")
        self.toRun = True
        while self.toRun:
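            # _getvalue() returns a copy of the shared deque. The proxy does
            # not expose __len__, so testing the proxy itself is always true.
            if self.q._getvalue():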
                i = self.q.popleft()
                if i is None:
                    break
                print("Process deque: " + str(i))

    def stop(self):
        print("Trying to stop Process")
        self.q.append(None)
        while self.is_alive():
            time.sleep(0.1)
        print("Stopped Process")

if __name__ == '__main__':
    m = Manager()
    q = m.deque()
    t1 = ProcessTest(q)
    t1.start()

    for i in range(10):
        q.append(i)
        time.sleep(1)
    t1.stop()
    t1.join()

    print(q)

Note that this probably isn't going to be faster than a multiprocessing.Queue, though, since every access to the deque costs an IPC round trip to the Manager process. It's also a much less natural data structure for passing messages the way you are.
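
If you want to check the cost on your own machine, a rough timing sketch along these lines may help. DequeManager and time_round_trips are hypothetical names, both measurements are taken from a single process, and the numbers will vary by platform, so treat this as a sanity check rather than a benchmark.

```python
import multiprocessing as mp
import time
from collections import deque
from multiprocessing.managers import SyncManager


class DequeManager(SyncManager):
    """Subclass so that registering 'deque' leaves SyncManager untouched."""


DequeManager.register("deque", deque)


def time_round_trips(n=1000, start_method=None):
    # start_method=None means "use the platform's default start method".
    ctx = mp.get_context(start_method)

    # multiprocessing.Queue: one put followed by one get, n times.
    q = ctx.Queue()
    t0 = time.perf_counter()
    for i in range(n):
        q.put(i)
        q.get()
    queue_seconds = time.perf_counter() - t0

    # Managed deque: every append/popleft is a request to the manager process.
    with DequeManager(ctx=ctx) as manager:
        d = manager.deque()
        t0 = time.perf_counter()
        for i in range(n):
            d.append(i)
            d.popleft()
        deque_seconds = time.perf_counter() - t0

    return queue_seconds, deque_seconds


if __name__ == "__main__":
    queue_s, deque_s = time_round_trips()
    print("Queue: %.4fs  managed deque: %.4fs" % (queue_s, deque_s))
```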