I have a Durable Functions fan-out/fan-in pattern that doesn't seem to be working reliably. The orchestration is started from a timer function every 10 minutes, though I have since increased this to 20. The activity function is called using context.CallActivityAsync and returns an integer (the number of rows processed).

Currently, the workBatch passed in should contain only 2 items to process. The first item processes all of its rows and logs a completion message. The second item sometimes shows rows being processed, but at some point it just stops: no further activity is recorded, and the "Complete" message never shows up in the logs. The second activity also sometimes appears to run multiple times concurrently. I have run this exact code on my dev machine against the same data, and it runs to completion in no more than 5 minutes. I have also set the host.json file to:

{
  "version": "2.0",
  "functionTimeout": "00:10:00",
  "extensions": {
    "queues": {
      "maxPollingInterval": "00:00:05"
    }
  }
}

Orchestration:

public static async Task<int> RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
{
    log.LogInformation("************** Fanning out ********************");
    var parallelTasks = new List<Task<int>>();
    //object[] workBatch = await context.CallActivityAsync<object[]>("GETVendors", null);
    object[] workBatch = GETVendors(log); //work batch only has 2 items
    for (int i = 0; i < workBatch.Length; i++)
    {
        Task<int> task = context.CallActivityAsync<int>("SynchVendor", workBatch[i]);
        parallelTasks.Add(task);
    }

    log.LogInformation("************** 'Waiting' for parallel results ********************");
    await Task.WhenAll(parallelTasks);
    log.LogInformation("************** All activity functions complete ********************");

    log.LogInformation("************** fanning in ********************");
    int cnt = 0;
    foreach (var completedParallelActivity in parallelTasks)
    {
        cnt += completedParallelActivity.Result;
    }
    log.LogInformation($"Total Records Converted across all tasks = {cnt}");
    return cnt;
}

Activity function:

public static async Task<int> SynchVendor([ActivityTrigger] string vendor, ILogger log)
{
    log.LogInformation($"SynchVendor {vendor}");

    string sqlStr = Environment.GetEnvironmentVariable("Sqldb_Connection");
    bool dev = Convert.ToBoolean(Environment.GetEnvironmentVariable("Dev"));
    int totalCount = 0;

    using (SqlConnection conn = new SqlConnection(sqlStr))
    {
        await conn.OpenAsync();

        // let's sync the vendor
        Int32 limit = 200;
        bool initialLoad = false;
        int offset = 0;
        bool done = false;

        do
        {
            //synch logic...
            // if there are rows found to have changed then send them to a queue for further processing

        } while (!done);

        // we are done syncing a vendor; write out the vendorinfo

        conn.Close();
    }
    log.LogInformation($"SynchVendor {vendor} Complete");
    return totalCount;
}

Answer:

For the extra (duplicated) logging, you need to add this check in front of the log.Log*** calls in your orchestrator:

if (!context.IsReplaying) 

See https://docs.microsoft.com/en-us/sandbox/functions-recipes/durable-diagnostics#logging-in-orchestrator-functions for more information.
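A minimal sketch of the orchestrator with replay-safe logging, assuming Durable Functions 1.x (the `DurableOrchestrationContext` type used in the question). The function name `VendorSyncOrchestrator` is hypothetical, and the `GETVendors` activity is an assumption based on the commented-out line in the question: orchestrator code is replayed and must be deterministic, so I/O such as loading the vendor list belongs in an activity. The sketch also returns `Task<int>` rather than using `async void`, so the runtime has a task it can track to completion.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;          // Durable Functions 1.x types live in this namespace
using Microsoft.Extensions.Logging;

public static class VendorSyncLoggingSketch
{
    // Hypothetical function name; the question does not show its [FunctionName].
    [FunctionName("VendorSyncOrchestrator")]
    public static async Task<int> RunOrchestrator(
        [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
    {
        // Assumes a "GETVendors" activity function exists (hypothetical).
        object[] workBatch = await context.CallActivityAsync<object[]>("GETVendors", null);

        // The orchestrator is re-executed from the start each time an awaited task
        // completes, so only log when this is not a replay.
        if (!context.IsReplaying)
            log.LogInformation("Fanning out over {Count} items", workBatch.Length);

        var parallelTasks = new List<Task<int>>();
        foreach (object item in workBatch)
            parallelTasks.Add(context.CallActivityAsync<int>("SynchVendor", item));

        // Fan in: wait for every SynchVendor call and collect the row counts.
        int[] counts = await Task.WhenAll(parallelTasks);

        int total = 0;
        foreach (int count in counts)
            total += count;

        if (!context.IsReplaying)
            log.LogInformation("Total records converted across all tasks = {Total}", total);

        return total;
    }
}
```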

For the issue where you never see completion, you have a few things you can do:

  1. You have no error handling. What happens if your activity function throws an exception? You should have try/catch blocks with logging so you know when something has failed.
  2. What do the function logs show? Note that I am not talking about the logs where customDimensions.Category=="User" (i.e., your own log entries), but about the ones the Functions runtime writes. In your Application Insights logs, run "union traces | union exceptions" over an appropriate time range to see what the Functions runtime is doing.
  3. Try adding a timeout task so that your orchestrator will finish even if one of your tasks doesn't complete. See https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-error-handling?tabs=csharp#function-timeouts.
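Points 1 and 3 can be combined in the orchestrator. The sketch below, again assuming Durable Functions 1.x, races the fan-in against a durable timer created with `context.CreateTimer`, then awaits the activities inside a try/catch so that a failed `SynchVendor` call is logged (exceptions thrown in an activity surface in the orchestrator as `FunctionFailedException`). The deadline uses `context.CurrentUtcDateTime` instead of `DateTime.UtcNow` because orchestrator code must be deterministic. The 15-minute limit, the `-1` return value on timeout, the `GETVendors` activity, and the function name are all hypothetical; a try/catch with logging inside `SynchVendor` itself is an equally valid place for point 1.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;          // Durable Functions 1.x: context, triggers, FunctionFailedException
using Microsoft.Extensions.Logging;

public static class VendorSyncTimeoutSketch
{
    // Hypothetical function name.
    [FunctionName("VendorSyncOrchestratorWithTimeout")]
    public static async Task<int> RunOrchestrator(
        [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
    {
        // Assumes a "GETVendors" activity function exists (hypothetical).
        object[] workBatch = await context.CallActivityAsync<object[]>("GETVendors", null);

        // Fan out: start one SynchVendor activity per work item.
        var parallelTasks = workBatch
            .Select(item => context.CallActivityAsync<int>("SynchVendor", item))
            .ToList();
        Task<int[]> allDone = Task.WhenAll(parallelTasks);

        // Hypothetical limit, computed from the replay-safe orchestration clock.
        DateTime deadline = context.CurrentUtcDateTime.AddMinutes(15);

        using (var cts = new CancellationTokenSource())
        {
            Task timeout = context.CreateTimer(deadline, cts.Token);
            Task winner = await Task.WhenAny(allDone, timeout);

            if (winner == timeout)
            {
                if (!context.IsReplaying)
                    log.LogWarning("SynchVendor activities did not finish before {Deadline}", deadline);
                return -1; // hypothetical "timed out" marker
            }

            // The fan-in finished (or failed) first: cancel the pending timer,
            // otherwise the orchestration stays open until the timer fires.
            cts.Cancel();
        }

        try
        {
            int[] counts = await allDone;
            return counts.Sum();
        }
        catch (FunctionFailedException ex)
        {
            // An exception thrown inside SynchVendor surfaces here.
            if (!context.IsReplaying)
                log.LogError(ex, "A SynchVendor activity failed");
            throw; // rethrow so the orchestration is marked as failed
        }
    }
}
```

When the timer wins, the orchestration completes, but the outstanding activity is not cancelled: it keeps running and its result is simply ignored.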