
I am stuck trying to process the results of a number of database queries as they become available. Right now they are all handled at once, only after every query has finished.

There is a list of clients (about 40 right now), each with its own database, and all of them need to be queried (using EF6). The query method on the repository returns a Task, and these tasks are created in a loop.

public Task<List<Employee>> FindByNameOrBirthDateAsync(string surName, DateTime? birthDate)
{
    var query = ApplicationContext.Employees
        .Include(e => e.Address)
        .Include(e => e.CorporateEntities);

    if (!string.IsNullOrWhiteSpace(surName))
        query = query.Where(e => e.Surname == surName);

    if (birthDate != null)
        query = query.Where(e => e.BirthDate == birthDate);

    return query.ToListAsync();
}
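To make the filter logic concrete, here is a minimal sketch of the same conditional composition without EF6 or a database. It assumes an in-memory list of hypothetical (Surname, BirthDate) tuples and uses IEnumerable instead of EF6's IQueryable, so Include and ToListAsync are left out; the local function Find is made up for illustration. Each non-empty argument adds another Where, so, despite the "Or" in the method name, the filters combine with AND, and when both arguments are empty every row comes back.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory rows standing in for the Employees table.
var rows = new List<(string Surname, DateTime BirthDate)>
{
    ("Smith", new DateTime(1980, 1, 1)),
    ("Smith", new DateTime(1990, 5, 5)),
    ("Jones", new DateTime(1980, 1, 1)),
};

// Same composition pattern as FindByNameOrBirthDateAsync, minus EF6:
// each supplied argument narrows the sequence with one more Where.
List<(string Surname, DateTime BirthDate)> Find(string surName, DateTime? birthDate)
{
    IEnumerable<(string Surname, DateTime BirthDate)> query = rows;

    if (!string.IsNullOrWhiteSpace(surName))
        query = query.Where(e => e.Surname == surName);

    if (birthDate != null)
        query = query.Where(e => e.BirthDate == birthDate);

    return query.ToList();
}

Console.WriteLine(Find("Smith", null).Count);
Console.WriteLine(Find("Smith", new DateTime(1980, 1, 1)).Count);
Console.WriteLine(Find("", null).Count);
```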

This code is being called by a service that in turn is being called by a Web-API controller.

public class SearchService
{
    public List<Task<List<Employee>>> SearchOverAllClients(List<Client> clients, string surName, DateTime? birthDate)
    {
        return clients.Select(client => FindEmployees(client.Id, surName, birthDate)).ToList();
    }

    private Task<List<Employee>> FindEmployees(int clientId, string surname, DateTime? birthDate)
    {
        ApplicationUoW unit = new ApplicationUoW(clientId);

        return unit.Employees.FindByNameOrBirthDateAsync(surname, birthDate);

    }
}
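As a rough illustration of why the service already starts every query concurrently, the sketch below simulates SearchOverAllClients with Task.Delay in place of the EF6 calls (FakeFindEmployeesAsync, the client IDs, and the delays are all hypothetical). An async method runs synchronously until its first incomplete await, so each query is already in flight while Select(...).ToList() is still building the list.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

var clock = Stopwatch.StartNew();
var startTimes = new Dictionary<int, long>();

// Hypothetical stand-in for FindEmployees: records when it started, then
// waits instead of querying a database.
async Task<List<string>> FakeFindEmployeesAsync(int clientId, int delayMs)
{
    // Runs synchronously during ToList (before the first await), so the
    // dictionary is only written from one thread and needs no lock.
    startTimes[clientId] = clock.ElapsedMilliseconds;
    await Task.Delay(delayMs);
    return new List<string> { $"employee-of-client-{clientId}" };
}

var clientDelays = new Dictionary<int, int> { [1] = 300, [2] = 100, [3] = 200 };

// Same shape as SearchOverAllClients: Select + ToList starts every query.
List<Task<List<string>>> tasks = clientDelays
    .Select(kv => FakeFindEmployeesAsync(kv.Key, kv.Value))
    .ToList();

long listBuiltAt = clock.ElapsedMilliseconds;
await Task.WhenAll(tasks);

foreach (var (id, startedAt) in startTimes.OrderBy(kv => kv.Key))
    Console.WriteLine($"client {id} started at ~{startedAt} ms");
Console.WriteLine($"list built at ~{listBuiltAt} ms, all done at ~{clock.ElapsedMilliseconds} ms");
```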

In the controller, Task.WhenAny() is supposed to return each result as it becomes available, so that it can be written to the output stream and sent to the client.

I have tested with Fiddler, breaking on each iteration, and the results are sent one at a time, so I don't think the issue is in the stream-writing code.

    public HttpResponseMessage Get([FromUri]string surname = "", [FromUri]DateTime? birthDate = null)
    {
        List<Client> clients = _clientsService.GetClients();
        List<Task<List<Employee>>> tasks = _searchService.SearchOverAllClients(clients, surname, birthDate);

        HttpResponseMessage response = Request.CreateResponse();

        response.Content = new PushStreamContent(
            async (outputStream, httpContent, transportContext) =>
            {
                try
                {
                    while (tasks.Count > 0)
                    {
                        var task = await Task.WhenAny(tasks);
                        tasks.Remove(task);

                        List<Employee> employees = await task;
                        string responseContent =
                            await Task.Factory.StartNew(() => JsonConvert.SerializeObject(employees, Formatting.None, new JsonSerializerSettings() { ReferenceLoopHandling = ReferenceLoopHandling.Ignore }));

                        var buffer = Encoding.UTF8.GetBytes(responseContent);

                        await outputStream.WriteAsync(buffer, 0, buffer.Length);
                        await outputStream.FlushAsync();
                    }
                }
                catch (HttpException ex) when (ex.ErrorCode == -2147023667)
                {
                    //The remote host closed the connection.
                    return;
                }
                finally
                {
                    outputStream.Close();
                }
            });

        return response;
    }

The problem with this code is that it waits for all tasks to finish before any of the results are handled, whereas I want to handle each one as soon as it is ready.
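One way to check whether the WhenAny loop itself is the cause is to run the same loop shape outside ASP.NET. The console sketch below assumes Task.Delay as a stand-in for the per-client queries (FakeQueryAsync and the delays are hypothetical). With nothing blocking, the loop handles results in completion order (client 2, then 3, then 1), not in the order the tasks were created.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical simulated client queries: client 1 is slowest, client 2 fastest.
async Task<List<string>> FakeQueryAsync(int clientId, int delayMs)
{
    await Task.Delay(delayMs);
    return new List<string> { $"employee-of-client-{clientId}" };
}

var tasks = new List<Task<List<string>>>
{
    FakeQueryAsync(1, 300),
    FakeQueryAsync(2, 100),
    FakeQueryAsync(3, 200),
};

var handledOrder = new List<string>();

// Same loop shape as the controller: take whichever task finishes first,
// remove it from the pending list, handle its result, repeat.
while (tasks.Count > 0)
{
    Task<List<string>> finished = await Task.WhenAny(tasks);
    tasks.Remove(finished);

    List<string> employees = await finished; // already complete, no extra wait
    handledOrder.AddRange(employees);
    Console.WriteLine($"handled: {string.Join(", ", employees)}");
}
```

If the loop behaves like this in isolation, that points away from the loop itself and toward how the tasks or the response behave under ASP.NET.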

I have tried multiple variations of this code with different uses of async/await, but all attempts have been unsuccessful so far.

Update: I have removed the unneeded parallel code, as suggested in the comments.

umm, List<T> is not thread safe, you can't call Add from inside a Parallel.Foreach without doing some kind of locking. - Scott Chamberlain
"Parallel" database queries are typically slower than one query that loads the required data in one round trip. That's because the main delay is caused by network latency. If not, you should be looking at optimizing your indexes, or checking for N+1 problems. BTW, parallel loading of individual entities is an N+1 problem - Panagiotis Kanavos
The real throughput killer though is that you increase contention and locking by the number of concurrent connections. You could cause some ugly deadlocks this way - Panagiotis Kanavos
@PanagiotisKanavos The reason we are using parallel queries is because we are querying different databases (with the same schema). That could be on different physical servers. - Arnold Wiersma
@ArnoldWiersma No, you should not use ConcurrentBag. Rather, there's no reason at all to start all of the tasks using Parallel.ForEach. Just use a regular foreach or, as has been mentioned before, Select from LINQ. Creating new threads just to start asynchronous operations is actively counter productive. - Servy
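Servy's suggestion can be sketched roughly as follows, again with a hypothetical FakeQueryAsync standing in for the EF6 query. A plain Select starts every asynchronous operation, and Task.WhenAll returns the results as an array in input order, so no shared List<T>, lock, or ConcurrentBag is needed. This version collects everything at the end; for streaming results as each one finishes, the WhenAny loop from the question is still the relevant shape.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for one per-client query.
async Task<List<string>> FakeQueryAsync(int clientId)
{
    await Task.Delay(50 * clientId);
    return new List<string> { $"employee-of-client-{clientId}" };
}

int[] clientIds = { 1, 2, 3, 4 };

// A plain Select starts every asynchronous query; no Parallel.ForEach and no
// extra threads are needed just to kick them off.
Task<List<string>>[] tasks = clientIds.Select(id => FakeQueryAsync(id)).ToArray();

// WhenAll gathers the results into an array in the same order as the input
// tasks, so there is no shared collection to lock.
List<string>[] perClient = await Task.WhenAll(tasks);

List<string> allEmployees = perClient.SelectMany(list => list).ToList();
Console.WriteLine(string.Join(", ", allEmployees));
```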

1 Answer


I think what you're looking for is ConfigureAwait(false).

ASP.NET implements the SynchronizationContext in a way that guarantees async code will run on a single thread (but not necessarily the same thread) at any given time throughout execution. That means that the code you have posted right now will all be run on a single thread. So for example, if all the query tasks completed immediately, the callback code after var task = await Task.WhenAny(tasks) would not be run in parallel - each callback would be run synchronously, one by one.

By changing that line to await Task.WhenAny(tasks).ConfigureAwait(false), you're telling .NET that the callback code can run on a thread-pool thread. But then you'll have to make sure the code after that await is thread-safe and doesn't require the context.
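A minimal sketch of the loop with ConfigureAwait(false) applied, assuming a MemoryStream in place of the PushStreamContent output stream and System.Text.Json in place of Json.NET (both are swaps, as is the hypothetical FakeQueryAsync). The newline after each JSON array is likewise a hypothetical framing choice so a reader can split the chunks. In a console app there is no SynchronizationContext to begin with, so ConfigureAwait(false) changes nothing observable here; it only matters under a context such as ASP.NET's.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical simulated client query.
async Task<List<string>> FakeQueryAsync(int clientId, int delayMs)
{
    await Task.Delay(delayMs);
    return new List<string> { $"employee-of-client-{clientId}" };
}

// Writes each result to the stream as soon as its task completes.
async Task StreamResultsAsync(List<Task<List<string>>> tasks, Stream outputStream)
{
    while (tasks.Count > 0)
    {
        // ConfigureAwait(false): the continuation may resume on a thread-pool
        // thread instead of being posted back to a captured context.
        Task<List<string>> finished = await Task.WhenAny(tasks).ConfigureAwait(false);
        tasks.Remove(finished);

        List<string> employees = await finished.ConfigureAwait(false);
        byte[] buffer = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(employees) + "\n");

        await outputStream.WriteAsync(buffer, 0, buffer.Length).ConfigureAwait(false);
        await outputStream.FlushAsync().ConfigureAwait(false);
    }
}

// MemoryStream stands in for the PushStreamContent output stream.
var output = new MemoryStream();
var pending = new List<Task<List<string>>>
{
    FakeQueryAsync(1, 200),
    FakeQueryAsync(2, 50),
};
await StreamResultsAsync(pending, output);

string written = Encoding.UTF8.GetString(output.ToArray());
Console.Write(written);
```

The tasks list is still touched by only one continuation at a time, because each await finishes before the next iteration starts, so it needs no lock even if the continuations move between thread-pool threads.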

If you're not already familiar with SynchronizationContext and why it matters when using async/await, here's an article I found to be helpful: Await, SynchronizationContext, and Console Apps