
I am running RabbitMQ 3.7.28 on a Linux node, as a single-node installation with no other cluster nodes. The MQTT plugin is enabled, and both TLS and non-TLS connections are successful. Using Python 3.8 and pika 1.1.0, I sent 1,000,000 messages to the broker via AMQP.

While the messages were being sent I had two consumers connected: one using pika/AMQP, one using paho-mqtt 1.5.1. Both consumers received all 1,000,000 messages.

Then I tried sending messages using paho-mqtt, and after this script finished both clients had received only 999,983 messages. Repeated tests showed that a different number of messages gets dropped each time, but always in the tens.

To figure out what was going on, I added a message counter to the payload. The results showed that only the last messages were missing. The consumer showed this message as the last entry:

99979: dev/testtopic b'99979: 2020-10-05T12:00:00.682216'

(the first 99979 is the counter from the consumer, the second one is the counter from the producer)

To improve things I set qos=1. Now the consumer reliably stops receiving messages after 20 messages. The producer exits without error after sending the number of messages I intended to send.

Am I doing something wrong? Can you tell me where the messages are lost, or give me a hint on how to debug this issue? The results are the same with TLS enabled or disabled.

If you have questions, please ask them!

Thank you.

For reference, here is (most of) the code I used:

MQTT Producer

import paho.mqtt.client as mqtt
from datetime import datetime

client = mqtt.Client()
client.username_pw_set(user, password)
client.connect(server, port)

print(datetime.utcnow().isoformat())
for i in range(1000000):
    client.publish("dev/testtopic", f'{i + 1}: {datetime.utcnow().isoformat()}', qos=0)
print(datetime.utcnow().isoformat())

client.disconnect()

AMQP Producer

import pika
from datetime import datetime
from urllib.parse import quote


with pika.BlockingConnection(pika.URLParameters(f'amqp://{user}:{password}@{server}:{port}/{vhost}')) as connection:
    print(datetime.utcnow().isoformat())
    channel = connection.channel()
    routing_key = 'dev.testtopic'
    for i in range(1000000):
        channel.basic_publish(
            exchange='amq.topic', routing_key=routing_key, body=f'{i}: {datetime.utcnow().isoformat()}')
    print(datetime.utcnow().isoformat())

MQTT Consumer

import paho.mqtt.client as mqtt


def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))

    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe("$SYS/#")
    client.subscribe("dev/testtopic")


def on_message(client, userdata, msg):
    global count
    count += 1
    print(f'{count}: {msg.topic} {str(msg.payload)}')


count = 0

client = mqtt.Client()
client.username_pw_set(user, password)
client.on_connect = on_connect
client.on_message = on_message

client.connect(server, port)
client.loop_forever()

AMQP Consumer

import pika

def callback(ch, method, properties, body):
    global count
    count += 1
    print(f'{count}: {method.routing_key} {body}')


with pika.BlockingConnection(pika.URLParameters(f'amqp://{user}:{password}@{server}:{port}/{vhost}')) as connection:
    channel = connection.channel()

    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue

    channel.queue_bind(exchange='amq.topic', queue=queue_name, routing_key='dev.testtopic')
    print(' [*] Waiting for messages. To exit press CTRL+C')
    count = 0


    channel.basic_consume(
        queue=queue_name, on_message_callback=callback, auto_ack=True)

    channel.start_consuming()
Have you tried adding an on_disconnect() callback to your MQTT consumer to see if it is disconnecting at all? If the issue happens only at the end of the run, then try commenting out the client.disconnect() call in the MQTT producer and see if that changes anything... could be a timing issue. – JD Allen
Many thanks. The consumer does not disconnect. It turns out to be a timing and/or flushing issue; with some modifications to the code I can reliably send more than 5,000,000 messages in a rush. I will figure out whether this is caused by paho or by the MQTT plugin on the server and create a ticket accordingly. – jesterchen
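
For reference, the on_disconnect() check suggested in the comment could be wired into the MQTT consumer from the question roughly like this (a sketch only; in paho-mqtt 1.x, rc != 0 indicates an unexpected disconnect):

def on_disconnect(client, userdata, rc):
    # rc == 0 means the client called disconnect() itself; anything else is unexpected.
    print(f'Disconnected with result code {rc}')

client.on_disconnect = on_disconnect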

1 Answer


Some suggestions:

  • Your AMQP publisher should use publisher confirms. Without them, you can lose messages - https://www.rabbitmq.com/confirms.html#publisher-confirms (see the first sketch after this list).
  • Your MQTT client is exiting before all messages have been published. This is not a RabbitMQ bug. You need to register the on_publish callback and make sure that publishing has completed before your program exits (see the second sketch below). A hacky way to do this would be to wait a while after your last publish (30 seconds?) and then exit.
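
A minimal sketch of the AMQP producer from the question with publisher confirms enabled, reusing the same placeholder connection values (user, password, server, port, vhost). With confirm_delivery() on a pika BlockingChannel, basic_publish blocks until the broker confirms each message:

import pika
from datetime import datetime

# user, password, server, port and vhost are placeholders, as in the question.
params = pika.URLParameters(f'amqp://{user}:{password}@{server}:{port}/{vhost}')
with pika.BlockingConnection(params) as connection:
    channel = connection.channel()
    # Put the channel into confirm mode: basic_publish now blocks until the
    # broker acknowledges (or nacks) each message.
    channel.confirm_delivery()
    for i in range(1000000):
        try:
            channel.basic_publish(
                exchange='amq.topic',
                routing_key='dev.testtopic',
                body=f'{i}: {datetime.utcnow().isoformat()}',
                mandatory=True)  # raise if the message cannot be routed to any queue
        except pika.exceptions.UnroutableError:
            print(f'message {i} was returned as unroutable')

And a sketch of the MQTT producer that keeps the network loop running and waits for the last publish to be acknowledged before disconnecting. This is only an illustration: it uses qos=1 and wait_for_publish() on the MQTTMessageInfo returned by the final publish() as a simple heuristic, since acknowledgements on a single connection normally arrive in order. (With qos=1 and no network loop running, paho's default in-flight window of 20 messages would also be consistent with the stall after 20 messages described in the question.)

import paho.mqtt.client as mqtt
from datetime import datetime

# user, password, server and port are placeholders, as in the question.
client = mqtt.Client()
client.username_pw_set(user, password)
client.connect(server, port)
# Run the network loop in a background thread so queued packets are actually
# flushed while we publish.
client.loop_start()

info = None
for i in range(1000000):
    info = client.publish(
        "dev/testtopic", f'{i + 1}: {datetime.utcnow().isoformat()}', qos=1)

# Block until the broker has acknowledged the last message before exiting.
info.wait_for_publish()
client.loop_stop()
client.disconnect()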

NOTE: the RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.