I am trying to reliably send a message from a publisher to multiple consumers using a RabbitMQ topic exchange.
I have configured durable queues (one per consumer) and I am sending persistent messages (delivery_mode=2). I am also setting the channel to confirm_delivery mode, and have added the mandatory=True flag to the publish call.
Right now the service is pretty reliable, but messages get lost to one of the consumers if that consumer stays down during a broker restart that is followed by a message publication.
It seems that the broker can recover queues and messages on restart, but it doesn't seem to keep the binding between consumers and queues. So messages only reach one of the consumers and are lost for the one that is down.
Note: Messages do reach the queue and the consumer if the broker doesn't restart while a consumer is down. They accumulate properly in the queue and are delivered to the consumer when it comes back up.
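One way to see whether the binding itself is what disappears would be to list the bindings right before and right after the broker restart. The following is only a sketch: it assumes the RabbitMQ management plugin is enabled on its default port 15672 and reuses the host and credentials from the code below (rabbitmqctl list_bindings on the broker host would give the same information).

import requests

# Sketch: list the bindings on 'myexchange' via the management HTTP API
# (assumption: management plugin enabled on port 15672, default vhost).
resp = requests.get('http://myhost:15672/api/bindings',
                    auth=('myuser', 'mypassword'))
resp.raise_for_status()
for binding in resp.json():
    if binding['source'] == 'myexchange':
        print(binding['destination'], binding['routing_key'])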
Edit - adding consumer code:
import pika


class Consumer(object):

    def __init__(self, queue_name):
        self.queue_name = queue_name

    def consume(self):
        credentials = pika.PlainCredentials(
            username='myuser', password='mypassword')
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='myhost', credentials=credentials))
        channel = connection.channel()
        channel.exchange_declare(exchange='myexchange', exchange_type='topic')
        # durable queue so queued messages survive a broker restart
        channel.queue_declare(queue=self.queue_name, durable=True)
        # route messages matching 'my.route' to this consumer's queue
        channel.queue_bind(
            exchange='myexchange', queue=self.queue_name,
            routing_key='my.route')
        channel.basic_consume(
            consumer_callback=self.message_received, queue=self.queue_name)
        channel.start_consuming()

    def message_received(self, channel, basic_deliver, properties, body):
        print(f'Message received: {body}')
        channel.basic_ack(delivery_tag=basic_deliver.delivery_tag)
You can assume each consumer server does something similar to:
c = Consumer('myuniquequeue') # each consumer has a permanent queue name
c.consume()
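One detail in the declarations above that might be related, offered only as a hedged aside: the exchange is declared without durable=True, and a non-durable exchange, together with its bindings, is dropped when the broker restarts even if the queues bound to it are durable. A variant of the declarations inside consume() that keeps the exchange and its bindings across restarts could look like this sketch (assuming exchange durability is the relevant difference; the durability of an existing exchange cannot be changed in place, so 'myexchange' would have to be deleted and redeclared):

# Sketch only - assumes exchange durability is the relevant difference.
channel.exchange_declare(
    exchange='myexchange', exchange_type='topic', durable=True)
channel.queue_declare(queue='myuniquequeue', durable=True)
channel.queue_bind(
    exchange='myexchange', queue='myuniquequeue', routing_key='my.route')

The publisher would have to use the same durable=True declaration, since redeclaring an exchange with mismatched properties raises a channel error.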
Edit - adding publisher code:
def publish(message):
    credentials = pika.PlainCredentials(
        username='myuser', password='mypassword')
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='myhost', credentials=credentials))
    channel = connection.channel()
    channel.exchange_declare(exchange='myexchange', exchange_type='topic')
    channel.confirm_delivery()
    success = channel.basic_publish(
        exchange='myexchange',
        routing_key='my.route',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        ),
        mandatory=True
    )
    if success:
        print("Message sent")
    else:
        print("Could not send message")
        # Save for sending later
It is worth saying that I am handling the error case on my own, and it is not the part I would like to improve. When my messages get lost for some of the consumers, the flow goes through the success branch.
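A hedged side note on that check: with mandatory=True the broker only reports a failure when a message can be routed to no queue at all, so as long as at least one binding still exists the publish is confirmed as a success. Also, the boolean return value is the older pika behaviour; under pika 1.x, basic_publish returns None and signals problems with exceptions instead, roughly like this sketch (assuming pika >= 1.0 and the same channel setup as above):

import pika
import pika.exceptions

def publish_checked(channel, message):
    # Sketch for pika >= 1.0: failures surface as exceptions, not a boolean.
    try:
        channel.basic_publish(
            exchange='myexchange',
            routing_key='my.route',
            body=message,
            properties=pika.BasicProperties(delivery_mode=2),  # persistent
            mandatory=True,
        )
        print("Message sent")
    except pika.exceptions.UnroutableError:
        # mandatory=True and no queue was bound for the routing key
        print("Could not send message")
    except pika.exceptions.NackError:
        # the broker negatively acknowledged the publish
        print("Could not send message")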
Comments:
"Binding between consumers and queues"? Every time a consumer comes online, it has to start consuming from the existing queue or create a new one. – Alex Buyn
channel.queue_bind. Without the routing_key the exchange wouldn't know which messages it should deliver to which queue, so the queue can forward messages to its consumer(s) or keep them for later if consumer(s) are not ready. You can use rabbitmqctl list_bindings to check the active bindings. – NublicPablo