214
votes

In a metro app, I need to execute a number of WCF calls. There are a significant number of calls to be made, so I need to do them in a parallel loop. The problem is that the parallel loop exits before the WCF calls are all complete.

How would you refactor this to work as expected?

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new  System.Collections.Concurrent.BlockingCollection<Customer>();

Parallel.ForEach(ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

foreach ( var customer in customers )
{
    Console.WriteLine(customer.ID);
}

Console.ReadKey();
11

11 Answers

192
votes

The whole idea behind Parallel.ForEach() is that you have a set of threads and each thread processes part of the collection. As you noticed, this doesn't work with async-await, where you want to release the thread for the duration of the async call.

You could “fix” that by blocking the ForEach() threads, but that defeats the whole point of async-await.

What you could do is to use TPL Dataflow instead of Parallel.ForEach(), which supports asynchronous Tasks well.

Specifically, your code could be written using a TransformBlock that transforms each id into a Customer using the async lambda. This block can be configured to execute in parallel. You would link that block to an ActionBlock that writes each Customer to the console. After you set up the block network, you can Post() each id to the TransformBlock.

In code:

var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var getCustomerBlock = new TransformBlock<string, Customer>(
    async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions
    {
        PropagateCompletion = true
    });

foreach (var id in ids)
    getCustomerBlock.Post(id);

getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();

Although you probably want to limit the parallelism of the TransformBlock to some small constant. Also, you could limit the capacity of the TransformBlock and add the items to it asynchronously using SendAsync(), for example if the collection is too big.

As an added benefit when compared to your code (if it worked) is that the writing will start as soon as a single item is finished, and not wait until all of the processing is finished.

143
votes

svick's answer is (as usual) excellent.

However, I find Dataflow to be more useful when you actually have large amounts of data to transfer. Or when you need an async-compatible queue.

In your case, a simpler solution is to just use the async-style parallelism:

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var customerTasks = ids.Select(i =>
  {
    ICustomerRepo repo = new CustomerRepo();
    return repo.GetCustomer(i);
  });
var customers = await Task.WhenAll(customerTasks);

foreach (var customer in customers)
{
  Console.WriteLine(customer.ID);
}

Console.ReadKey();
91
votes

Using DataFlow as svick suggested may be overkill, and Stephen's answer does not provide the means to control the concurrency of the operation. However, that can be achieved rather simply:

public static async Task RunWithMaxDegreeOfConcurrency<T>(
     int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
    var activeTasks = new List<Task>(maxDegreeOfConcurrency);
    foreach (var task in collection.Select(taskFactory))
    {
        activeTasks.Add(task);
        if (activeTasks.Count == maxDegreeOfConcurrency)
        {
            await Task.WhenAny(activeTasks.ToArray());
            //observe exceptions here
            activeTasks.RemoveAll(t => t.IsCompleted); 
        }
    }
    await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t => 
    {
        //observe exceptions in a manner consistent with the above   
    });
}

The ToArray() calls can be optimized by using an array instead of a list and replacing completed tasks, but I doubt it would make much of a difference in most scenarios. Sample usage per the OP's question:

RunWithMaxDegreeOfConcurrency(10, ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

EDIT Fellow SO user and TPL wiz Eli Arbel pointed me to a related article from Stephen Toub. As usual, his implementation is both elegant and efficient:

public static Task ForEachAsync<T>(
      this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current).ContinueWith(t => 
                          {
                              //observe exceptions
                          });
                      
        })); 
}
51
votes

You can save effort with the new AsyncEnumerator NuGet Package, which didn't exist 4 years ago when the question was originally posted. It allows you to control the degree of parallelism:

using System.Collections.Async;
...

await ids.ParallelForEachAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
},
maxDegreeOfParallelism: 10);

Disclaimer: I'm the author of the AsyncEnumerator library, which is open source and licensed under MIT, and I'm posting this message just to help the community.

16
votes

Wrap the Parallel.Foreach into a Task.Run() and instead of the await keyword use [yourasyncmethod].Result

(you need to do the Task.Run thing to not block the UI thread)

Something like this:

var yourForeachTask = Task.Run(() =>
        {
            Parallel.ForEach(ids, i =>
            {
                ICustomerRepo repo = new CustomerRepo();
                var cust = repo.GetCustomer(i).Result;
                customers.Add(cust);
            });
        });
await yourForeachTask;
7
votes

This should be pretty efficient, and easier than getting the whole TPL Dataflow working:

var customers = await ids.SelectAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    return await repo.GetCustomer(i);
});

...

public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4)
{
    var results = new List<TResult>();

    var activeTasks = new HashSet<Task<TResult>>();
    foreach (var item in source)
    {
        activeTasks.Add(selector(item));
        if (activeTasks.Count >= maxDegreesOfParallelism)
        {
            var completed = await Task.WhenAny(activeTasks);
            activeTasks.Remove(completed);
            results.Add(completed.Result);
        }
    }

    results.AddRange(await Task.WhenAll(activeTasks));
    return results;
}
7
votes

An extension method for this which makes use of SemaphoreSlim and also allows to set maximum degree of parallelism

    /// <summary>
    /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
    /// Must be grater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (maxDegreeOfParallelism.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item).ContinueWith(res =>
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        });
                    }));
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Sample Usage:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);
6
votes

I am a little late to party but you may want to consider using GetAwaiter.GetResult() to run your async code in sync context but as paralled as below;

 Parallel.ForEach(ids, i =>
{
    ICustomerRepo repo = new CustomerRepo();
    // Run this in thread which Parallel library occupied.
    var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
    customers.Add(cust);
});
5
votes

