2
votes

I am trying to reliably send a message from a publisher to multiple consumers using RabbitMQ topic exchange.

I have configured durable queues (one per consumer) and I am sending persistent messages (delivery_mode=2). I am also setting the channel in confirm_delivery mode, and I have added the mandatory=True flag to basic_publish.

Right now the service is fairly reliable, but messages get lost for one of the consumers if it stays down during a broker restart followed by a message publication.

It seems that the broker can recover queues and messages on restart, but it doesn't seem to keep the binding between consumers and queues. So messages only reach one of the consumers and get lost for the one that is down.

Note: Messages do reach the queue and the consumer if the broker doesn't restart while a consumer is down. They accumulate properly on the queue and are delivered to the consumer when it comes back up.

Edit - adding consumer code:

import pika


class Consumer(object):
    def __init__(self, queue_name):
        self.queue_name = queue_name

    def consume(self):
        credentials = pika.PlainCredentials(
            username='myuser', password='mypassword')
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='myhost', credentials=credentials))
        channel = connection.channel()
        channel.exchange_declare(exchange='myexchange', exchange_type='topic')
        channel.queue_declare(queue=self.queue_name, durable=True)
        channel.queue_bind(
            exchange='myexchange', queue=self.queue_name, routing_key='my.route')
        channel.basic_consume(
            consumer_callback=self.message_received, queue=self.queue_name)
        channel.start_consuming()

    def message_received(self, channel, basic_deliver, properties, body):
        print(f'Message received: {body}')
        channel.basic_ack(delivery_tag=basic_deliver.delivery_tag)

You can assume each consumer server does something similar to:

c = Consumer('myuniquequeue')  # each consumer has a permanent queue name
c.consume()

Edit - adding publisher code:

def publish(message):
    credentials = pika.PlainCredentials(
        username='myuser', password='mypassword')
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='myhost', credentials=credentials))
    channel = connection.channel()
    channel.exchange_declare(exchange='myexchange', exchange_type='topic')
    channel.confirm_delivery()
    success = channel.basic_publish(
        exchange='myexchange',
        routing_key='my.route',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        ),
        mandatory=True
    )
    if success:
        print("Message sent")
    else:
        print("Could not send message")
        # Save for sending later

It is worth saying that I am handling the error case on my own, and it is not the part I would like to improve. When my messages get lost for some of the consumers, the flow goes through the success branch.

What do you mean by binding between consumers and queues? Every time a consumer comes online, it has to start consuming from the existing queue or create a new one. – Alex Buyny

I mean the operation performed by the pika method channel.queue_bind. Without the routing_key, the exchange wouldn't know which messages it should deliver to which queue, so the queue can forward messages to its consumer(s) or keep them for later if the consumer(s) are not ready. You can use rabbitmqctl list_bindings to check the active bindings. – NublicPablo

I think you're missing some important details. Can you share your consumer code? Are your consumers using auto-delete or exclusive queues? – Luke Bakken

It's shared now. Durable queues. – NublicPablo

1 Answer

0
votes

Use basic_ack(delivery_tag=basic_deliver.delivery_tag) in your consumer callback method. This acknowledgement tells the broker whether the consumer has received and processed the message. If you send a negative acknowledgement instead, the message will be requeued.
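The ack-on-success / nack-with-requeue pattern can be sketched as a wrapper around the message handler. This is a sketch, not the asker's code: `make_callback` and `process` are hypothetical names, and only `basic_ack`/`basic_nack` come from pika's channel API.

```python
def make_callback(process):
    """Wrap a hypothetical handler `process` so the broker learns the outcome.

    Any exception raised by `process` triggers a negative acknowledgement
    with requeue=True, so the broker puts the message back on the queue.
    """
    def on_message(channel, basic_deliver, properties, body):
        try:
            process(body)
        except Exception:
            # Processing failed: requeue the message for a retry.
            channel.basic_nack(
                delivery_tag=basic_deliver.delivery_tag, requeue=True)
        else:
            # Processing succeeded: confirm delivery to the broker.
            channel.basic_ack(delivery_tag=basic_deliver.delivery_tag)
    return on_message
```

The wrapper would be passed where the question currently passes self.message_received, keeping the ack decision in one place.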

Edit #1: In order to keep receiving messages through a broker crash, the broker needs to be distributed. RabbitMQ's concept for this is Mirrored Queues. Mirrored Queues let your queues be replicated across the nodes in your cluster. If the node hosting a queue goes down, another node holding a mirror of the queue takes over as your broker.

For a complete understanding, refer to the Mirrored Queues documentation.
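Assuming a multi-node RabbitMQ cluster is already set up, classic queue mirroring is enabled with a policy rather than in application code. A sketch (the policy name ha-all is an arbitrary choice, and "^" is a pattern matching all queue names):

```shell
# Mirror every queue across all nodes in the cluster.
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

# Verify the policy exists and which queues it applies to.
rabbitmqctl list_policies
rabbitmqctl list_queues name policy
```

Because policies match by queue-name pattern, the existing durable queues pick up mirroring without any change to the publisher or consumer code.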