I have an issue with respect to consuming offline MQTT messages in Moquette server through eclipse Paho client.
Following are the steps which I have followed.
- Created and spinned up the Moquette MQTT broker.
- Created a simple MQTT consumer application using eclipse Paho client.
- Set consumer to consume data on topic : "devices/reported/#" with QOS : 1 and CleanSession : False
- Created a simple MQTT data publisher to publish data to MQTT broker using Eclipse Paho.
- Used MQTT data publisher to publish messages to : "devices/reported/client_1" topic with QOS : 1
Above steps were successful without any issue.
Then I stopped my consumer application and sent MQTT data to broker with the same topic. using my publisher application - Server was able to receive these messages but in this moment there was no any consumer to consume this message since I have stopped my consumer. Then I started my consumer application again. It was connected to the broker successfully but, it did not receive any message which I sent to broker while the consumer shutdown.
Do I need to do any specific configuration to my Moquette server to persist data (with clean session : false) ? Or am I missing something ?
Please find my sample code below,
Moquette Server initialization
package com.gbids.mqtt.moquette.main;
import com.gbids.mqtt.moquette.server.PublishInterceptor;
import io.moquette.interception.InterceptHandler;
import io.moquette.server.Server;
import io.moquette.server.config.IConfig;
import io.moquette.server.config.MemoryConfig;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class ServerLauncher {
public static void main(String[] args) throws IOException {
Properties props = new Properties();
final IConfig configs = new MemoryConfig(props);
final Server mqttBroker = new Server();
final List<? extends InterceptHandler> userHandlers = Arrays.asList(new PublishInterceptor());
mqttBroker.startServer(configs, userHandlers);
System.out.println("moquette mqtt broker started, press ctrl-c to shutdown..");
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.out.println("stopping moquette mqtt broker..");
mqttBroker.stopServer();
System.out.println("moquette mqtt broker stopped");
}
});
}
}
MQTT Consumer
package com.gbids.mqtt.moquette.main;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class ConsumerLauncher implements MqttCallback {
private static final String topicPrefix = "devices/reported";
private static final String broker = "tcp://0.0.0.0:1883";
private static final String clientIdPrefix = "consumer";
public static void main(String[] args) throws MqttException {
final String clientId = "consumer_1";
MqttClient sampleClient = new MqttClient(broker, clientId, new MemoryPersistence());
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(false);
sampleClient.connect(connOpts);
sampleClient.subscribe(topicPrefix + "/#", 1);
sampleClient.setCallback(new ConsumerLauncher());
}
public void connectionLost(Throwable throwable) {
System.out.println("Consumer connection lost : " + throwable.getMessage());
}
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println("Message arrived from topic : " + s + " | Message : " + new String(mqttMessage.getPayload()) + " | Message ID : " +mqttMessage.getId());
}
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("Delivery completed from : " + clientIdPrefix + "_1");
}
}
MQTT Publisher
package com.gbids.mqtt.moquette.main;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class ClientLauncher {
private static final String content = "{\"randomData\": 25}";
private static final String willContent = "Client disconnected unexpectedly";
private static final String broker = "tcp://0.0.0.0:1883";
private static final String clientIdPrefix = "client";
public static void main(String[] args) throws Exception{
sendDataWithQOSOne();
System.exit(0);
}
private static void sendDataWithQOSOne(){
try {
final String clientId = "client_1";
MqttClient sampleClient = new MqttClient(broker, clientId, new MemoryPersistence());
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(false); // for publisher - this is not needed I think
sampleClient.connect(connOpts);
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(1);
final String topic = "devices/reported/" + clientId;
sampleClient.publish(topic, message);
System.out.println("Message published from : " + clientId + " with payload of : " + content);
sampleClient.disconnect();
} catch (MqttException me) {
me.printStackTrace();
}
}
}