0
votes

I am currently trying to use the Azure IoT Hub to read messages from the Event Hub compatible IoT Hub default message/events endpoint. To try this out i wrote two commandline apps, one which simulates a device and writes to the IoT Hub and another one which reads from the IoT Hub messages/events endpoint.

The producer produces every second a message and write it to the IoT Hub. This seems to work properly. But when i start the reader/consumer, then it gets a batch of messages and closes the app. But in the meantime the producer produces still messages.

My expectation would be that the producer produces messages for example every second or ramdomly and the consumer "listens" to the endpoint and if a new message arrives reads and displays it. Following my code with producer and consumer for Azure IoT Hub.

Producer /Simulated IoT Device

using Microsoft.Azure.Devices.Client;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace IoTGarage_Azure_01_Simulated_Device
{
    class Program
    {

        private static DeviceClient s_deviceClient;
        private readonly static string s_myDeviceId = "simulatedDevice";
        private readonly static string s_iotHubUri = "<name.azure-devices.net>";
        // Im IoT Hub > Geräte > Primärschlüssel
        private readonly static string s_deviceKey = "<primary key>";

        private static async Task Main()
        {
            Console.WriteLine("Routing Tutorial: Simulated device\n");
            s_deviceClient = DeviceClient.Create(s_iotHubUri,
                new DeviceAuthenticationWithRegistrySymmetricKey(s_myDeviceId, s_deviceKey), TransportType.Mqtt);

            using var cts = new CancellationTokenSource();
            var messages = SendDeviceToCloudMessagesAsync(cts.Token);
            Console.WriteLine("Press the Enter key to stop.");
            Console.ReadLine();
            cts.Cancel();
            await messages;
        }

        private static async Task SendDeviceToCloudMessagesAsync(CancellationToken token)
        {
            double minTemperature = 20;
            double minHumidity = 60;
            Random rand = new Random();

            while (!token.IsCancellationRequested)
            {
                double currentTemperature = minTemperature + rand.NextDouble() * 15;
                double currentHumidity = minHumidity + rand.NextDouble() * 20;

                string infoString;
                string levelValue;

                if (rand.NextDouble() > 0.7)
                {
                    if (rand.NextDouble() > 0.5)
                    {
                        levelValue = "critical";
                        infoString = "This is a critical message.";
                    }
                    else
                    {
                        levelValue = "storage";
                        infoString = "This is a storage message.";
                    }
                }
                else
                {
                    levelValue = "normal";
                    infoString = "This is a normal message.";
                }

                var telemetryDataPoint = new
                {
                    deviceId = s_myDeviceId,
                    temperature = currentTemperature,
                    humidity = currentHumidity,
                    pointInfo = infoString
                };

                var telemetryDataString = JsonConvert.SerializeObject(telemetryDataPoint);
                
                // You can encode this as ASCII, but if you want it to be the body of the message, 
                //  and to be able to search the body, it must be encoded in UTF with base64 encoding.
                using var message = new Message(Encoding.UTF32.GetBytes(telemetryDataString));
               
                //Add one property to the message.
                message.Properties.Add("target", levelValue);

                // Submit the message to the hub.
                await s_deviceClient.SendEventAsync(message);

                // Print out the message.
                Console.WriteLine("{0} > Sent message: {1}", DateTime.Now, telemetryDataString);

                await Task.Delay(1000);
            }
        }
    }
}

Consumer

using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;

namespace IoTGarage_Azure_02_IoTHub_ReadFromInternalEndpoint
{
    class Program
    {
        private const string ehubNamespaceConnectionString = "<Endpoint=sb://>";
        private const string eventHubName = "<iothubname>";
        private const string blobStorageConnectionString = "<DefaultEndpointsProtocol=https;AccountName=EndpointSuffix=core.windows.net>";
        private const string blobContainerName = "checkpointblob";

        static async Task Main()
        {
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

            BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
            EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);

            processor.ProcessEventAsync += ProcessEventHandler;
            processor.ProcessErrorAsync += ProcessErrorHandler;

