i am little confuse about qos, i read about qos is if qos set to 2 then The broker/client will deliver the message exactly once by using a four step handshake.
so qos 2 confirms that message is published on broker, not received by subscriber(client). or message is received by subscriber or
for acknowledgment we should need to establish application like publisher will publish the message with a topic for example "DATA" and will subscribe on a topic for example "ACK" and subscriber need to publish acknowledgment on topic "ACK" that message is received on topic "DATA"
i created a java class for publishing data and another class for subscribing publisher
in the following code i tried to publish at qos 2 and in deliveryComplete function i got exception when try to getMessage() when i tried with qos 0 getMessage() didn't give any exception.
public class PublishMe implements MqttCallback{
MqttClient myClient;
MqttClient myClientPublish;
MqttConnectOptions connOpt;
MqttConnectOptions connOptPublish;
static final String BROKER_URL = "tcp://Ehydromet-PC:1883";
static Boolean msgACK=false;
public static void main(String[] args) {
PublishMe smc = new PublishMe();
smc.runClient();
}
@Override
public void connectionLost(Throwable t) {
System.out.println("Connection lost!");
}
@Override
public void messageArrived(String string, MqttMessage message) throws Exception {
System.out.println("-------------------------------------------------");
System.out.println("| Topic:" + string);
System.out.println("| Message: " + new String(message.getPayload()));
System.out.println("-------------------------------------------------");
}
/**
*
* deliveryComplete
* This callback is invoked when a message published by this client
* is successfully received by the broker.
*
* @param token
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
try{
System.out.println("Message delivered successfully to topic : \"" + token.getMessage().toString() + "\".");
}catch(Exception ex){
System.out.println(ex.getCause()+" -- "+ex.getLocalizedMessage()+" -- "+ex.getMessage()+" -- " );
}
}
public void runClient() {
connOpt = new MqttConnectOptions();
connOpt.setCleanSession(false);
connOpt.setKeepAliveInterval(0);
connOptPublish= new MqttConnectOptions();
connOptPublish.setCleanSession(false);
connOptPublish.setKeepAliveInterval(0);
// Connect to Broker
try {
myClient = new MqttClient(BROKER_URL, "pahomqttpublish11");
myClient.setCallback(this);
myClient.connect(connOpt);
myClientPublish= new MqttClient(BROKER_URL, "pahomqttpublish42");
myClientPublish.setCallback(this);
myClientPublish.connect(connOptPublish);
} catch (MqttException e) {
e.printStackTrace();
System.exit(-1);
}
System.out.println("Connected to " + BROKER_URL);
String myTopic = "sample";
// String myTopic = "receiveDATA2";
MqttTopic topic = myClientPublish.getTopic(myTopic);
// publish messages if publisher
if (publisher) {
int i=1;
while(true){
String pubMsg = "sample msg "+i;
MqttMessage message = new MqttMessage(pubMsg.getBytes());
System.out.println(message);
message.setQos(2);
message.setRetained(false);
// Publish the message
MqttDeliveryToken token = null;
try {
// publish message to broker
token = topic.publish(message);
// Wait until the message has been delivered to the broker
token.waitForCompletion();
msgACK=false;
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
and below is subscriber
public class Mqttsample implements MqttCallback{
MqttClient myClient;
MqttClient myClientPublish;
MqttConnectOptions connOpt;
MqttConnectOptions connOptPublish;
static final String BROKER_URL = "tcp://Ehydromet-PC:1883";
// the following two flags control whether this example is a publisher, a subscriber or both
static final Boolean subscriber = true;
static final Boolean publisher = true;
public static void main(String[] args) {
Mqttsample smc = new Mqttsample();
smc.runClient();
}
@Override
public void connectionLost(Throwable t) {
System.out.println("Connection lost!");
// code to reconnect to the broker would go here if desired
}
@Override
public void messageArrived(String string, MqttMessage message) throws Exception {
//throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
System.out.println("| Topic:" + string+"| Message: " + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
try{
System.out.println("Pub complete" + new String(token.getMessage().getPayload()));
}
catch(Exception ex ){
System.out.println("delivery Error "+ex.getMessage());
}
}
public void runClient() {
connOpt = new MqttConnectOptions();
connOpt.setCleanSession(false);
connOpt.setKeepAliveInterval(0);
connOptPublish= new MqttConnectOptions();
connOptPublish.setCleanSession(false);
connOptPublish.setKeepAliveInterval(0);
// Connect to Broker
try {
myClient = new MqttClient(BROKER_URL, "pahomqttpublish");
myClient.setCallback(this);
myClient.connect(connOpt);
myClientPublish= new MqttClient(BROKER_URL, "pahomqttsubscribe");
myClientPublish.setCallback(this);
myClientPublish.connect(connOptPublish);
} catch (MqttException e) {
e.printStackTrace();
System.exit(-1);
}
System.out.println("Connected to " + BROKER_URL);
// subscribe to topic if subscriber
if (subscriber) {
try {
//String myTopicACK = M2MIO_DOMAIN + "/" + "ACK" + "/" + M2MIO_THING;
String myTopicACK = "sample";
// MqttTopic topicACK = myClient.getTopic(myTopicACK);
int subQoS = 2;
myClient.subscribe(myTopicACK, subQoS);
} catch (Exception e) {
e.printStackTrace();
}
}
//
}
}
how can i assure that subscriber has received the message, what should i need to implement in publisher code.
http://www.eclipse.org/paho/files/mqttdoc/Cclient/qos.html from the above link
QoS2, Exactly once: The message is always delivered exactly once. The message must be stored locally at the sender, until the sender receives confirmation that the message has been published by the receiver. The message is stored in case the message must be sent again. QoS2 is the safest, but slowest mode of transfer.