I am trying to send messages to Azure IoT Hub over MQTT using the Azure Device SDK libraries. Two devices are configured on the IoT Hub, and each has its own connection string.

Connection string for device 1

connString = "HostName=ABC.azure-devices.net;DeviceId=ABC;SharedAccessKey=sharedKey";

Connection string for device 2

connString = "HostName=DEF.azure-devices.net;DeviceId=DEF;SharedAccessKey=sharedKey";

I have written two publishers that send messages to the IoT Hub and a subscriber that receives the messages sent by the publishers.

The first publisher uses the connection string for device 1 and the second publisher uses the connection string for device 2. When I run both publisher classes simultaneously, the subscriber receives messages from both publishers. Can anyone tell me how to modify the code so that the subscriber receives only the messages sent by publisher 1, even when both publishers are running simultaneously?

Here is my code for Publisher 1.

package com.iot.mqtt.connection;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.google.gson.Gson;
import com.microsoft.azure.sdk.iot.device.DeviceClient;
import com.microsoft.azure.sdk.iot.device.IotHubClientProtocol;
import com.microsoft.azure.sdk.iot.device.IotHubEventCallback;
import com.microsoft.azure.sdk.iot.device.IotHubStatusCode;
import com.microsoft.azure.sdk.iot.device.Message;
//import com.microsoft.docs.iothub.samples.SimulatedDevice.EventCallback;
//import com.microsoft.docs.iothub.samples.SimulatedDevice.MessageSender;
//import com.microsoft.docs.iothub.samples.SimulatedDevice.TelemetryDataPoint;

public class SimulatedDevice {
      // The device connection string to authenticate the device with your IoT hub.
      // Using the Azure CLI:
      // az iot hub device-identity show-connection-string --hub-name {YourIoTHubName} --device-id MyJavaDevice --output table


      private static String connString = "HostName=ABC.azure-devices.net;DeviceId=ABC;SharedAccessKey=aTVNu55sN9a2Y9+V0BCAOXdo8nSFDNzByfiTqMvNb20=";

      // Using the MQTT protocol to connect to IoT Hub
      private static IotHubClientProtocol protocol = IotHubClientProtocol.MQTT;
      private static DeviceClient client;

      // Specify the telemetry to send to your IoT hub.
      private static class TelemetryDataPoint {
        public double temperature;
        public double humidity;
        public String message;
        public String timeStamp;

        // Serialize object to JSON format.
        public String serialize() {
          Gson gson = new Gson();
          return gson.toJson(this);
        }
      }

      // Print the acknowledgement received from IoT Hub for the telemetry message sent.
      private static class EventCallback implements IotHubEventCallback {
        public void execute(IotHubStatusCode status, Object context) {
          System.out.println("IoT Hub responded to message with status: " + status.name());

          if (context != null) {
            synchronized (context) {
              context.notify();
            }
          }
        }
      }

      private static class MessageSender implements Runnable {
        public void run() {
          try {
            // Initialize the simulated telemetry.
            double minTemperature = 20;
            double minHumidity = 60;
            String message;
            Random rand = new Random();

            Properties prop = new Properties();
            // try-with-resources closes the stream even if load() throws.
            try (InputStream is = new FileInputStream("C:\\Users\\H251970\\eclipse-workspace\\IOTMqttTestProject\\resources\\config.properties")) {
              prop.load(is);
            }
            message = prop.getProperty("message");
            minTemperature = Double.parseDouble(prop.getProperty("temperature"));
            minHumidity = Double.parseDouble(prop.getProperty("humidity"));
            //System.out.println(message);

            while (true) {
              // Simulate telemetry.
              double currentTemperature = minTemperature + rand.nextDouble() * 15;
              double currentHumidity = minHumidity + rand.nextDouble() * 20;
              String datatimeStamp = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(Calendar.getInstance().getTime());
              TelemetryDataPoint telemetryDataPoint = new TelemetryDataPoint();
              telemetryDataPoint.temperature = currentTemperature;
              telemetryDataPoint.humidity = currentHumidity;
              telemetryDataPoint.message = message;
              telemetryDataPoint.timeStamp = datatimeStamp;
              // Add the telemetry to the message body as JSON.
              String msgStr = telemetryDataPoint.serialize();

              Message msg = new Message(msgStr);

              // Add a custom application property to the message.
              // An IoT hub can filter on these properties without access to the message body.
              msg.setProperty("temperatureAlert", (currentTemperature > 30) ? "true" : "false");

              System.out.println("Sending message: " + msgStr);

              Object lockobj = new Object();

              // Send the message.
              EventCallback callback = new EventCallback();
              client.sendEventAsync(msg, callback, lockobj);

              synchronized (lockobj) {
                lockobj.wait();
              }
              Thread.sleep(3000);
            }
          } catch (Exception e) {
            System.out.println("Exception: " + e);
          }
        }
      }

      public static void main(String[] args) throws IOException, URISyntaxException {

        // Connect to the IoT hub.
        client = new DeviceClient(connString, protocol);
        client.open();

        // Create new thread and start sending messages 
        MessageSender sender = new MessageSender();
        ExecutorService executor = Executors.newFixedThreadPool(1);
        executor.execute(sender);

        // Stop the application.
        System.out.println("Press ENTER to exit.");
        System.in.read();
        executor.shutdownNow();
        client.closeNow();
      }
    }

The code for publisher 2 is the same; only the connection string is different.

connString = "HostName=DEF.azure-devices.net;DeviceId=DEF;SharedAccessKey=aTVNu55sN9a2Y9+V0BCAOXdo8nSFDNzByfiTqMvNb20=";

The subscriber is receiving the messages in the following form.

Sending message: {"temperature":"27.739594911863872°C","voltage":"15.81301816513805V","motorspeed":"5.0m/s","inverterName":"i550","timeStamp":"22/08/2018 11:18:03"}
IoT Hub responded to message with status: OK_EMPTY

Here is the code for Subscriber class

package com.microsoft.docs.iothub.samples;

import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.EventPosition;
import com.microsoft.azure.eventhubs.EventHubRuntimeInformation;
import com.microsoft.azure.eventhubs.PartitionReceiver;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.nio.charset.Charset;
import java.net.URI;
import java.net.URISyntaxException;

public class ReadDeviceToCloudMessages {

  // az iot hub show --query properties.eventHubEndpoints.events.endpoint --name {your IoT Hub name}
  //private static final String eventHubsCompatibleEndpoint = "{your Event Hubs compatible endpoint}";
  private static final String eventHubsCompatibleEndpoint = "sb://ihsuprodmares009dednamespace.servicebus.windows.net/";

  // az iot hub show --query properties.eventHubEndpoints.events.path --name {your IoT Hub name}
  //private static final String eventHubsCompatiblePath = "{your Event Hubs compatible name}";
  private static final String eventHubsCompatiblePath = "eventHubsCompatiblePathString";

  // az iot hub policy show --name iothubowner --query primaryKey --hub-name {your IoT Hub name}
  private static final String iotHubSasKey = "iotHubSasKeyString=";
  private static final String iotHubSasKeyName = "iothubowner";

  // Track all the PartitionReceiver instances created.
  private static ArrayList<PartitionReceiver> receivers = new ArrayList<PartitionReceiver>();

  // Asynchronously create a PartitionReceiver for a partition and then start 
  // reading any messages sent from the simulated client.
  private static void receiveMessages(EventHubClient ehClient, String partitionId)
      throws EventHubException, ExecutionException, InterruptedException {

    final ExecutorService executorService = Executors.newSingleThreadExecutor();

    // Create the receiver using the default consumer group.
    // For the purposes of this sample, read only messages sent since 
    // the time the receiver is created. Typically, you don't want to skip any messages.
    ehClient.createReceiver(EventHubClient.DEFAULT_CONSUMER_GROUP_NAME, partitionId,
        EventPosition.fromEnqueuedTime(Instant.now())).thenAcceptAsync(receiver -> {
          System.out.println(String.format("Starting receive loop on partition: %s", partitionId));
          System.out.println(String.format("Reading messages sent since: %s", Instant.now().toString()));

          receivers.add(receiver);

          while (true) {
            try {
              // Check for EventData - this method times out if there is nothing to retrieve.
              Iterable<EventData> receivedEvents = receiver.receiveSync(100);

              // If there is data in the batch, process it.
              if (receivedEvents != null) {
                for (EventData receivedEvent : receivedEvents) {
                  System.out.println(String.format("Telemetry received:\n %s",
                      new String(receivedEvent.getBytes(), Charset.defaultCharset())));
                  System.out.println(String.format("Application properties (set by device):\n%s",receivedEvent.getProperties().toString()));
                  System.out.println(String.format("System properties (set by IoT Hub):\n%s\n",receivedEvent.getSystemProperties().toString()));
                }
              }
            } catch (EventHubException e) {
              System.out.println("Error reading EventData");
            }
          }
        }, executorService);
  }

  public static void main(String[] args)
      throws EventHubException, ExecutionException, InterruptedException, IOException, URISyntaxException {

    final ConnectionStringBuilder connStr = new ConnectionStringBuilder()
        .setEndpoint(new URI(eventHubsCompatibleEndpoint))
        .setEventHubName(eventHubsCompatiblePath)
        .setSasKeyName(iotHubSasKeyName)
        .setSasKey(iotHubSasKey);

    // Create an EventHubClient instance to connect to the
    // IoT Hub Event Hubs-compatible endpoint.
    final ExecutorService executorService = Executors.newSingleThreadExecutor();
    final EventHubClient ehClient = EventHubClient.createSync(connStr.toString(), executorService);

    // Use the EventHubRunTimeInformation to find out how many partitions 
    // there are on the hub.
    final EventHubRuntimeInformation eventHubInfo = ehClient.getRuntimeInformation().get();

    // Create a PartitionReceiver for each partition on the hub.
    for (String partitionId : eventHubInfo.getPartitionIds()) {
      receiveMessages(ehClient, partitionId);
    }

    // Shut down cleanly.
    System.out.println("Press ENTER to exit.");
    System.in.read();
    System.out.println("Shutting down...");
    for (PartitionReceiver receiver : receivers) {
      receiver.closeSync();
    }
    ehClient.closeSync();
    executorService.shutdown();
    System.exit(0);
  }
}

Now I am trying to figure out how to receive messages from publisher 1 only, even when both publishers are running simultaneously. Thanks in advance.

Hi Harshad, please don't show your hostname and SharedAccessKey directly. This is private information. - Rita Han
Can you show how the subscriber receives the messages? Note: Azure IoT Hub is not a generic MQTT broker. - Rita Han
Thanks Rita... Above is my code for the Subscriber class. - Harshad Holkar
Hi Harshad, please let me know if it helps. Feel free to post if you have any concerns. - Rita Han
Hi Rita... Thanks for the answer. However, I have been trying to figure out a way to publish and subscribe through topics only. I found a link that shows how to send messages to IoT Hub via topics and subscribe to messages through a topic: docs.microsoft.com/en-us/azure/iot-hub/… I have not been able to understand it completely, since I am quite new to this and the code is written in Python. I am trying to replicate the same in Java using the Paho MQTT library. Please correct me if I am wrong anywhere. - Harshad Holkar

1 Answer

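Azure IoT Hub is not a generic MQTT broker, so one device cannot subscribe to another device's telemetry. For receiving messages, a device subscribes to its own cloud-to-device topics: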

devices/{device_id}/messages/devicebound/

devices/{device_id}/messages/devicebound/{property_bag}
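
For anyone building these topic strings by hand (for example with Paho), the following is a minimal sketch. The class and method names are hypothetical, and the device ID `ABC` is taken from the question. The telemetry topic `devices/{device_id}/messages/events/` is not listed in this answer; it is the device-to-cloud publish topic described in the IoT Hub MQTT documentation. The `devicebound` topics carry cloud-to-device messages only, so subscribing to them does not deliver telemetry from other devices.

```java
// Hypothetical helper: builds the fixed topic names that IoT Hub accepts for one device.
final class IotHubTopics {

    // Topic a device publishes telemetry to (device-to-cloud).
    static String telemetryTopic(String deviceId) {
        return "devices/" + deviceId + "/messages/events/";
    }

    // Topic filter a device subscribes to for cloud-to-device messages.
    // The trailing '#' matches the optional property bag segment.
    static String cloudToDeviceFilter(String deviceId) {
        return "devices/" + deviceId + "/messages/devicebound/#";
    }

    public static void main(String[] args) {
        System.out.println(telemetryTopic("ABC"));
        System.out.println(cloudToDeviceFilter("ABC"));
    }
}
```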

The Event Hubs-compatible endpoint can't select messages by device ID either, as you have found.

There is a workaround, but it only works when you have 10 or fewer devices, because the maximum number of custom endpoints you can create is 10.

The workaround is to use a Service Bus topic instead of the Event Hub:

Device -> Azure IoT Hub -> Routing -> Endpoint "Service Bus Topic"

Add two Service Bus topic endpoints.

Add two routes for two endpoints:

Receive from the service bus topic:

subscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, SubscriptionName);
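
If routing is not an option, a simpler alternative may be to keep the existing Event Hubs reader and discard unwanted messages on the client side. IoT Hub stamps each device-to-cloud message with the system property `iothub-connection-device-id`, which the subscriber in the question already prints. The sketch below is hypothetical (the class `DeviceFilter` and its method are illustrative names) and assumes the system properties are available as a `Map<String, Object>`, which is how `EventData.getSystemProperties()` appears to expose them in the Java Event Hubs SDK. Note that this filters after delivery, so messages from every device still reach the subscriber over the network.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: decides whether a received event came from a given device.
final class DeviceFilter {

    // System property that IoT Hub adds to every device-to-cloud message.
    static final String DEVICE_ID_KEY = "iothub-connection-device-id";

    static boolean isFromDevice(Map<String, Object> systemProperties, String deviceId) {
        if (systemProperties == null || deviceId == null) {
            return false;
        }
        Object value = systemProperties.get(DEVICE_ID_KEY);
        return value != null && deviceId.equals(value.toString());
    }

    public static void main(String[] args) {
        // Simulated system properties for a message sent by device "ABC".
        Map<String, Object> props = new HashMap<>();
        props.put(DEVICE_ID_KEY, "ABC");

        System.out.println(isFromDevice(props, "ABC"));
        System.out.println(isFromDevice(props, "DEF"));
    }
}
```

In the receive loop of `ReadDeviceToCloudMessages`, this could be used as `if (!DeviceFilter.isFromDevice(receivedEvent.getSystemProperties(), "ABC")) continue;` before printing the telemetry.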