2
votes

I am trying to consume a service reference, making multiple requests at the same time using a task scheduler. The service includes a synchronous and an asynchronous function that returns a result set. I am a bit confused, so I have a couple of initial questions, and then I will share how far I got with each. I am using some logging, the Concurrency Visualizer, and Fiddler to investigate. Ultimately I want to use a reactive scheduler to make as many requests as possible.

1) Should I use the async function to make all the requests?

2) If I were to use the synchronous function in multiple tasks, which limited resources could potentially starve my threads?

Here is what I have so far:

var myScheduler = new MyScheduler();
var myFactory = new Factory(myScheduler);
var myClientProxy = new ClientProxy();
var tasks = new List<Task<Response>>();
foreach( var request in Requests )
{
    var localrequest = request;
    tasks.Add( myFactory.StartNew( () =>
        {
            // log stuff
            return myClientProxy.GetResponsesAsync( localrequest );
            // log some more stuff
        }).Unwrap() );
}

Task.WaitAll( tasks.ToArray() );
// process all the requests after they are done

This runs, but according to Fiddler it just tries to do all of the requests at once. It could be the scheduler, but I trust that more than I do the code above.

I have also tried to implement it without the Unwrap call, using an async/await delegate instead, and it does the same thing. I have also tried referencing .Result, and that seems to run the requests sequentially. Using the non-synchronous service call with the scheduler/factory, it only reaches about 20 simultaneous requests per client.

3
What would you want it to do instead of firing all the requests at once? - i3arnon
@i3arnon - thanks for the response! I have tens of thousands of requests to make. The service can only handle so many based on the current load. I am not the only client. I want to use a custom scheduler to adjust the number of concurrent requests based on the service response time. - Ashtonian
That's better accomplished by using a semaphore. See my answer - i3arnon

3 Answers

6
votes
  1. Yes. It will allow your application to scale better by using fewer threads to accomplish more.
  2. Threads. When you initiate a synchronous operation that is inherently asynchronous (e.g. I/O), you have a blocked thread waiting for the operation to complete. You could, however, be using this thread in the meantime to execute CPU-bound operations.
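
To make the second point concrete, here is a minimal sketch that is not taken from the service in question. GetResponse and GetResponseAsync are hypothetical stand-ins that simulate I/O with a 200 ms delay, and the request count of 100 is arbitrary. It times the same batch once with synchronous calls wrapped in tasks and once with asynchronous calls, so the cost of blocked thread-pool threads should show up in the elapsed times.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BlockingVersusAsync
{
    // Hypothetical stand-ins for the proxy methods; the delay simulates network I/O.
    static int GetResponse(int request)
    {
        Thread.Sleep(200);      // blocks the calling thread for the whole wait
        return request;
    }

    static async Task<int> GetResponseAsync(int request)
    {
        await Task.Delay(200);  // no thread is held while waiting
        return request;
    }

    public static async Task Main()
    {
        var requests = Enumerable.Range(0, 100).ToList();

        var watch = Stopwatch.StartNew();
        // Each task occupies a thread-pool thread until its call returns.
        await Task.WhenAll(requests.Select(r => Task.Run(() => GetResponse(r))));
        Console.WriteLine($"Synchronous calls in tasks: {watch.ElapsedMilliseconds} ms");

        watch.Restart();
        // All calls are in flight at once without tying up threads.
        await Task.WhenAll(requests.Select(r => GetResponseAsync(r)));
        Console.WriteLine($"Asynchronous calls: {watch.ElapsedMilliseconds} ms");
    }
}
```

The exact timings depend on the machine and on how quickly the thread pool adds threads, so treat the output as an illustration rather than a benchmark.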

The simplest way to limit the number of concurrent requests is to use a SemaphoreSlim, which lets you wait asynchronously to enter it:

async Task ConsumeService()
{
    var client = new ClientProxy();
    var semaphore = new SemaphoreSlim(100);
    var tasks = Requests.Select(async request =>
    {
        await semaphore.WaitAsync();
        try
        {
            return await client.GetResponsesAsync(request);
        }
        finally
        {
            semaphore.Release();
        }
    }).ToList();
    await Task.WhenAll(tasks);

    // TODO: Process responses...
}
3
votes

Regardless of whether you call the WCF service asynchronously or synchronously, you will be bound by the WCF serviceThrottling limits. You should look at these settings and possibly raise them (if they are set to low values for some reason). In .NET 4 the defaults are pretty good; in older versions of the .NET Framework, the defaults were much more conservative.

.NET 4.0

  • MaxConcurrentSessions: default is 100 * ProcessorCount
  • MaxConcurrentCalls: default is 16 * ProcessorCount
  • MaxConcurrentInstances: default is MaxConcurrentCalls+MaxConcurrentSessions
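
If you control the service host, these limits can also be set in code. The following is only a sketch under assumptions: IResponseService, ResponseService, and the base address are hypothetical placeholders, and the values simply restate the .NET 4 defaults listed above. The throttle is a service-side setting, so it does not help if you only own the client.

```csharp
using System;
using System.ServiceModel;
using System.ServiceModel.Description;

// Hypothetical contract and implementation, used only to have something to host.
[ServiceContract]
public interface IResponseService
{
    [OperationContract]
    string GetResponses(string request);
}

public class ResponseService : IResponseService
{
    public string GetResponses(string request)
    {
        return request;
    }
}

public static class Program
{
    public static void Main()
    {
        // The base address is an assumption for this sketch.
        var host = new ServiceHost(typeof(ResponseService),
                                   new Uri("http://localhost:8080/responses"));

        // Reuse the throttling behavior if one is already configured, otherwise add one.
        var throttle = host.Description.Behaviors.Find<ServiceThrottlingBehavior>();
        if (throttle == null)
        {
            throttle = new ServiceThrottlingBehavior();
            host.Description.Behaviors.Add(throttle);
        }

        // Same values as the .NET 4 defaults; adjust them to your own load.
        throttle.MaxConcurrentCalls = 16 * Environment.ProcessorCount;
        throttle.MaxConcurrentSessions = 100 * Environment.ProcessorCount;
        throttle.MaxConcurrentInstances =
            throttle.MaxConcurrentCalls + throttle.MaxConcurrentSessions;

        host.Open();
        Console.WriteLine("Service running. Press Enter to stop.");
        Console.ReadLine();
        host.Close();
    }
}
```

The same three values can be set through the serviceThrottling element of a service behavior in the configuration file instead.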
3
votes

1) Yes. 2) Yes.

If you want to control the number of simultaneous requests, you can try using Stephen Toub's ForEachAsync method. It lets you control how many tasks are processed at the same time.

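using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class Extensions
{
    // Splits the source into dop partitions and processes each partition sequentially,
    // so at most dop invocations of body are in flight at any time.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current);
            }));
    }
}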

void Main()
{
    var myClientProxy = new ClientProxy();
    // ConcurrentBag (System.Collections.Concurrent) is safe to add to from
    // several tasks at once; List<T> is not.
    var responses = new ConcurrentBag<Response>();

    // Max 10 concurrent requests
    Requests.ForEachAsync<Request>(10, async (r) =>
    {
        var response = await myClientProxy.GetResponsesAsync(r);
        responses.Add(response);
    }).Wait();
}