I am seeing inconsistent message delivery with message persistence and qos=2 on mosquitto. Is there anything I'm doing wrong?
I have a simple test app that registers a topic for consumption with clientId="receive-client", but immediately disconnects. It then connects as clientId="send-client" and publishes 10 messages, "message #1" ... "message #10". Then disconnects, waits five seconds, and connects to consume with "receive-client" again while printing and counting the messages received.
The result is inconsistent. Sometimes I receive 6 messages, sometimes 8. Typical output is something like this:
WARN[0005] GOT A MESSAGE:message #1
WARN[0005] GOT A MESSAGE:message #2
WARN[0005] GOT A MESSAGE:message #3
WARN[0005] GOT A MESSAGE:message #4
WARN[0005] GOT A MESSAGE:message #5
WARN[0005] GOT A MESSAGE:message #6
WARN[0005] GOT A MESSAGE:message #7
WARN[0005] GOT A MESSAGE:message #8
WARN[0305] PAUSE
WARN[0605] received message count=8
My version information says 1.4.15. My mosquitto.conf is:
pid_file /var/run/mosquitto.pid
persistence true
persistence_location /var/lib/mosquitto/
allow_anonymous false
password_file /etc/mosquitto/passwd
log_dest file /var/log/mosquitto/mosquitto.log
Initially /var/lib/mosquitto/mosquitto.db doesn't show up until several iterations have been run. My test app is here:
import (
mqtt "github.com/eclipse/paho.mqtt.golang"
log "github.com/sirupsen/logrus"
"time"
)
var receivedMsg int
func Persist() {
const TOPIC = "test"
const URL = "tcp://localhost:1883"
const USERNAME = "myuser"
const PASSWORD = "mypassword"
defer printReceived()
options := mqtt.NewClientOptions().AddBroker(URL).SetUsername(USERNAME).SetPassword(PASSWORD)
options.SetCleanSession(false)
options.SetConnectRetry(true)
options.SetConnectRetryInterval(10 * time.Millisecond)
// register the receive client with broker / TOPIC
// to be sure the broker knows it needs to save our messages
// to deliver at a later time
options.SetClientID("receive-client")
client := mqtt.NewClient(options)
token := client.Connect()
token.Wait()
if token := client.Subscribe(TOPIC, 2, consume1); token.Wait() && token.Error() != nil {
panic(token.Error())
}
client.Disconnect(0)
// connect with send client and send 10 messages
options.SetClientID("send-client")
client = mqtt.NewClient(options)
token = client.Connect()
token.Wait()
client.Publish(TOPIC, 2, false, "message #1")
client.Publish(TOPIC, 2, false, "message #2")
client.Publish(TOPIC, 2, false, "message #3")
client.Publish(TOPIC, 2, false, "message #4")
client.Publish(TOPIC, 2, false, "message #5")
client.Publish(TOPIC, 2, false, "message #6")
client.Publish(TOPIC, 2, false, "message #7")
client.Publish(TOPIC, 2, false, "message #8")
client.Publish(TOPIC, 2, false, "message #9")
client.Publish(TOPIC, 2, false, "message #10")
client.Disconnect(4)
time.Sleep(5* time.Second)
// subscribe again and try to retrieve the messages we missed
options.SetClientID("receive-client")
client = mqtt.NewClient(options)
token = client.Connect()
token.Wait()
if token := client.Subscribe(TOPIC, 2, consume2); token.Wait() && token.Error() != nil {
panic(token.Error())
}
time.Sleep(300 * time.Second)
log.Warn("PAUSE")
time.Sleep(300 * time.Second)
}
func consume1(client mqtt.Client, msg mqtt.Message) {
receivedMsg++
log.Warn("THIS SHOULD NOT BE CONSUMING ANY MESSAGES:", string(msg.Payload()))
}
func consume2(client mqtt.Client, msg mqtt.Message) {
receivedMsg++
log.Warn("GOT A MESSAGE:", string(msg.Payload()))
}
func printReceived() {
log.Warn("received message count=", receivedMsg)
}