13
votes

I wish to have a single producer, multiple consumer architecture in Python while performing multi-threaded programming. I wish to have an operation like this :

  1. Producer produces the data
  2. Consumers 1 ..N (N is pre-determined) wait for the data to arrive (block) and then process the SAME data in different ways.

So I need all the consumers to to get the same data from the producer.

When I used Queue to perform this, I realized that all but the first consumer would be starved with the implementation I have.

One possible solution is to have a unique queue for each of the consumer threads wherein the same data is pushed in multiple queues by the producer. Is there a better way to do this ?

from threading import Thread
import time
import random
from Queue import Queue

my_queue = Queue(0)

def Producer():
    global my_queue
    my_list = []
    for each in range (50):
        my_list.append(each)
    my_queue.put(my_list)

def Consumer1():
    print "Consumer1"
    global my_queue
    print my_queue.get()
    my_queue.task_done()

def Consumer2():
    print "Consumer2"
    global my_queue
    print my_queue.get()
    my_queue.task_done()


P = Thread(name = "Producer", target = Producer)

C1 = Thread(name = "Consumer1", target = Consumer1)

C2 = Thread(name = "Consumer2", target = Consumer2)


P.start()

C1.start()

C2.start()

In the example above, the C2 gets blocked indefinitely as C1 consumes the data produced by P1. What I would rather want is for C1 and C2 both to be able to access the SAME data as produced by P1.

Thanks for any code/pointers!

3
Since you have N consumers I would use a thread/process poolstelios

3 Answers

3
votes

Your producer creates only one job to do:

my_queue.put(my_list)

For example, put my_list twice, and both consumers work:

def Producer():
    global my_queue
    my_list = []
    for each in range (50):
        my_list.append(each)
    my_queue.put(my_list)
    my_queue.put(my_list)

So this way you put two jobs to queue with the same list.

However i have to warn you: to modify the same data in different threads without thread synchronization is generally bad idea.

Anyways, approach with one queue would not work for you, since one queue is supposed to be processed with threads with the same algorithm.

So, I advise you to go ahead with unique queue per each consumer, since other solutions are not as trivial.

2
votes

How about a per-thread queue then?

As part of starting each consumer, you would also create another Queue, and add this to a list of "all thread queues". Then start the producer, passing it the list of all queues, which he can then push data into all of them.

1
votes

A single-producers and five-consumers example, verified.

from multiprocessing import Process, JoinableQueue
import time
import os

q = JoinableQueue()

def producer():
    for item in range(30):
        time.sleep(2)
        q.put(item)
    pid = os.getpid()
    print(f'producer {pid} done')


def worker():
    while True:
        item = q.get()
        pid = os.getpid()
        print(f'pid {pid} Working on {item}')
        print(f'pid {pid} Finished {item}')
        q.task_done()

for i in range(5):
    p = Process(target=worker, daemon=True).start()

producers = []
# it is easy to extend it to multi producers.
for i in range(1):
    p = Process(target=producer)
    producers.append(p)
    p.start()

# make sure producers done
for p in producers:
    p.join()

# block until all workers are done
q.join()
print('All work completed')

Explanation:

  1. One producer and five consumers in this example.
  2. JoinableQueue is used to make sure all elements stored in queue will be processed. 'task_done' is for worker to notify an element is done. 'q.join()' will wait for all elements marked as done.
  3. With #2, there is no need to join wait for every worker.
  4. But it is important to join wait for producer to store element into queue. Otherwise, program exit immediately.