1
votes

I created an Azure Function based on the Event Grid trigger. The trigger fires whenever a new message arrives at a Service Bus topic. Below is the generated function template:

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public static void Run(JObject eventGridEvent, TraceWriter log)
{
    log.Info(eventGridEvent.ToString(Formatting.Indented));
}

My requirement is for the Azure Function to process the data and store it in ADLS. How do I parse/deserialize the data from the JObject type? I need to normalize the data within this function before persisting it to Data Lake Store. Do I need to overwrite the function?

Please provide some details or references to meet this requirement.


2 Answers

2
votes

Service Bus (Premium) sends events for two scenarios:

  1. ActiveMessagesWithNoListenersAvailable
  2. DeadletterMessagesAvailable

The first event is emitted when there are messages associated with a specific entity and no active listeners exist. The entity is indicated in the payload, along with the other information required to access it (such as the namespace, or the topic of the subscription to receive from). The schema is defined in the documentation.

The second event has a schema similar to the first one and is emitted for dead-letter queues.

Now how do I parse/de-serialize data from JObject type. I need to normalize data within this function before persisting it to Data lake store. Do I need to overwrite the function?

The eventGridEvent JSON itself is not going to give you the Azure Service Bus message(s); you still have to receive them from the entity named in the event. You will also need to know how the original messages were serialized, i.e. what the sender side used. That deserialization needs to go into the Function, followed by code to write the object to Data Lake.
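
As a minimal sketch of the two parsing steps described above (reading the entity information from the event, then deserializing a received message body), something like the following could be used. The `data` field names (`namespaceName`, `topicName`, `subscriptionName`, `requestUri`) are assumed from the Service Bus event schema, and `OrderMessage` is a hypothetical type standing in for whatever the sender actually serialized as JSON.

```csharp
using System;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

// Hypothetical payload type: replace with the shape your sender serializes.
public class OrderMessage
{
    public string OrderId { get; set; }
    public decimal Amount { get; set; }
}

// Reads the entity coordinates from the "data" object of the Event Grid event.
// Field names are assumptions based on the Service Bus event schema.
public static (string ns, string topic, string subscription, string requestUri) ReadEntityInfo(JObject eventGridEvent)
{
    var data = eventGridEvent["data"];
    return (
        data?["namespaceName"]?.Value<string>(),
        data?["topicName"]?.Value<string>(),
        data?["subscriptionName"]?.Value<string>(),
        data?["requestUri"]?.Value<string>());
}

// Deserializes a message body, assuming the sender wrote it as JSON.
public static OrderMessage ParseBody(string json) =>
    JsonConvert.DeserializeObject<OrderMessage>(json);
```

If the sender used a different serializer (XML, Avro, a binary format), only `ParseBody` would need to change.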

0
votes

In addition to Sean's answer, the Azure Service Bus (ASB) integration with Azure Event Grid (AEG) makes it possible to build a watchdog capability for ASB entities. Note that this integration does not work like the one for a blob storage account, where an event is published each time a blob is created or deleted. In other words, ASB does not publish an event for each message that arrives in an entity; the events are published in the manner of an entity watchdog.

This kind of entity watchdog uses the following logic:

  1. No event is published when there is no message in the entity.
  2. The event is published immediately when the first message arrives in the entity and there has been no active listener on the entity for 360+ seconds.
  3. The event is published every 120 seconds while the listener is still inactive and there is at least one message in the entity.
  4. The event is published after 360 seconds of listener idle (inactive) time if there is still at least one message in the entity. For example, if there are 5 messages in the entity and the subscriber pulls only one message using the REST API, the next event is published after 360 seconds. In other words, the entity watchdog allows a listener to stay idle for 360 seconds.
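
The four rules above can be condensed into a small decision function. This is only an illustrative model of my reading of the list, not code from Service Bus; the function name and parameters are hypothetical, and `secondsSinceLastEvent` is `null` when no event has been published yet for the current idle period.

```csharp
using System;

// Illustrative model of the watchdog rules; thresholds come from the list above.
public static bool ShouldPublishEvent(int activeMessages, double listenerIdleSeconds, double? secondsSinceLastEvent)
{
    const double ListenerIdleThreshold = 360; // listener counts as active below this
    const double RepeatInterval = 120;        // cadence while the listener stays inactive

    if (activeMessages <= 0) return false;                         // rule 1: nothing to announce
    if (listenerIdleSeconds < ListenerIdleThreshold) return false; // rules 2 and 4: listener still considered active
    if (secondsSinceLastEvent == null) return true;                // first event of this idle period
    return secondsSinceLastEvent.Value >= RepeatInterval;          // rule 3: repeat every 120 seconds
}
```

For instance, with 4 messages left and a listener that pulled a message 10 seconds ago, the model returns `false` until the idle time reaches 360 seconds.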

Based on the above "entity watchdog" behavior, this feature looks more suitable for low-traffic messaging, such as waking up and monitoring listeners on ASB entities.

Note that the 360-second listener idle time can be avoided by using a short retry policy at the subscription level, so the subscriber can be called again 3 times within the 5-minute retry time.

For test purposes, the following is a code snippet of an EventGridTrigger function that subscribes to the ASB events.

#r "..\\bin\\Microsoft.Azure.ServiceBus.dll"
#r "Newtonsoft.Json"

using System;
using System.Threading.Tasks;
using System.Text;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Web;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.Azure.ServiceBus.Primitives;
using Microsoft.Extensions.Logging; // ILogger



// Static helper so the SAS token is cached and reused across invocations
static SasTokenHelper helper = new SasTokenHelper(Environment.GetEnvironmentVariable("AzureServiceBusConnectionString"));

public static async Task Run(JObject eventGridEvent, ILogger log)
{
    log.LogInformation(eventGridEvent.ToString());

    // from the eventgrid payload
    var requestUri = $"{eventGridEvent["data"]?["requestUri"]?.Value<string>()}?api-version=2015-01";

    using (var client = new HttpClient())
    {

        client.DefaultRequestHeaders.Add("Authorization", helper.GetSasToken());

        do
        {
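            // Destructive read: receive and delete the next message in one call
            var response = await client.DeleteAsync(requestUri);

            // 200 OK carries a message; any other status (e.g. 204 No Content) ends the loop
            if (response.StatusCode != HttpStatusCode.OK)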
            {
                log.LogWarning($">>> No message <<<");
                break;
            }

            // message body
            string jsontext = await response.Content.ReadAsStringAsync();

            // show the message
            log.LogInformation($"\nHeaders:\n\t{string.Join("\n\t", response.Headers.Select(i => $"{i.Key}={i.Value.First()}"))}\nBody:\n\t{jsontext}");
        } while (true);

    }

}




// helpers
class SasTokenHelper
{
    DateTime expiringSaS;
    uint sasTTLInMinutes = 10;
    string sasToken = string.Empty;
    (string hostname, string keyname, string key) config;

    public SasTokenHelper(string connectionString)
    {
        config = GetPartsFromConnectionString(connectionString);
        GetSasToken();
    }

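    // Dedicated lock object: locking on the token string is unsafe because that field is reassigned.
    readonly object sync = new object();

    public string GetSasToken()
    {
        lock (sync)
        {
            // Refresh once the cached token is within one minute of expiring.
            if (expiringSaS < DateTime.UtcNow.AddMinutes(1))
            {
                this.sasToken = GetSASToken(config.hostname, config.key, config.keyname, sasTTLInMinutes);
                expiringSaS = DateTime.UtcNow.AddMinutes(sasTTLInMinutes);
            }
            return sasToken;
        }
    }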

    internal (string hostname, string keyname, string key) GetPartsFromConnectionString(string connectionString)
    {
        var parts = connectionString.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries).Select(s => s.Split(new[] { '=' }, 2)).ToDictionary(x => x[0].Trim(), x => x[1].Trim());
        // The dictionary indexer throws on a missing key, so use TryGetValue to fall back to "".
        string Get(string name) => parts.TryGetValue(name, out var value) ? value : "";
        return (Get("Endpoint"), Get("SharedAccessKeyName"), Get("SharedAccessKey"));
    }

    internal string GetSASToken(string resourceUri, string key, string keyName = null, uint minutes = 10)
    {
        var tp = SharedAccessSignatureTokenProvider.CreateSharedAccessSignatureTokenProvider(keyName, key, TimeSpan.FromMinutes(minutes));
        return tp.GetTokenAsync(resourceUri, TimeSpan.FromSeconds(60)).Result.TokenValue;
    }
}
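
To connect this back to the normalization requirement in the question, here is a rough sketch of a helper that could be called inside the loop instead of just logging the message. It assumes the message body is JSON and that the broker metadata arrives as JSON in the `BrokerProperties` response header of the REST receive call; `ToFlatRecord` and the `messageId` key are hypothetical names chosen for illustration. Writing the resulting record to Data Lake is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using Newtonsoft.Json.Linq;

// Flattens a JSON message body into path/value pairs and adds the message id
// taken from the BrokerProperties JSON (assumed header content).
public static Dictionary<string, string> ToFlatRecord(string brokerPropertiesJson, string bodyJson)
{
    var record = new Dictionary<string, string>();

    var props = string.IsNullOrEmpty(brokerPropertiesJson)
        ? new JObject()
        : JObject.Parse(brokerPropertiesJson);
    record["messageId"] = props["MessageId"]?.Value<string>();

    // Collect every leaf value, keyed by its JSON path (e.g. "customer.id", "tags[0]").
    var root = JToken.Parse(bodyJson);
    var leaves = root is JContainer container
        ? container.Descendants().OfType<JValue>()
        : new[] { (JValue)root };

    foreach (var leaf in leaves)
    {
        record[leaf.Path] = Convert.ToString(leaf.Value, CultureInfo.InvariantCulture);
    }
    return record;
}
```

In the function above, the header value could be obtained with `response.Headers.TryGetValues("BrokerProperties", out var values)` and the body with the existing `jsontext` variable.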