0
votes

My functions will return information really fast. that's great news! However because the parallel loop is running async the function returns a value before the loop finishes, unless I do some long running task on the main thread to wait for a result. There is no UI to hold up, so I'm using async/await to try a push for quick results in the TPL.

I have introduced a boolean flag value and while loop to wait for a result.

this works but seems strange.

Is there a better way to 'await' the result in my particular situation. It is in the first code snippet when I use the 'while loop' that things seem weird.

Note: I should mention that because it is an Alexa response, outside of this function is a task which 'Task.Delays' for eight seconds and then returns a response in case my other task takes to long and Alexa is going to time out.

private static string SavedImageAnalysisResult(IReadOnlyCollection<Image> savedImageList, ConfigurationDto config)
    {
        string result = "[]";
        BreakImageAnalysis = false;

        if (!savedImageList.Any()) return result;

        Parallel.ForEach(savedImageList, new ParallelOptions
        {
            MaxDegreeOfParallelism = 5000
        },
            async (image, loopState) =>
            {
                Task.Run(() => Console.Write("██"));

                string threadLocalAnalysisResult =
                    await AnalyzeImageAsync(image.ImageBytes, config);

                if (IsEmptyOrErrorAnalysis(threadLocalAnalysisResult)) return;
                Task.Run(() => Console.Write("█ █"));
                result = threadLocalAnalysisResult;
                BreakImageAnalysis = true;
                loopState.Break();
            });

        while (!BreakImageAnalysis) if (BreakImageAnalysis) break; //strange to do this?

        return result;
    }

This function is called like this:

public static List<Person> DetectPersonAsync()
    {
        Task.Run(() => Console.WriteLine("{0}\nNew Person Detection Requested...", DateTime.Now.ToString("f")));

        ConfigurationDto config = Configuration.GetSettings();

        camera = new SecurityCamera();

        byte[] imageData = camera.GetImageAsByte(config.SecurityCameraUrl +
                                                 config.SecurityCameraStaticImage +
                                                 DateTime.Now);

        if (!imageData.Any()) return null;

        string imageAnalysis = "[]";

        SavedImageList = camera.ImageCache;

        Task.Run(() => Console.WriteLine("\nBegin Image Analysis...\n"));

        var imageAnalysisTasks = new[]
        {
            Task.Factory.StartNew(() => SavedImageAnalysisResult(SavedImageList, config)),
            Task.Factory.StartNew(() => InProgressImageAnalysisResult(camera, config))
        };

        Task.WaitAll(imageAnalysisTasks);

        Task.Run(() => Console.WriteLine("\n\nAnalysis complete\n"));

        if (!IsEmptyOrErrorAnalysis(imageAnalysisTasks[0].Result))
            imageAnalysis = imageAnalysisTasks[0].Result;

        if (!IsEmptyOrErrorAnalysis(imageAnalysisTasks[1].Result))
            imageAnalysis = imageAnalysisTasks[1].Result;

        return !IsEmptyOrErrorAnalysis(imageAnalysis)
            ? JsonConvert.DeserializeObject<List<Person>>(imageAnalysis)
            : new List<Person>();
    }

But this function is called like this:

  if (alexa.IsLaunchRequest(alexaRequest))
   {
    //We don't want to wait for these two tasks to return
    Task.Run(() => SendSecurityImageToMagicMirrorUi());
    Task.Run(() => alexa.PostDirectiveResponseAsync(alexaRequest));

     //On your marks get set go! 8 seconds and counting
            return await Task.WhenAny(new[]
           {

               Task.Run(() => GetAlexaCognitiveResponseAsync()),
               Task.Run(() => AlexaRequestTimeoutMonitor())

      }).Result;
     }

And finally there is the Timeout function which will return if 8 seconds is up:

 private static async Task<object> AlexaRequestTimeoutMonitor()
    {
        await Task.Delay(new TimeSpan(0, 0, 0, 8));

        return AlexaApi.ResponseBuilder(CreateNoPersonDetectionPhrase(new AlexaSynthesisResponseLibrary()), false);
    }

It is in the 'CreateNoPersonDetectedPhrase' function inwhich I turn the 'IsFound' boolean flag back to 'false'

1
Parallel.ForEach is a blocking instruction. Maybe you messed with break conditions? Also, why do you assign results to single variable? - Krzysztof Skowronek
Just to confirm your question. Result to single variable in the third code snippet? "IsLaunchRequest" - user2224583
no, in the first one. You have result = threadLocalAnalysisResult; in loop body. I know that later on you have loop.Break(), but it does not guarantee single assigment, especially if you have more than one valid picture - Krzysztof Skowronek
Right! The loop will sometimes return many results. I thought that having the break would stop the loop eventually, and having passed a result off to the 'result' variable at the time of a valid image, would speed up the process of returning it. Because the images that are being analyized are of the same person but where milliseconds have passed. It didn't matter which result was choosen but only that one was. - user2224583
the break just disables starting next loop call - Krzysztof Skowronek

1 Answers

1
votes

Your Parallel.ForEach starts asynchronous delegates and in reality they are being executed in parallel up until await AnalyzeImageAsync point which returns Task as a promise. Now having this promise the Parallel.ForEach "thinks" this task is completed while in reality an asynchronous operation most likely is just getting started. But no one is awaiting for that so Parallel.ForEach completes very quickly. So you need to expose these promises outside of the Parallel.ForEach loop so that they can be awaited with Task.WhenAll for example.

But I'd also suggest to consider whether you need this parallelization at all since if the most of AnalyzeImageAsync operation (which isn't presented here) isn't CPU-bound it's more reasonable to use only asynchronicity.

The simplest way (I'm not saying the best) to wait for the async tasks in the Parallels.Forxxx

    public static async Task SomeAsyncTask()
    {
        await Task.Delay(5000);
        Console.WriteLine("2222");
    }

    public static async Task Loop()
    {
        var collection = new[] { 1, 2, 3, 4 };
        var asyncTasks = new Task[4];

        Parallel.ForEach(collection,
            (item, loop, index) =>
            {
                Console.WriteLine("1111");
                asyncTasks[index] = SomeAsyncTask();
            });

        await Task.WhenAll(asyncTasks);
    }

Note that everything after SomeAsyncTask() should be moved into the continuation of the task (in this case it's Console.WriteLine("2222")).