
I have the following scenario:

I must execute a function that retrieves N records (N between 0 and infinity). I must call a mapping function to transform each record into something else and forward it (via HTTP, Service Bus, Cosmos DB, etc.)

I cannot use a regular Azure Function because of the 10-minute execution limit, so I'm looking into whether Durable Functions can solve my problem.

My idea is the following:
1 - When the durable function triggers, it streams the records from a database.
2 - For each record, it calls the mapping function.
3 - After mapping, it sends the record as a message via Service Bus.

As a proof of concept I wrote the example below. I simulate receiving 1000 messages in the durable function, but it behaves very unreliably: if I send 1000 messages, the function either crashes or takes too long to finish. I was expecting this code to finish almost instantly.

#r "Microsoft.Azure.WebJobs.Extensions.DurableTask"

public static async Task<List<string>> Run(DurableOrchestrationContext context, TraceWriter log)
{
    var outputs = new List<string>();

    var tasks = new List<Task<string>>();
    for(int i = 0; i < 1000; i++)
    {
        log.Info(i.ToString());
        tasks.Add(context.CallActivityAsync<string>("Hello", i.ToString()));
    }

    outputs.AddRange(await Task.WhenAll(tasks.ToArray()));

    return outputs;
}

My question is: are Durable Functions suited for this scenario? Or should I look into some non-serverless approach to extract the data from the database?

Is there a way of calling another Azure function synchronously from within a Durable Function?


2 Answers


Before you start, you have to consider how Durable Functions really work. To understand the flow, please take a look at the following example:

#r "Microsoft.Azure.WebJobs.Extensions.DurableTask"

public static async Task Run(DurableOrchestrationContext context, TraceWriter log)
{
    await context.CallActivityAsync<string>("Hello1");
    await context.CallActivityAsync<string>("Hello2");
}

The runtime works as follows:

  1. It enters the orchestration and hits the first await, where the activity Hello1 is called
  2. Control returns to a component called the Dispatcher, an internal part of the framework. It checks whether this particular activity has already been called for the current orchestration ID. If not, it schedules the activity, awaits its result, and deallocates the resources used by the orchestration
  3. Once the awaited Task finishes, the Dispatcher recreates the orchestration and replays it from the beginning
  4. It awaits the activity Hello1 once more, but this time, after consulting the orchestration history, it knows that the activity has already been called and its result was saved - so it uses the saved result and continues execution
  5. It hits the second await and the whole cycle runs once again
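A practical consequence of this replay behavior: any side effect placed directly in the orchestrator body (like the log.Info call in the question's loop) runs again on every replay, which inflates logs and makes the orchestration look slower or stranger than it is. A minimal sketch in the same Durable Functions 1.x C# script style, using the IsReplaying property to guard against this:

```csharp
#r "Microsoft.Azure.WebJobs.Extensions.DurableTask"

public static async Task Run(DurableOrchestrationContext context, TraceWriter log)
{
    // Without this guard, the line below would be logged once per replay,
    // not once per logical orchestration run.
    if (!context.IsReplaying)
        log.Info("Starting orchestration");

    await context.CallActivityAsync<string>("Hello1");
    await context.CallActivityAsync<string>("Hello2");
}
```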

As you can see, there's serious work being performed under the hood. There's also a rule of thumb when it comes to delegating work between orchestrations and activities:

  • an orchestration should only orchestrate - it has many limitations, such as being single-threaded, awaiting only safe tasks (those available on the DurableOrchestrationContext type), and being scaled across several queues (not VMs). What's more, it has to be deterministic (so it cannot use e.g. DateTime.Now or query a database directly)
  • an activity should perform the work - it behaves like a typical function (without the limits of orchestrations) and is scaled out across multiple VMs
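To make the first rule concrete, here is a hedged sketch of what "deterministic" means in practice, using the Durable Functions 1.x API from the rest of this answer (the "FetchRecords" activity name is made up for illustration):

```csharp
#r "Microsoft.Azure.WebJobs.Extensions.DurableTask"

public static async Task Run(DurableOrchestrationContext context)
{
    // Don't: DateTime.Now / DateTime.UtcNow yield a different value on every replay.
    // Do: use the replay-safe timestamp recorded in the orchestration history.
    var startedAt = context.CurrentUtcDateTime;

    // Don't: query a database or call HTTP directly from the orchestrator.
    // Do: delegate all I/O to an activity - its result is checkpointed,
    // so replays reuse the saved value instead of repeating the work.
    var recordCount = await context.CallActivityAsync<int>("FetchRecords", startedAt);
}
```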

In your scenario you should rather execute a single activity that does all the work, instead of looping through the records in the orchestration (especially since you cannot use a binding to e.g. Service Bus in an orchestration - you can, however, do so in an activity, which can fetch the data, transform it, and then push it to any kind of service you want). So your code could look something like this:

[FunctionName("Orchestration_Client")]
public static async Task<string> Orchestration_Client(
    [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "start")] HttpRequestMessage input,
    [OrchestrationClient] DurableOrchestrationClient starter)
{
    return await starter.StartNewAsync("Orchestration", await input.Content.ReadAsStringAsync());
}

[FunctionName("Orchestration")]
public static async Task Orchestration_Start([OrchestrationTrigger] DurableOrchestrationContext context)
{
    var payload = context.GetInput<string>();
    await context.CallActivityAsync(nameof(Activity), payload);
}

[FunctionName("Activity")]
public static async Task<string> Activity(
    [ActivityTrigger] DurableActivityContext context,
    [Table(TableName, Connection = "TableStorageConnectionName")] IAsyncCollector<FooEntity> foo)
{
    // Get data from the request
    var payload = context.GetInput<string>();

    // Fetch data from the database
    // using (var conn = new SqlConnection(...)) { ... }

    // Transform it
    var count = 0;
    foreach (var record in databaseResult)
    {
        // Do some work and push the data
        await foo.AddAsync(new FooEntity { /* properties */ });
        count++;
    }

    // Result
    return $"Processed {count} records!";
}

This is more an idea than a real example, but you should get the point. Another question is whether Durable Functions really are the best solution for such an operation - I believe there are better-suited services, such as Azure Data Factory, for example.