I am running RabbitMQ 3.7.28 on a single Linux node (no cluster). The MQTT plugin is enabled, and both TLS and non-TLS connections work. Using Python 3.8 and pika 1.1.0, I sent 1,000,000 messages to the broker via AMQP.
While sending, I had two consumers connected: one using pika/AMQP and one using paho-mqtt 1.5.1. Both consumers received all 1,000,000 messages.
Then I sent the messages using paho-mqtt instead. After that script finished, both clients had received only 999,983 messages. Repeated tests showed that the number of dropped messages varies, but it is always in the tens.
To figure out what was going on, I added the message count to the message payload. The received messages showed that only the last messages were missing. The consumer printed this as its last entry:
99979: dev/testtopic b'99979: 2020-10-05T12:00:00.682216'
(the first 99979 is the counter from the consumer, the second one is the counter from the producer)
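The counter check described above could be automated along these lines. This is only a sketch: `parse_counter` and `find_missing` are hypothetical helpers, and they assume payloads of the form `<counter>: <timestamp>` with producer counters starting at 1, as in the MQTT producer script.

```python
def parse_counter(payload: bytes) -> int:
    """Extract the producer counter from a payload such as
    b'99979: 2020-10-05T12:00:00.682216'."""
    # Split on the first colon only; the timestamp contains colons too.
    head, _, _ = payload.partition(b":")
    return int(head.decode("ascii"))


def find_missing(payloads, expected_total):
    """Return the sorted producer counters in 1..expected_total that never arrived."""
    seen = {parse_counter(p) for p in payloads}
    return [n for n in range(1, expected_total + 1) if n not in seen]


if __name__ == "__main__":
    # Made-up sample: five messages were sent, the last two never arrived.
    received = [
        b"1: 2020-10-05T12:00:00.000001",
        b"2: 2020-10-05T12:00:00.000002",
        b"3: 2020-10-05T12:00:00.000003",
    ]
    print(find_missing(received, 5))  # prints [4, 5]
```

Collecting `msg.payload` in the consumer's `on_message` callback and passing the list to `find_missing` would show whether the gaps are always at the tail or also occur in the middle of the run.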
To improve things, I set qos=1. Now the consumers reliably stop receiving messages after 20 messages. The producer exits without error after publishing the number of messages I intended to send.
Am I doing something wrong? Can you tell me where the messages are lost, or give me a hint on how to debug this issue? The results are the same with and without TLS.
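One way to narrow this down, offered as a sketch and not a confirmed diagnosis: it assumes the losses come from the producer never running paho's network loop and disconnecting before every message has been handed to the socket. paho's default limit of 20 in-flight QoS 1 messages (adjustable with `max_inflight_messages_set`) would be consistent with the stall after 20 messages, but that has not been verified here. `publish_and_wait` and `run_against_broker` are hypothetical helpers; `loop_start()`, `wait_for_publish()`, `loop_stop()` and `disconnect()` are paho-mqtt 1.5.1 calls.

```python
def publish_and_wait(client, topic, payloads, qos=1):
    """Publish every payload, then block until each one is reported as published.

    `client` is expected to behave like a connected paho-mqtt Client whose
    network loop is already running (for example after client.loop_start()).
    Returns the number of messages confirmed as published.
    """
    infos = [client.publish(topic, payload, qos=qos) for payload in payloads]
    confirmed = 0
    for info in infos:
        # Blocks until the message has left the client (QoS 0) or has been
        # acknowledged (QoS 1); this needs the loop running in another thread.
        info.wait_for_publish()
        confirmed += 1
    return confirmed


def run_against_broker(server, port, user, password, total=1000):
    """Hypothetical driver: connect, start the loop, publish, then shut down."""
    import paho.mqtt.client as mqtt  # third-party; imported only for a real run

    client = mqtt.Client()
    client.username_pw_set(user, password)
    client.connect(server, port)
    client.loop_start()  # run the network loop in a background thread
    payloads = (f"{i + 1}" for i in range(total))
    sent = publish_and_wait(client, "dev/testtopic", payloads, qos=1)
    client.disconnect()
    client.loop_stop()
    return sent
```

If the consumers receive every message when the producer is driven this way, the broker is probably not the component dropping them. Comparing the return value of `run_against_broker(server, port, user, password)` with the consumer counters would show on which side messages go missing.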
If you have questions, please ask them!
Thank you.
For reference, here is most of the code I used:
MQTT Producer
import paho.mqtt.client as mqtt
from datetime import datetime

# user, password, server and port are defined elsewhere (omitted here)
client = mqtt.Client()
client.username_pw_set(user, password)
client.connect(server, port)
print(datetime.utcnow().isoformat())
for i in range(1000000):
    client.publish("dev/testtopic", f'{i + 1}: {datetime.utcnow().isoformat()}', qos=0)
print(datetime.utcnow().isoformat())
client.disconnect()
AMQP Producer
import pika
from datetime import datetime
from urllib.parse import quote

with pika.BlockingConnection(pika.URLParameters(f'amqp://{user}:{password}@{server}:{port}/{vhost}')) as connection:
    print(datetime.utcnow().isoformat())
    channel = connection.channel()
    routing_key = 'dev.testtopic'
    for i in range(1000000):
        channel.basic_publish(
            exchange='amq.topic', routing_key=routing_key, body=f'{i}: {datetime.utcnow().isoformat()}')
    print(datetime.utcnow().isoformat())
MQTT Consumer
import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe("$SYS/#")
    client.subscribe("dev/testtopic")

def on_message(client, userdata, msg):
    global count
    count += 1
    print(f'{count}: {msg.topic} {str(msg.payload)}')

count = 0
client = mqtt.Client()
client.username_pw_set(user, password)
client.on_connect = on_connect
client.on_message = on_message
client.connect(server, port)
client.loop_forever()
AMQP Consumer
import pika

def callback(ch, method, properties, body):
    global count
    count += 1
    print(f'{count}: {method.routing_key} {body}')

with pika.BlockingConnection(pika.URLParameters(f'amqp://{user}:{password}@{server}:{port}/{vhost}')) as connection:
    channel = connection.channel()
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    channel.queue_bind(exchange='amq.topic', queue=queue_name, routing_key='dev.testtopic')
    print(' [*] Waiting for messages. To exit press CTRL+C')
    count = 0
    channel.basic_consume(
        queue=queue_name, on_message_callback=callback, auto_ack=True)
    channel.start_consuming()