In short, our task is to process a large number of input messages.
To solve this, we decided to use Azure Queue Storage and Azure Functions. Our Azure Functions are structured similarly to the following code:
Queue triggered function
    [FunctionName("MessageControllerExecutor")]
    public static async void Run(
        [QueueTrigger(QUEUE_NAME, Connection = QUEUE_CONNECTION_NAME)] string queueMessage,
        [OrchestrationClient] DurableOrchestrationClient client,
        TraceWriter log)
    {
        await client.StartNewAsync("MessageController", queueMessage);
    }
Durable function
    [FunctionName("MessageController")]
    public static async void Run(
        [OrchestrationTrigger] DurableOrchestrationContext context,
        TraceWriter log)
    {
        if (!context.IsReplaying) log.Warning("MessageController started");

        var function1ResultTask = context.CallActivityAsync<ResultMessage>("Function_1", new InputMessage());
        var function2ResultTask = context.CallActivityAsync<ResultMessage>("Function_2", new InputMessage());

        await Task.WhenAll(function1ResultTask, function2ResultTask);

        // process Function_1 and Function_2 results
        // ...
    }
Simple activity function sample
    [FunctionName("Function_1")]
    public static ResultMessage Run(
        [ActivityTrigger] DurableActivityContext activityContext,
        TraceWriter log)
    {
        var msg = activityContext.GetInput<InputMessage>();

        int time = new Random().Next(1, 3);
        Thread.Sleep(time * 1000);

        return new ResultMessage()
        {
            Payload = $"Function_1 slept for {time} sec"
        };
    }
MessageControllerExecutor is triggered when a new item is received in the queue.
MessageController is a Durable Function that uses a few simple activity functions to process each message.
When we push messages to the queue, the MessageControllerExecutor function starts immediately, asynchronously starts MessageController, and passes it the message, so this part works as expected.
However, we have run into a problem: not all MessageController function instances run.
For example, we pushed 100 messages into the queue, but only about 10-20% of the messages were processed by MessageController.
Some messages were not processed at all, or were processed after a long delay. It looks like the durable functions failed to start, though no exceptions were thrown.
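One thing that may be worth ruling out first (an assumption, not a confirmed diagnosis): both the queue-triggered function and the orchestrator above are declared `async void`, so the Functions host has no task to await and may treat an invocation as finished before the awaited work completes, and exceptions may not be reported. A minimal sketch of the same two functions returning `Task`, with the instance ID logged so that started orchestrations can be counted against queued messages, might look like this. The class name, method names, constant values, empty message classes, and log text are placeholders for illustration only.

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;

// Placeholder message types; the real ones are not shown in the question.
public class InputMessage { }
public class ResultMessage { public string Payload { get; set; } }

public static class MessageFunctions
{
    // Placeholder values; substitute the real queue name and connection setting.
    private const string QUEUE_NAME = "input-messages";
    private const string QUEUE_CONNECTION_NAME = "QueueConnection";

    [FunctionName("MessageControllerExecutor")]
    public static async Task RunExecutor(
        [QueueTrigger(QUEUE_NAME, Connection = QUEUE_CONNECTION_NAME)] string queueMessage,
        [OrchestrationClient] DurableOrchestrationClient client,
        TraceWriter log)
    {
        // Returning Task lets the host wait for the start request to complete
        // and observe any exception it throws.
        string instanceId = await client.StartNewAsync("MessageController", queueMessage);
        log.Info($"Started MessageController instance {instanceId}");
    }

    [FunctionName("MessageController")]
    public static async Task RunController(
        [OrchestrationTrigger] DurableOrchestrationContext context,
        TraceWriter log)
    {
        if (!context.IsReplaying) log.Warning("MessageController started");

        var function1ResultTask = context.CallActivityAsync<ResultMessage>("Function_1", new InputMessage());
        var function2ResultTask = context.CallActivityAsync<ResultMessage>("Function_2", new InputMessage());

        await Task.WhenAll(function1ResultTask, function2ResultTask);

        // process Function_1 and Function_2 results
        // ...
    }
}
```

If the number of "Started MessageController instance" log entries matches the number of queued messages, the start requests are going through and the delay would have to be somewhere else in the pipeline.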
We have a few questions:
- Is this combination of queue-triggered and durable functions a correct way to process the message queue, or is there a better way to trigger durable functions from the queue?
- Are there any limitations on running durable functions?
- How many durable functions can be executed at the same time?