Is there a specific type of queue that is "closable" and is suitable when there are multiple producers and consumers and the data comes from a stream (so it's not known when it will end)?
I've been unable to find a queue that implements this sort of behavior, or a name for one, but it seems like an integral type for producer-consumer problems.
As an example, ideally I could write code where (1) each producer would tell the queue when it was done, (2) consumers would blindly call a blocking get(), and (3) when all producers were done and the queue was empty, all the consumers would unblock and receive a "done" notification.
As code, it'd look something like this:
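    from random import randint
    from threading import Thread
    from time import sleep

    def produce():
        for x in range(randint(1, 10)):  # arbitrary bounds, for illustration
            queue.put(x)
            sleep(randint(0, 2))
        queue.close()  # called once by every producer

    def consume():
        while True:
            try:
                print(queue.get())
            except ClosedQueue:  # hypothetical exception raised by the queue
                print('done!')
                break

    num_producers = randint(1, 5)
    # Hypothetical type: this is the queue I can't find a name for
    queue = QueueTypeThatICantFigureOutANameFor(num_producers)
    for _ in range(num_producers):
        Thread(target=produce).start()
    for _ in range(randint(1, 5)):
        Thread(target=consume).start()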
Also, I'm not looking for the "Poison Pill" solution, where a "done" value is added to the queue for every consumer -- I don't like the inelegance of producers needing to know how many consumers there are.
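To pin down the semantics I mean, here is a minimal sketch of how such a queue might behave, built on `threading.Condition`. The names `ClosableQueue` and `ClosedQueue` are made up for illustration and don't refer to any existing library type. The sketch assumes the number of producers is known up front and that the queue is unbounded.

```python
import threading
from collections import deque


class ClosedQueue(Exception):
    """Raised by get() once every producer has closed and no items remain."""


class ClosableQueue:
    """Hypothetical unbounded queue that is closed by its producers."""

    def __init__(self, num_producers):
        self._items = deque()
        self._open_producers = num_producers
        self._cond = threading.Condition()

    def put(self, item):
        with self._cond:
            if self._open_producers <= 0:
                raise ClosedQueue("put() after every producer has closed")
            self._items.append(item)
            self._cond.notify()  # wake one waiting consumer

    def close(self):
        # Called once by each producer when it has nothing more to add.
        with self._cond:
            self._open_producers -= 1
            if self._open_producers <= 0:
                # Wake every blocked consumer so it can see the queue is finished.
                self._cond.notify_all()

    def get(self):
        with self._cond:
            while not self._items:
                if self._open_producers <= 0:
                    raise ClosedQueue()
                self._cond.wait()
            return self._items.popleft()
```

With something like this, consumers never need to know how many producers exist, and producers never need to know how many consumers exist. Each side only talks to the queue.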