In addition to Sean's answer: the Azure Service Bus (ASB) integration with Azure Event Grid (AEG) makes it possible to build a watchdog capability for ASB entities. Note that this integration does not work the way it does for a storage blob account, where an event is published every time a blob is created or deleted.
In other words, ASB does not publish an event for each message that arrives in an ASB entity; instead, the events are published in the manner of an entity watchdog.
This kind of entity watchdog uses the following logic:
- No event is published when there is no message in the entity.
- The event is published immediately when the first message arrives in the entity and there has been no active listener on the entity for 360+ seconds.
- The event is published every 120 seconds while the listener is still inactive and there is at least one message in the entity.
- The event is published after 360 seconds of listener idle (inactive) time if there is still at least one message in the entity. For example, if there are 5 messages in the entity and the subscriber pulls only one message using the REST API, the next event will be published after 360 seconds. In other words, the entity watchdog allows a listener to stay idle for up to 360 seconds.
Based on the above "watchdog entity" behavior, this feature looks more suitable for slow-traffic messaging scenarios, such as waking up and monitoring listeners on ASB entities.
Note that the 360-second listener idle time can be avoided by using a short retry policy at the subscription level, so that the subscriber can be called again 3 times within the 5-minute retry window.
For test purposes, the following code snippet shows an EventGridTrigger function that subscribes to the ASB events.
#r "..\\bin\\Microsoft.Azure.ServiceBus.dll"
#r "Newtonsoft.Json"
using System;
using System.Threading.Tasks;
using System.Text;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Web;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.Azure.ServiceBus.Primitives;
// SAS token cache: one static SasTokenHelper, constructed from the value of the
// "AzureServiceBusConnectionString" environment variable. Run obtains the
// Authorization header value from it via helper.GetSasToken().
static SasTokenHelper helper = new SasTokenHelper(Environment.GetEnvironmentVariable("AzureServiceBusConnectionString"));
// One HttpClient shared by all invocations; creating a client per call can
// exhaust sockets under load. No per-call state is stored on it - the
// Authorization header is attached to each request instead.
static readonly HttpClient serviceBusRestClient = new HttpClient();

/// <summary>
/// EventGridTrigger entry point. Logs the incoming event, then repeatedly issues
/// an HTTP DELETE (read and delete) against the entity URI found in the event's
/// <c>data.requestUri</c> until the service stops answering 200 OK, logging the
/// headers and body of every message received.
/// </summary>
/// <param name="eventGridEvent">The Event Grid event as raw JSON.</param>
/// <param name="log">Function logger.</param>
public static async Task Run(JObject eventGridEvent, ILogger log)
{
    log.LogInformation(eventGridEvent.ToString());

    // from the eventgrid payload
    var entityUri = eventGridEvent["data"]?["requestUri"]?.Value<string>();
    if (string.IsNullOrWhiteSpace(entityUri))
    {
        // Without the entity URI the request below would target "?api-version=2015-01",
        // which is not a valid absolute URI; bail out with a clear diagnostic instead.
        log.LogError("The event payload does not contain data.requestUri; nothing to receive.");
        return;
    }

    var requestUri = $"{entityUri}?api-version=2015-01";

    while (true)
    {
        // read & delete the message
        using (var request = new HttpRequestMessage(HttpMethod.Delete, requestUri))
        {
            // Ask the helper on every iteration so that a long drain keeps using a
            // valid token; the helper returns its cached value until a refresh is due.
            request.Headers.Add("Authorization", helper.GetSasToken());

            using (var response = await serviceBusRestClient.SendAsync(request))
            {
                // 204 No Content: the entity has no (more) messages to deliver.
                if (response.StatusCode == HttpStatusCode.NoContent)
                {
                    log.LogWarning($">>> No message <<<");
                    break;
                }

                // Anything else that is not 200 OK is a failure (authorization, missing
                // entity, server error, ...) and must not be reported as "no message".
                if (response.StatusCode != HttpStatusCode.OK)
                {
                    log.LogError($"Receiving from {requestUri} failed: {(int)response.StatusCode} {response.ReasonPhrase}");
                    break;
                }

                // message body
                string jsontext = await response.Content.ReadAsStringAsync();

                // show the message
                log.LogInformation($"\nHeaders:\n\t{string.Join("\n\t", response.Headers.Select(i => $"{i.Key}={i.Value.First()}"))}\nBody:\n\t{jsontext}");
            }
        }
    }
}
// helpers
/// <summary>
/// Creates and caches a Shared Access Signature token for the Service Bus
/// namespace described by a connection string. The token is regenerated
/// shortly before the cached one is due to expire.
/// </summary>
class SasTokenHelper
{
    // Dedicated lock object. Locking on the token string itself is unsafe: the
    // field is reassigned while the lock is held, and its initial value
    // (string.Empty) is a shared instance that unrelated code could also lock on.
    readonly object gate = new object();

