I am currently trying to use the Azure IoT Hub to read messages from the Event Hub compatible IoT Hub default message/events endpoint. To try this out i wrote two commandline apps, one which simulates a device and writes to the IoT Hub and another one which reads from the IoT Hub messages/events endpoint.
The producer produces every second a message and write it to the IoT Hub. This seems to work properly. But when i start the reader/consumer, then it gets a batch of messages and closes the app. But in the meantime the producer produces still messages.
My expectation would be that the producer produces messages for example every second or ramdomly and the consumer "listens" to the endpoint and if a new message arrives reads and displays it. Following my code with producer and consumer for Azure IoT Hub.
Producer /Simulated IoT Device
using Microsoft.Azure.Devices.Client;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace IoTGarage_Azure_01_Simulated_Device
{
class Program
{
private static DeviceClient s_deviceClient;
private readonly static string s_myDeviceId = "simulatedDevice";
private readonly static string s_iotHubUri = "<name.azure-devices.net>";
// Im IoT Hub > Geräte > Primärschlüssel
private readonly static string s_deviceKey = "<primary key>";
private static async Task Main()
{
Console.WriteLine("Routing Tutorial: Simulated device\n");
s_deviceClient = DeviceClient.Create(s_iotHubUri,
new DeviceAuthenticationWithRegistrySymmetricKey(s_myDeviceId, s_deviceKey), TransportType.Mqtt);
using var cts = new CancellationTokenSource();
var messages = SendDeviceToCloudMessagesAsync(cts.Token);
Console.WriteLine("Press the Enter key to stop.");
Console.ReadLine();
cts.Cancel();
await messages;
}
private static async Task SendDeviceToCloudMessagesAsync(CancellationToken token)
{
double minTemperature = 20;
double minHumidity = 60;
Random rand = new Random();
while (!token.IsCancellationRequested)
{
double currentTemperature = minTemperature + rand.NextDouble() * 15;
double currentHumidity = minHumidity + rand.NextDouble() * 20;
string infoString;
string levelValue;
if (rand.NextDouble() > 0.7)
{
if (rand.NextDouble() > 0.5)
{
levelValue = "critical";
infoString = "This is a critical message.";
}
else
{
levelValue = "storage";
infoString = "This is a storage message.";
}
}
else
{
levelValue = "normal";
infoString = "This is a normal message.";
}
var telemetryDataPoint = new
{
deviceId = s_myDeviceId,
temperature = currentTemperature,
humidity = currentHumidity,
pointInfo = infoString
};
var telemetryDataString = JsonConvert.SerializeObject(telemetryDataPoint);
// You can encode this as ASCII, but if you want it to be the body of the message,
// and to be able to search the body, it must be encoded in UTF with base64 encoding.
using var message = new Message(Encoding.UTF32.GetBytes(telemetryDataString));
//Add one property to the message.
message.Properties.Add("target", levelValue);
// Submit the message to the hub.
await s_deviceClient.SendEventAsync(message);
// Print out the message.
Console.WriteLine("{0} > Sent message: {1}", DateTime.Now, telemetryDataString);
await Task.Delay(1000);
}
}
}
}
Consumer
using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
namespace IoTGarage_Azure_02_IoTHub_ReadFromInternalEndpoint
{
class Program
{
private const string ehubNamespaceConnectionString = "<Endpoint=sb://>";
private const string eventHubName = "<iothubname>";
private const string blobStorageConnectionString = "<DefaultEndpointsProtocol=https;AccountName=EndpointSuffix=core.windows.net>";
private const string blobContainerName = "checkpointblob";
static async Task Main()
{
string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;
await processor.StartProcessingAsync();
await Task.Delay(TimeSpan.FromSeconds(30));
await processor.StopProcessingAsync();
}
static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
Console.WriteLine("\tReceived event: {0}", Encoding.UTF32.GetString(eventArgs.Data.Body.ToArray()));
await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
}
static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
{
Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
Console.WriteLine(eventArgs.Exception.Message);
return Task.CompletedTask;
}
}
}