            await processor.StartProcessingAsync();

            await Task.Delay(TimeSpan.FromSeconds(30));
            await processor.StopProcessingAsync();
        }

        static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
        {
            Console.WriteLine("\tReceived event: {0}", Encoding.UTF32.GetString(eventArgs.Data.Body.ToArray()));
            await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }

        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
        {
            Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
            Console.WriteLine(eventArgs.Exception.Message);
            return Task.CompletedTask;
        }
    }
}
1

1 Answers

1
votes

What you are seeing is likely due to message retention policy of IoTHub with respect to its built-in EventHub. That would explain the initial flurry of messages you see coming through your receiver. The fact that your app exits is likely because you are letting the main thread exit. Set EventPosition=Latest to only read the new messages coming in.

Option #1 - using EventHubConsumerClient

using Azure.Messaging.EventHubs.Consumer;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace DotnetService
{
    class Program
    {
        private const string EventHubsCompatibleEndpoint = "TODO: az iot hub show --query properties.eventHubEndpoints.events.endpoint --name {hubname}";  
        private const string EventHubsCompatiblePath = "TODO: {hubname}";  
        private const string IotHubSasKey = "TODO: az iot hub policy show --name service --query primaryKey --hub-name {hubname}";  
        private const string ConsumerGroup = "$Default";

        private static EventHubConsumerClient eventHubConsumerClient = null;
        private async static Task Setup()
        {
            string eventHubConnectionString = $"Endpoint={EventHubsCompatibleEndpoint.Replace("sb://", "amqps://")};EntityPath={EventHubsCompatiblePath};SharedAccessKeyName=service;SharedAccessKey={IotHubSasKey};";
            eventHubConsumerClient = new EventHubConsumerClient(ConsumerGroup, eventHubConnectionString);

            var tasks = new List<Task>();
            var partitions = await eventHubConsumerClient.GetPartitionIdsAsync();
            foreach (string partition in partitions)
            {
                tasks.Add(ReceiveMessagesFromDeviceAsync(partition));
            }
        }

        static async Task ReceiveMessagesFromDeviceAsync(string partitionId)
        {
            Console.WriteLine($"Starting listener thread for partition: {partitionId}");
            while (true)
            {
                await foreach (PartitionEvent receivedEvent in eventHubConsumerClient.ReadEventsFromPartitionAsync(partitionId, EventPosition.Latest))
                {
                    string msgSource;
                    string body = Encoding.UTF8.GetString(receivedEvent.Data.Body.ToArray());
                    if (receivedEvent.Data.SystemProperties.ContainsKey("iothub-message-source"))
                    {
                        msgSource = receivedEvent.Data.SystemProperties["iothub-message-source"].ToString();
                        Console.WriteLine($"{partitionId} {msgSource} {body}");
                    }
                }
            }
        }

        static async Task Main(string[] args)
        {
            await Setup();
            Console.ReadLine();
        }
    }
}

Option #2 using EventProcessorClient

using System;  
using System.Text;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;

namespace DotnetService
{
    class Program
    {
        private const string ehubNamespaceConnectionString = "Endpoint=sb://...";
        private const string eventHubName = "{iothubname}";
        private const string blobStorageConnectionString = "DefaultEndpointsProtocol=...";
        private const string blobContainerName = "{storagename}";

        private static Task initializeEventHandler(PartitionInitializingEventArgs arg)
        {
            arg.DefaultStartingPosition = EventPosition.Latest;
            return Task.CompletedTask;
        }

        static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
        {
            Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
            await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }

        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
        {
            Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered.");
            Console.WriteLine(eventArgs.Exception.Message);
            return Task.CompletedTask;
        }

        static async Task Main()
        {
            BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
            EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, ehubNamespaceConnectionString, eventHubName);

            processor.PartitionInitializingAsync += initializeEventHandler;
            processor.ProcessEventAsync += ProcessEventHandler;
            processor.ProcessErrorAsync += ProcessErrorHandler;

            await processor.StartProcessingAsync();
            Console.ReadLine();
        }
    }
}