    // UTC instant at which the cached token stops being valid
    // (default(DateTime) until the first token has been created).
    DateTime expiringSaS;

    // Lifetime requested for every generated token.
    uint sasTTLInMinutes = 10;

    // Most recently generated token.
    string sasToken = string.Empty;

    // Endpoint, key name and key extracted from the connection string.
    (string hostname, string keyname, string key) config;

    /// <summary>
    /// Parses <paramref name="connectionString"/> and eagerly creates the first token.
    /// </summary>
    /// <param name="connectionString">
    /// Connection string containing the Endpoint, SharedAccessKeyName and SharedAccessKey parts.
    /// </param>
    public SasTokenHelper(string connectionString)
    {
        config = GetPartsFromConnectionString(connectionString);
        GetSasToken();
    }

    /// <summary>
    /// Returns the cached token, first replacing it when it has expired or will
    /// expire within the next minute.
    /// </summary>
    /// <returns>The token value to send in the Authorization header.</returns>
    public string GetSasToken()
    {
        lock (gate)
        {
            // Refresh one minute BEFORE expiry so callers never receive a token that
            // is already invalid (or becomes invalid while a request is in flight).
            if (DateTime.UtcNow.AddMinutes(1) >= expiringSaS)
            {
                // Take the timestamp before creating the token so the recorded expiry
                // is never later than the token's real expiry.
                var issuedAt = DateTime.UtcNow;
                sasToken = GetSASToken(config.hostname, config.key, config.keyname, sasTTLInMinutes);
                expiringSaS = issuedAt.AddMinutes(sasTTLInMinutes);
            }
            return sasToken;
        }
    }

    /// <summary>
    /// Splits a "name=value;name=value" connection string and returns its
    /// Endpoint, SharedAccessKeyName and SharedAccessKey values.
    /// </summary>
    /// <param name="connectionString">The connection string to parse.</param>
    /// <returns>The (hostname, keyname, key) triple.</returns>
    /// <exception cref="ArgumentException">The connection string is null or blank.</exception>
    /// <exception cref="System.Collections.Generic.KeyNotFoundException">A required part is missing.</exception>
    internal (string hostname, string keyname, string key) GetPartsFromConnectionString(string connectionString)
    {
        if (string.IsNullOrWhiteSpace(connectionString))
        {
            throw new ArgumentException("The Service Bus connection string is missing or empty.", nameof(connectionString));
        }

        var parts = connectionString
            .Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries)
            .Select(s => s.Split(new[] { '=' }, 2))   // split on the first '=' only: key values may contain '='
            .Where(pair => pair.Length == 2)          // ignore segments that have no '=' at all
            .ToDictionary(pair => pair[0].Trim(), pair => pair[1].Trim());

        // The dictionary indexer throws rather than returning null, so a
        // "?? \"\"" fallback can never apply; look each part up explicitly and
        // name the missing part in the exception.
        string Required(string name)
        {
            if (!parts.TryGetValue(name, out var value))
            {
                throw new System.Collections.Generic.KeyNotFoundException($"The Service Bus connection string does not contain '{name}'.");
            }
            return value;
        }

        return (Required("Endpoint"), Required("SharedAccessKeyName"), Required("SharedAccessKey"));
    }

    /// <summary>
    /// Creates a SAS token for <paramref name="resourceUri"/>.
    /// </summary>
    /// <param name="resourceUri">Resource the token is requested for.</param>
    /// <param name="key">Shared access key.</param>
    /// <param name="keyName">Shared access key name.</param>
    /// <param name="minutes">Token lifetime in minutes.</param>
    /// <returns>The token value.</returns>
    internal string GetSASToken(string resourceUri, string key, string keyName = null, uint minutes = 10)
    {
        var tp = SharedAccessSignatureTokenProvider.CreateSharedAccessSignatureTokenProvider(keyName, key, TimeSpan.FromMinutes(minutes));
        // Blocks on the task so the helper keeps its synchronous interface.
        // NOTE(review): presumably the SAS provider computes the token locally and the
        // task is already complete - confirm before calling this on a latency-sensitive path.
        return tp.GetTokenAsync(resourceUri, TimeSpan.FromSeconds(60)).Result.TokenValue;
    }
}