I would like to create either a Thread or a Process which runs forever in a While True loop.
I need to send and receive data to the worker in the form for queues, either a multiprocessing.Queue() or a collections.deque(). I prefer to use collections.deque() as it is significantly faster.
I also need to be able to kill the worker eventually (as it runs in a while True loop. Here is some test code I've put together to try and understand the differences between Threads, Processes, Queues, and deque ..
import time
from multiprocessing import Process, Queue
from threading import Thread
from collections import deque
class ThreadingTest(Thread):
def __init__(self, q):
super(ThreadingTest, self).__init__()
self.q = q
self.toRun = False
def run(self):
print("Started Thread")
self.toRun = True
while self.toRun:
if type(self.q) == type(deque()):
if self.q:
i = self.q.popleft()
print("Thread deque: " + str(i))
elif type(self.q) == type(Queue()):
if not self.q.empty():
i = self.q.get_nowait()
print("Thread Queue: " + str(i))
def stop(self):
print("Trying to stop Thread")
self.toRun = False
while self.isAlive():
time.sleep(0.1)
print("Stopped Thread")
class ProcessTest(Process):
def __init__(self, q):
super(ProcessTest, self).__init__()
self.q = q
self.toRun = False
self.ctr = 0
def run(self):
print("Started Process")
self.toRun = True
while self.toRun:
if type(self.q) == type(deque()):
if self.q:
i = self.q.popleft()
print("Process deque: " + str(i))
elif type(self.q) == type(Queue()):
if not self.q.empty():
i = self.q.get_nowait()
print("Process Queue: " + str(i))
def stop(self):
print("Trying to stop Process")
self.toRun = False
while self.is_alive():
time.sleep(0.1)
print("Stopped Process")
if __name__ == '__main__':
q = Queue()
t1 = ProcessTest(q)
t1.start()
for i in range(10):
if type(q) == type(deque()):
q.append(i)
elif type(q) == type(Queue()):
q.put_nowait(i)
time.sleep(1)
t1.stop()
t1.join()
if type(q) == type(deque()):
print(q)
elif type(q) == type(Queue()):
while q.qsize() > 0:
print(str(q.get_nowait()))
As you can see, t1 can either be ThreadingTest, or ProcessTest. Also, the queue passed to it can either be a multiprocessing.Queue or a collections.deque.
ThreadingTest works with a Queue or deque(). It also kills run() properly when the stop() method is called.
Started Thread
Thread deque: 0
Thread deque: 1
Thread deque: 2
Thread deque: 3
Thread deque: 4
Thread deque: 5
Thread deque: 6
Thread deque: 7
Thread deque: 8
Thread deque: 9
Trying to stop Thread
Stopped Thread
deque([])
ProcessTest is only able to read from the queue if it is of type multiprocessing.Queue. It doesn't work with collections.deque. Furthermore, I am unable to kill the process using stop().
Process Queue: 0
Process Queue: 1
Process Queue: 2
Process Queue: 3
Process Queue: 4
Process Queue: 5
Process Queue: 6
Process Queue: 7
Process Queue: 8
Process Queue: 9
Trying to stop Process
I'm trying to figure out why? Also, what would be the best way to use deque with a process? And, how would I go about killing the process using some sort of stop() method.
Queue
is empty before consuming from it. It's susceptible to race conditions, since if multiple threads are reading from aQueue
, it could become empty between the time you check its size and when you actually try toget
from it. In this case, I think you're better off just callingq.get()
directly. That way you're not constantly looping and therefore using CPU in the child while you wait for something to be in theQueue
. – danowhile self.is_alive: time.sleep(0.1)
call instop
, since you're callingjoin()
on the process/thread right afterwards. You might as well just calljoin()
. – dano