After introducing a bunch of helper methods, you will be able run parallel queries with this simple syntax:

const int DegreeOfParallelism = 10;
IEnumerable<double> result = await Enumerable.Range(0, 1000000)
    .Split(DegreeOfParallelism)
    .SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false))
    .ConfigureAwait(false);

What happens here is: we split source collection into 10 chunks (.Split(DegreeOfParallelism)), then run 10 tasks each processing its items one by one (.SelectManyAsync(...)) and merge those back into a single list.

Worth mentioning there is a simpler approach:

double[] result2 = await Enumerable.Range(0, 1000000)
    .Select(async i => await CalculateAsync(i).ConfigureAwait(false))
    .WhenAll()
    .ConfigureAwait(false);

But it needs a precaution: if you have a source collection that is too big, it will schedule a Task for every item right away, which may cause significant performance hits.

Extension methods used in examples above look as follows:

public static class CollectionExtensions
{
    /// <summary>
    /// Splits collection into number of collections of nearly equal size.
    /// </summary>
    public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> src, int slicesCount)
    {
        if (slicesCount <= 0) throw new ArgumentOutOfRangeException(nameof(slicesCount));

        List<T> source = src.ToList();
        var sourceIndex = 0;
        for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++)
        {
            var list = new List<T>();
            int itemsLeft = source.Count - targetIndex;
            while (slicesCount * list.Count < itemsLeft)
            {
                list.Add(source[sourceIndex++]);
            }

            yield return list;
        }
    }

    /// <summary>
    /// Takes collection of collections, projects those in parallel and merges results.
    /// </summary>
    public static async Task<IEnumerable<TResult>> SelectManyAsync<T, TResult>(
        this IEnumerable<IEnumerable<T>> source,
        Func<T, Task<TResult>> func)
    {
        List<TResult>[] slices = await source
            .Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false))
            .WhenAll()
            .ConfigureAwait(false);
        return slices.SelectMany(s => s);
    }

    /// <summary>Runs selector and awaits results.</summary>
    public static async Task<List<TResult>> SelectListAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector)
    {
        List<TResult> result = new List<TResult>();
        foreach (TSource source1 in source)
        {
            TResult result1 = await selector(source1).ConfigureAwait(false);
            result.Add(result1);
        }
        return result;
    }

    /// <summary>Wraps tasks with Task.WhenAll.</summary>
    public static Task<TResult[]> WhenAll<TResult>(this IEnumerable<Task<TResult>> source)
    {
        return Task.WhenAll<TResult>(source);
    }
}
1
votes

Here is a simple generic implementation of a ForEachAsync method, based on an ActionBlock from the TPL Dataflow library, now embedded in the .NET 5 platform:

public static Task ForEachAsync<T>(this IEnumerable<T> source,
    Func<T, Task> action, int dop)
{
    // Arguments validation omitted
    var block = new ActionBlock<T>(action,
        new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = dop });
    try
    {
        foreach (var item in source) block.Post(item);
        block.Complete();
    }
    catch (Exception ex) { ((IDataflowBlock)block).Fault(ex); }
    return block.Completion;
}

This solution enumerates eagerly the supplied IEnumerable, and sends immediately all its elements to the ActionBlock. So it is not very suitable for enumerables with huge number of elements. Below is a more sophisticated approach, that enumerates the source lazily, and sends its elements to the ActionBlock one by one:

public static async Task ForEachAsync<T>(this IEnumerable<T> source,
    Func<T, Task> action, int dop)
{
    // Arguments validation omitted
    var block = new ActionBlock<T>(action, new ExecutionDataflowBlockOptions()
    { MaxDegreeOfParallelism = dop, BoundedCapacity = dop });
    try
    {
        foreach (var item in source)
            if (!await block.SendAsync(item).ConfigureAwait(false)) break;
        block.Complete();
    }
    catch (Exception ex) { ((IDataflowBlock)block).Fault(ex); }
    try { await block.Completion.ConfigureAwait(false); }
    catch { block.Completion.Wait(); } // Propagate AggregateException
}

These two methods have different behavior in case of exceptions. The first¹ propagates an AggregateException containing the exceptions directly in its InnerExceptions property. The second propagates an AggregateException that contains another AggregateException with the exceptions. Personally I find the behavior of the second method more convenient in practice, because awaiting it eliminates automatically a level of nesting, and so I can simply catch (AggregateException aex) and handle the aex.InnerExceptions inside the catch block. The first method requires to store the Task before awaiting it, so that I can gain access the task.Exception.InnerExceptions inside the catch block. For more info about propagating exceptions from async methods, look here or here.

Both implementations handle gracefully any errors that may occur during the enumeration of the source. The ForEachAsync method does not complete before all pending operations are completed. No tasks are left behind unobserved (in fire-and-forget fashion).

¹ The first implementation elides async and await.

-1
votes

Easy native way without TPL:

int totalThreads = 0; int maxThreads = 3;

foreach (var item in YouList)
{
    while (totalThreads >= maxThreads) await Task.Delay(500);
    Interlocked.Increment(ref totalThreads);

    MyAsyncTask(item).ContinueWith((res) => Interlocked.Decrement(ref totalThreads));
}

you can check this solution with next task:

async static Task MyAsyncTask(string item)
{
    await Task.Delay(2500);
    Console.WriteLine(item);
}