1
votes

We're sending data from IoT devices to Azure IoT Hub and trying to pass a certain type of message to an Azure Function.

For now, we did it by creating an Azure Service Bus endpoint and creating a Message routing in IoTHub. It works as expected, with the message being received correctly by the Azure Function.

Now, we'd like to get the DeviceId from IoT Hub in the Azure function, as well as Tags defined in the Device Twin, and I'm completely lost in how to do it.

If we were to use an EventHubTrigger, it seems that it would be straightforward, doing something like this:

public static class Test
{
    [FunctionName("TestQueueTrigger")]
    public static void Run(
        [EventHubTrigger("messages/events", Connection = "IoTHubConnection")]
        EventData message,
        Twin deviceTwin,
        TraceWriter log)
    { ... }
}

But it's really not clear how this could be done with the Service Bus trigger.

Also, we would like to store all messages (independently from the route) to Azure Data Lake storage, and I'm a bit lost on how that would work.

1
Would you like to trigger function on the Device Twin Change Events via ServiceBus? All Azure IoT Hub telemetry messages can be stored to ADL storage independently (no routes, no custom endpoint) from the azure stream analytics job.Roman Kiss
in the case of passing some additional values from the device (for instance, type of message), you can use a message properties collection (that's for user), the other properties collection such as a system properties is holding info such as deviceId, etc..Roman Kiss

1 Answers

1
votes

The Azure IoT Hub device-to-cloud message format is described here. There is no device twin properties in this format. The device twins are stored in the cloud backend, their changes can be notified based on the iot hub routes to the specific endpoint (built-in and/or custom endpoint).

Your example of the function "TestQueueTrigger" is using an azure-functions-iothub-extension for version 1. The extension input binding Twin allows to get the device twin using a separate call within the extension:

deviceTwin = await registryManager.GetTwinAsync(attribute.DeviceId);

Basically, this extension can be used also for ServiceBusTrigger binding. Note, that this extension can be used only for function version 1, so I do recommend to get the device twin within the function using for example a REST API Get Twin call.

Update

The following code snippet shows an example of the ServiceBusTrigger function and the REST API Get Twin call.

run.csx file:

#r "..\\bin\\Microsoft.Azure.ServiceBus.dll"
#r "..\\bin\\Microsoft.Azure.Devices.Shared.dll"
#r "Microsoft.Azure.WebJobs.ServiceBus"
#r "Newtonsoft.Json"


using System;
using System.Threading.Tasks;
using System.Text;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.Azure.WebJobs.ServiceBus;
using Microsoft.Azure.ServiceBus;
using System.Globalization;
using System.Linq;
using System.Net.Http;
using System.Security.Cryptography;
using System.Web;
using Microsoft.Azure.Devices.Shared;

// reusable proxy
static HttpClientHelper iothub = new HttpClientHelper(Environment.GetEnvironmentVariable("AzureIoTHubShariedAccessPolicy"));

public static async Task Run(Message queueItem, ILogger log)
{
    // payload
    log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(queueItem.Body)}");

    // device identity Id
    var deviceId = queueItem.UserProperties["iothub-connection-device-id"];

    // get the device twin
    var response = await iothub.Client.GetAsync($"/twins/{deviceId}?api-version=2018-06-30");
    response.EnsureSuccessStatusCode();
    Twin twin = await response.Content.ReadAsAsync<Twin>();

    log.LogInformation(JsonConvert.SerializeObject(twin.Tags, Formatting.Indented));

    await Task.CompletedTask;
}


// helpers
class HttpClientHelper
{
    HttpClient client;
    DateTime expiringSaS;
    (string hostname, string keyname, string key) config;

    public HttpClientHelper(string connectionString)
    {
        config = GetPartsFromConnectionString(connectionString);
        client = new HttpClient() { BaseAddress = new Uri($"https://{config.hostname}")};
        SetAuthorizationHeader();         
    }

    public HttpClient Client
    {
        get
        {          
            if (expiringSaS < DateTime.UtcNow.AddMinutes(-1))
            {
               SetAuthorizationHeader();  
            }         
            return client;
        }
    }

    internal void SetAuthorizationHeader()
    {
        lock (client)
        {
            if (expiringSaS < DateTime.UtcNow.AddMinutes(-1)) 
            {
                string sasToken = GetSASToken(config.hostname, config.key, config.keyname, 1);
                if (client.DefaultRequestHeaders.Contains("Authorization"))
                    client.DefaultRequestHeaders.Remove("Authorization");
                client.DefaultRequestHeaders.Add("Authorization", sasToken);
                expiringSaS = DateTime.UtcNow.AddHours(1);
            }
        }
    }

    internal (string hostname, string keyname, string key) GetPartsFromConnectionString(string connectionString)
    {
        var parts = connectionString.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries).Select(s => s.Split(new[] { '=' }, 2)).ToDictionary(x => x[0].Trim(), x => x[1].Trim());
        return (parts["HostName"] ?? "", parts["SharedAccessKeyName"] ?? "", parts["SharedAccessKey"] ?? "");
    }

    internal string GetSASToken(string resourceUri, string key, string keyName = null, uint hours = 24)
    {
        var expiry = GetExpiry(hours);
        string stringToSign = HttpUtility.UrlEncode(resourceUri) + "\n" + expiry;
        HMACSHA256 hmac = new HMACSHA256(Convert.FromBase64String(key));

        var signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));
        var sasToken = String.Format(CultureInfo.InvariantCulture, $"SharedAccessSignature sr={HttpUtility.UrlEncode(resourceUri)}&sig={HttpUtility.UrlEncode(signature)}&se={expiry}");
        if (!string.IsNullOrEmpty(keyName))
            sasToken += $"&skn={keyName}";
        return sasToken;
    }

    internal string GetExpiry(uint hours = 24)
    {
        TimeSpan sinceEpoch = DateTime.UtcNow - new DateTime(1970, 1, 1);
        return Convert.ToString((int)sinceEpoch.TotalSeconds + 3600 * hours);
    }
}

function.json:

{
  "bindings": [
    {
      "name": "queueItem",
      "type": "serviceBusTrigger",
      "direction": "in",
      "queueName": "myQueue",
      "connection": "myConnectionString_SERVICEBUS"
    }
  ]
}