
I am building a system where a Producer sends a list of tasks to be queued which will be consumed by a number of Consumers.

Assume I have a list of tasks that can be categorised into Black, Orange and Yellow. All the Black tasks are sent to Queue_0, Orange to Queue_1 and Yellow to Queue_2, and I will assign a worker to each queue (i.e. Consumer_0 to Queue_0, Consumer_1 to Queue_1 and Consumer_2 to Queue_2). If the Black queue gets larger, I want to add an extra consumer (i.e. Consumer_3) to Queue_0 to aid Consumer_0.

I went through the RabbitMQ tutorials on Work Queues and Routing, and I thought Routing would solve my problem. I launched three terminals: a producer and two consumers which receive Black tasks. When the producer sends a few Black tasks (Black_Task_1, Black_Task_2), both consumers receive both messages (i.e. Consumer_0 receives Black_Task_1 and Black_Task_2, and Consumer_3 also receives Black_Task_1 and Black_Task_2). I want my consumers to share the tasks, not repeat them. For example, Consumer_0 does Black_Task_1 while Consumer_3 does Black_Task_2. What configuration achieves that?
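
To make it concrete, this is roughly the topology I have in mind. It is only a sketch; the exchange name task_router and the colour routing keys are placeholder names I made up:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# One direct exchange routes tasks by colour.
channel.exchange_declare(exchange='task_router', exchange_type='direct')

# One queue per colour, each bound with its colour as the routing key.
for queue, colour in [('Queue_0', 'black'),
                      ('Queue_1', 'orange'),
                      ('Queue_2', 'yellow')]:
    channel.queue_declare(queue=queue)
    channel.queue_bind(exchange='task_router', queue=queue, routing_key=colour)

# A Black task should end up on Queue_0 only.
channel.basic_publish(exchange='task_router', routing_key='black',
                      body='Black_Task_1')
connection.close()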

=============================

Update

This is sample code taken from the RabbitMQ routing tutorial, which I modified a little. Note that this code doesn't send to Black, Orange or Yellow queues, but the concept is the same.

emit_log_direct.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()

receive_logs_direct.py

#!/usr/bin/env python
import pika
import sys
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

# Let RabbitMQ generate a unique name for a queue only this consumer uses.
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
    time.sleep(1)  # simulate a second of work per task
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=queue_name,
                      on_message_callback=callback)

channel.start_consuming()

Producer

nuttynibbles$ ./4_emit_log_direct.py info "run run info"
 [x] Sent 'info':'run run info'

Consumer_0

nuttynibbles$ ./4_receive_logs_direct_customize.py info
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'info':'run run info'
 [x] Done

Consumer_3

nuttynibbles$ ./4_receive_logs_direct_customize.py info
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'info':'run run info'
 [x] Done
What do you mean share task - you want two consumers to work on the same task? - Burhan Khalid
Hi, I have explained it further in my question above. - chrizonline

2 Answers


I think your basic issue is with this:

If the Black queue gets larger, I want to add an extra consumer (i.e. Consumer_3) to Queue_0 to aid Consumer_0.

As soon as you add another consumer to the queue, it will pick up the next available message.

If the first consumer does not acknowledge the message, it will remain on the queue, and multiple workers will be able to work on the same message.

So make sure you are correctly acknowledging the messages:

By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. [...] There aren't any message timeouts; RabbitMQ will redeliver the message only when the worker connection dies. It's fine even if processing a message takes a very, very long time.
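
As a sketch, acknowledgement plus fair dispatch in pika looks roughly like this (assuming a queue named Queue_0 already exists; do_work is a placeholder for the real task handler):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Fair dispatch: don't give this worker a new message until it has
# acknowledged the previous one, so added consumers actually share the load.
channel.basic_qos(prefetch_count=1)

def do_work(body):
    print(" [x] Processing %r" % body)  # placeholder for real task processing

def on_message(ch, method, properties, body):
    do_work(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)  # message is now off the queue

channel.basic_consume(queue='Queue_0', on_message_callback=on_message)
channel.start_consuming()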

Depending on the nature of the task, you may be able to split the work between multiple processes by creating a priority queue, which is used by C1 (a consumer) to get additional resources. In this case you'll have to have workers ready and listening on the separate priority queue, thus creating a sub-queue where T1 (a task) is being processed.

However, in order to do this, the initial C1 has to make sure the task is no longer available by acknowledging its receipt.

=============================

I think that your problem is that you are creating a new Queue for each consumer. When you call

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

in your consumer, this declares a new queue, tells RabbitMQ to generate a unique name for it, and marks it for exclusive use by the channel in the consumer that is calling it. That means that each consumer will have its own queue.

You then bind each new Queue to the exchange using the severity as a routing key. When a message comes into a direct Exchange, RabbitMQ will route a copy of it to every Queue that is bound with a matching routing key. There is no round-robin across the Queues. Each consumer will get a copy of the message, which is what you are observing.

I believe what you want to do is have each consumer use the same name for the queue, specify the name in the queue_declare, and don't make it exclusive. Then all the consumers will be listening to the same queue. The messages will be delivered to one of the consumers, basically in a round-robin fashion.
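
For example, a consumer along these lines will share work with any other consumer started the same way (black_tasks and the black routing key are only illustrative names):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# Every consumer declares the SAME named, non-exclusive queue and binds it;
# redeclaring an existing queue with the same parameters is a no-op.
channel.queue_declare(queue='black_tasks')
channel.queue_bind(exchange='direct_logs', queue='black_tasks',
                   routing_key='black')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='black_tasks', on_message_callback=callback)
channel.start_consuming()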

The producer (the emit_log_direct.py program) doesn't declare or bind the queue. It doesn't have to, but if the binding isn't established before the message is sent, the message will be discarded. If you are using a fixed queue, you can have the producer set it up as well; just be sure to use the same parameters (e.g. queue name) as the consumers.
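
A matching producer-side sketch (same illustrative names), declaring and binding the queue before publishing so that messages are not dropped if the producer starts first:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.queue_declare(queue='black_tasks')  # same name the consumers use
channel.queue_bind(exchange='direct_logs', queue='black_tasks',
                   routing_key='black')

channel.basic_publish(exchange='direct_logs', routing_key='black',
                      body='Black_Task_1')
connection.close()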