I have a Durable Functions fan-out/fan-in pattern that doesn't seem to be working reliably. The orchestration is started from a Timer function. It originally ran every 10 minutes, but I have since increased the interval to 20. The activity function is called using context.CallActivityAsync and returns an integer (the number of rows processed).

Currently, the work batch (workBatch) passed in should only contain 2 items to process. The first item processes all of its rows and logs its completion. The second item sometimes logs rows being processed, but at some point it just stops: no further activity is logged and the "Complete" message never shows up in the logs. The logs also sometimes show the second activity running multiple times concurrently.

I have tried this exact code on my dev machine using the same data, and it runs to completion in no more than 5 minutes. I have also set the host.json file to:
{
  "version": "2.0",
  "functionTimeout": "00:10:00",
  "extensions": {
    "queues": {
      "maxPollingInterval": "00:00:05"
    }
  }
}
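For context, a timer-triggered starter for this kind of setup could look roughly like the following. This is only a minimal sketch, not the exact code in use: the class and function name (SynchTimerStarter), the CRON expression for a 20-minute schedule, and the orchestrator name passed to StartNewAsync are all assumptions, and it assumes the Durable Functions 1.x API (DurableOrchestrationClient with the [OrchestrationClient] attribute) to match the DurableOrchestrationContext type used in the orchestrator.

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

// Hypothetical starter class; the name is an assumption for illustration.
public static class SynchTimerStarter
{
    [FunctionName("SynchTimerStarter")]
    public static async Task Run(
        // Assumed schedule: fires every 20 minutes (six-field NCRONTAB expression).
        [TimerTrigger("0 */20 * * * *")] TimerInfo timer,
        [OrchestrationClient] DurableOrchestrationClient starter,
        ILogger log)
    {
        // "RunOrchestrator" is assumed to be the orchestrator's FunctionName.
        // Each timer tick starts a new orchestration instance with no input.
        string instanceId = await starter.StartNewAsync("RunOrchestrator", null);
        log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
    }
}
```

With a starter of this shape, every timer tick creates a new orchestration instance whether or not the previous one has finished.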
Orchestration:
public static async void RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
{
    log.LogInformation($"************** Fanning out ********************");
    var parallelTasks = new List<Task<int>>();
    //object[] workBatch = await context.CallActivityAsync<object[]>("GETVendors", null);
    object[] workBatch = GETVendors(log); //work batch only has 2 items
    for (int i = 0; i < workBatch.Length; i++)
    {
        Task<int> task = context.CallActivityAsync<int>("SynchVendor", workBatch[i]);
        parallelTasks.Add(task);
    }
    log.LogInformation($"************** 'Waiting' for parallel results ********************");
    await Task.WhenAll(parallelTasks);
    log.LogInformation($"************** All activity functions complete ********************");
    log.LogInformation($"************** fanning in ********************");
    int cnt = 0;
    foreach (var completedParallelActivity in parallelTasks)
    {
        cnt += completedParallelActivity.Result;
    }
    log.LogInformation($"Total Records Converted across all tasks = {cnt}");
    //return outputs;
}
Activity function:
public static async Task<int> SynchVendor([ActivityTrigger] string vendor, ILogger log)
{
    log.LogInformation($"SynchVendor {vendor}");
    string sqlStr = Environment.GetEnvironmentVariable("Sqldb_Connection");
    bool dev = Convert.ToBoolean(Environment.GetEnvironmentVariable("Dev"));
    int totalCount = 0;
    using (SqlConnection conn = new SqlConnection(sqlStr))
    {
        conn.Open();
        // let's synch the vendor
        Int32 limit = 200;
        bool initialLoad = false;
        int offset = 0;
        bool done = false;
        do
        {
            //synch logic...
            // if any rows are found to have changed, send them to a queue for further processing
        } while (!done);
        // we are done synching a vendor; write out the vendor info
        conn.Close();
    }
    log.LogInformation($"SynchVendor {vendor} Complete");
    return totalCount;