3
votes

I'm begininng to use Mqtt and I have a hard time with handling an unreliable network. I'm using a Paho Java Client (in groovy) to publish messages to a distant Mosquitto Broker.

Is there a way, when the broker is unreachable, to have the Paho client persist the message and automatically re-connect to the broker and publish the locally stored messages ? Do I have to handle everything myself, using for example a local broker ?

Here is my client building code

    String persistenceDir = config['persistence-dir'] ?: System.getProperty('java.io.tmpdir')
    def persistence = new MqttDefaultFilePersistence(persistenceDir)
    client = new MqttAsyncClient(uri, clientId, persistence)
    client.setCallback(this)
    options = new MqttConnectOptions()
    if (config.password) {
        options.setPassword(config.password as char[])
        options.setUserName(config.user)
    }
    options.setCleanSession(false)
    client.connect(options)

And my publish code

  def message = new MqttMessage(Json.encode(outgoingMessage).getBytes())
    try {
    client?.connect(options)
    def topic = client.getTopic('processMsg')
    message.setQos(1)
    def token = topic.publish(message)
    if (client) {
        client.disconnect()
    }

Thanks

3

3 Answers

4
votes

The Paho client will only persist in-flight messages when it is connected to the broker.

Typically, when connectivity issues start to arrive you'll see message timeouts popping up

  • Timed out waiting for a response from the server (32000)

At that point the message will still be persisted.

However, when the connection is lost, and you start seeing this

  • Client is not connected (32104)

You should assume that the message has not been persisted by Paho.

You can debug this in org.eclipse.paho.client.mqttv3.internal.ClientComms :

/**
 * Sends a message to the broker if in connected state, but only waits for the message to be
 * stored, before returning.
 */
public void sendNoWait(MqttWireMessage message, MqttToken token) throws MqttException {
    final String methodName = "sendNoWait";
    if (isConnected() ||
            (!isConnected() && message instanceof MqttConnect) ||
            (isDisconnecting() && message instanceof MqttDisconnect)) {
        this.internalSend(message, token);
    } else {
        //@TRACE 208=failed: not connected
        log.fine(className, methodName, "208");
        throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
    }
}

The internalSend will persist the message, but only if it is connected to the broker.

Also take into account that there is a maximum number of inflight messages that Paho can process. If it exceeds that it will also decide to not persist the message.

1
votes

You could just setup a local broker and bridge that with the remote broker. That way you can queue up all your messages locally and when the remote broker comes back online all can be delivered.

0
votes

Yes... After you get an exception that the message can't be delivered, it has to be either persisted or the message needs to be regenerated.

If you plan to use a local broker you can look at Really Small Message Broker (https://www.ibm.com/developerworks/community/groups/service/html/communityview?communityUuid=d5bedadd-e46f-4c97-af89-22d65ffee070)