I'm trying to process the results of several database queries as each one becomes available, but right now they are all handled at once.
There is a list of clients (about 40 right now). Each client has its own database, and all of them need to be queried (using EF6). The query method on the repository returns a Task, and these tasks are created in a loop.
    public Task<List<Employee>> FindByNameOrBirthDateAsync(string surName, DateTime? birthDate)
    {
        var query = ApplicationContext.Employees
            .Include(e => e.Address)
            .Include(e => e.CorporateEntities);
        if (!string.IsNullOrWhiteSpace(surName))
            query = query.Where(e => e.Surname == surName);
        if (birthDate != null)
            query = query.Where(e => e.BirthDate == birthDate);
        return query.ToListAsync();
    }
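The repository builds the query conditionally: each `Where` is only added when its argument is supplied. The sketch below shows which filters end up applied. It uses a hypothetical in-memory `Emp` class and LINQ-to-Objects via `AsQueryable()` in place of EF6, so it says nothing about SQL generation. When both arguments are empty (the controller's defaults, `""` and `null`), no filter is added and the whole employee set for that client is loaded.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the EF6 Employee entity (only the filtered properties).
public sealed class Emp
{
    public string Surname { get; set; }
    public DateTime? BirthDate { get; set; }
}

public static class FilterDemo
{
    // Same composition pattern as FindByNameOrBirthDateAsync, but synchronous and in memory.
    public static List<Emp> Find(IQueryable<Emp> employees, string surName, DateTime? birthDate)
    {
        IQueryable<Emp> query = employees;
        if (!string.IsNullOrWhiteSpace(surName))
            query = query.Where(e => e.Surname == surName);     // only added when a surname is given
        if (birthDate != null)
            query = query.Where(e => e.BirthDate == birthDate); // only added when a birth date is given
        // Nothing is evaluated until here; with EF6, ToListAsync is the async counterpart.
        return query.ToList();
    }
}
```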
This method is called by a service, which in turn is called by a Web API controller.
    public class SearchService
    {
        public List<Task<List<Employee>>> SearchOverAllClients(List<Client> clients, string surName, DateTime? birthDate)
        {
            return clients.Select(client => FindEmployees(client.Id, surName, birthDate)).ToList();
        }

        private Task<List<Employee>> FindEmployees(int clientId, string surname, DateTime? birthDate)
        {
            ApplicationUoW unit = new ApplicationUoW(clientId);
            return unit.Employees.FindByNameOrBirthDateAsync(surname, birthDate);
        }
    }
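One way to check whether the per-client queries really finish at different times is to record when each task completes. `TaskTiming.WithCompletionTime` below is a hypothetical diagnostic helper, not part of the original service; it awaits the wrapped task and returns its result together with the approximate elapsed time on a shared `Stopwatch` at the moment the continuation ran.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical diagnostic helper: pairs a task's result with (approximately) when it completed.
public static class TaskTiming
{
    public static async Task<(T Result, TimeSpan CompletedAt)> WithCompletionTime<T>(Task<T> task, Stopwatch clock)
    {
        T result = await task;          // wait for the wrapped query; rethrows if it faulted
        return (result, clock.Elapsed); // elapsed time when the continuation ran
    }
}
```

Inside `SearchService` this could wrap each call, e.g. `clients.Select(c => TaskTiming.WithCompletionTime(FindEmployees(c.Id, surName, birthDate), clock)).ToList()`. If all `CompletedAt` values come out nearly identical, the queries themselves are finishing together, and no `WhenAny` loop could hand them back one by one.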
In the controller, Task.WhenAny() is supposed to hand back each result as soon as it becomes available, so that it can be written to the output stream and sent to the client.
I have tested this with Fiddler, breaking on each iteration, and the results are sent one at a time, so I don't think the issue is in the stream-writing code.
    public HttpResponseMessage Get([FromUri]string surname = "", [FromUri]DateTime? birthDate = null)
    {
        List<Client> clients = _clientsService.GetClients();
        List<Task<List<Employee>>> tasks = _searchService.SearchOverAllClients(clients, surname, birthDate);
        HttpResponseMessage response = Request.CreateResponse();
        response.Content = new PushStreamContent(
            async (outputStream, httpContent, transportContext) =>
            {
                try
                {
                    while (tasks.Count > 0)
                    {
                        var task = await Task.WhenAny(tasks);
                        tasks.Remove(task);
                        List<Employee> employees = await task;
                        string responseContent = await Task.Factory.StartNew(() =>
                            JsonConvert.SerializeObject(employees, Formatting.None,
                                new JsonSerializerSettings() { ReferenceLoopHandling = ReferenceLoopHandling.Ignore }));
                        var buffer = Encoding.UTF8.GetBytes(responseContent);
                        await outputStream.WriteAsync(buffer, 0, buffer.Length);
                        await outputStream.FlushAsync();
                    }
                }
                catch (HttpException ex) when (ex.ErrorCode == -2147023667)
                {
                    //The remote host closed the connection.
                    return;
                }
                finally
                {
                    outputStream.Close();
                }
            });
        return response;
    }
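To check whether the `WhenAny` loop by itself handles results in completion order, independent of EF6 and `PushStreamContent`, here is a minimal console sketch. `SimulateQueryAsync` and the `(ClientId, DelayMs)` pairs are hypothetical stand-ins for the per-client queries; each fake query simply completes after a chosen delay.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class CompletionOrderDemo
{
    // Hypothetical stand-in for one client's query: completes after delayMs and returns the client id.
    public static async Task<int> SimulateQueryAsync(int clientId, int delayMs)
    {
        await Task.Delay(delayMs);
        return clientId;
    }

    // Starts every query first, then handles each one as soon as it completes.
    public static async Task<List<int>> ProcessAsCompletedAsync(IEnumerable<(int ClientId, int DelayMs)> clients)
    {
        List<Task<int>> pending = clients.Select(c => SimulateQueryAsync(c.ClientId, c.DelayMs)).ToList();
        var handledOrder = new List<int>();
        while (pending.Count > 0)
        {
            Task<int> finished = await Task.WhenAny(pending); // first task to complete
            pending.Remove(finished);
            int result = await finished; // already complete; rethrows if the task faulted
            handledOrder.Add(result);    // in the controller, this is where the JSON is written and flushed
        }
        return handledOrder;
    }
}
```

With delays of 400, 50 and 200 ms for clients 1, 2 and 3, the loop should handle them in the order 2, 3, 1. If the same loop over the real EF6 tasks still handles everything at once, that would suggest the tasks are completing at about the same time, rather than the loop waiting for all of them.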
The problem is that this code waits for all tasks to finish before any results are handled, whereas I want to handle each one as soon as it is ready.
I have tried multiple variations of this code with different uses of async/await, but none of them has worked so far.
Update: I have removed the unneeded parallel code, as suggested in the comments.
List<T> is not thread-safe; you can't call Add from inside a Parallel.ForEach without doing some kind of locking. - Scott Chamberlain
… ConcurrentBag. Rather, there's no reason at all to start all of the tasks using Parallel.ForEach. Just use a regular foreach or, as has been mentioned before, Select from LINQ. Creating new threads just to start asynchronous operations is actively counterproductive. - Servy
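The point about not needing `Parallel.ForEach` can be illustrated with a small sketch: calling an async method from a plain `Select(...).ToList()` starts every operation on the calling thread, and they then overlap in time, because each call returns as soon as it reaches its first incomplete `await`. `QueryAsync` and its fixed 200 ms delay are hypothetical stand-ins for a client query.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public static class StartAllDemo
{
    // Hypothetical stand-in for one client's query.
    static async Task<int> QueryAsync(int clientId)
    {
        await Task.Delay(200);
        return clientId;
    }

    public static async Task<(int[] Results, TimeSpan Elapsed)> RunAsync(int clientCount)
    {
        var clock = Stopwatch.StartNew();
        // Select + ToList starts all queries immediately; no extra threads or locking needed.
        List<Task<int>> tasks = Enumerable.Range(1, clientCount).Select(id => QueryAsync(id)).ToList();
        int[] results = await Task.WhenAll(tasks); // results keep the order of the input tasks
        return (results, clock.Elapsed);
    }
}
```

For 40 clients, the elapsed time should be close to a single delay rather than 40 of them, since the waits overlap.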