62
votes

I generated a proxy with task-based operations.

How should this service be invoked properly (disposing of the ServiceClient and the OperationContext afterwards) using async/await?

My first attempt was:

public async Task<HomeInfo> GetHomeInfoAsync(DateTime timestamp)
{
    using (var helper = new ServiceHelper<ServiceClient, ServiceContract>())
    {
        return await helper.Proxy.GetHomeInfoAsync(timestamp);
    }
}

ServiceHelper is a class that creates the ServiceClient and the OperationContextScope and disposes of them afterwards:

try
{
    if (_operationContextScope != null)
    {
        _operationContextScope.Dispose();
    }

    if (_serviceClient != null)
    {
        if (_serviceClient.State != CommunicationState.Faulted)
        {
            _serviceClient.Close();
        }
        else
        {
            _serviceClient.Abort();
        }
    }
}
catch (CommunicationException)
{
    _serviceClient.Abort();
}
catch (TimeoutException)
{
    _serviceClient.Abort();
}
catch (Exception)
{
    _serviceClient.Abort();
    throw;
}
finally
{
    _operationContextScope = null;
    _serviceClient = null;
}

However, this failed miserably when calling two services at the same time with the following error: "This OperationContextScope is being disposed on a different thread than it was created."

MSDN says:

Do not use the asynchronous “await” pattern within a OperationContextScope block. When the continuation occurs, it may run on a different thread and OperationContextScope is thread specific. If you need to call “await” for an async call, use it outside of the OperationContextScope block.
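
To make the thread-affinity point concrete, here is a minimal sketch that does not touch WCF at all. AmbientDemo and its thread-static field are hypothetical stand-ins for any state that, like the scope described above, is tied to the thread that created it. The value is set before the await and read again after it:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AmbientDemo
{
    // Hypothetical stand-in for thread-specific state (one value per thread).
    [ThreadStatic]
    private static string _current;

    public static async Task<string> ReadAfterAwaitAsync()
    {
        _current = "scope";   // stored on the calling thread only

        // No SynchronizationContext is captured here, so the continuation
        // resumes on whichever thread completes the delay.
        await Task.Delay(50).ConfigureAwait(false);

        // On a different thread the thread-static field is still null.
        return _current ?? "(lost)";
    }

    public static void Main()
    {
        Console.WriteLine(ReadAfterAwaitAsync().GetAwaiter().GetResult());
    }
}
```

When this runs as a console program, the continuation resumes on a thread-pool thread, so the value set before the await is no longer visible. A scope that insists on being disposed on its creating thread runs into the same situation.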

So that's the problem! But, how do we fix it properly?

This guy did just what MSDN says:

private async void DoStuffWithDoc(string docId)
{
   var doc = await GetDocumentAsync(docId);
   if (doc.YadaYada)
   {
        // more code here
   }
}

public Task<Document> GetDocumentAsync(string docId)
{
  var docClient = CreateDocumentServiceClient();
  using (new OperationContextScope(docClient.InnerChannel))
  {
    return docClient.GetDocumentAsync(docId);
  }
}

My problem with his code is that he never calls Close (or Abort) on the ServiceClient.

I also found a way of propagating the OperationContextScope using a custom SynchronizationContext. But, besides the fact that it's a lot of "risky" code, he states that:

It’s worth noting that it does have a few small issues regarding the disposal of operation-context scopes (since they only allow you to dispose them on the calling thread), but this doesn’t seem to be an issue since (at least according to the disassembly), they implement Dispose() but not Finalize().

So, are we out of luck here? Is there a proven pattern for calling WCF services using async/await AND disposing of BOTH the ServiceClient and the OperationContextScope? Maybe someone from Microsoft (perhaps guru Stephen Toub :)) can help.

Thanks!

[UPDATE]

With a lot of help from user Noseratio, I came up with something that works: do not use OperationContextScope. If you are using it for any of these reasons, try to find a workaround that fits your scenario. Otherwise, if you really, really, need OperationContextScope, you'll have to come up with an implementation of a SynchronizationContext that captures it, and that seems very hard (if at all possible - there must be a reason why this isn't the default behavior).

So, the full working code is:

public async Task<HomeInfo> GetHomeInfoAsync(DateTime timestamp)
{
    using (var helper = new ServiceHelper<ServiceClient, ServiceContract>())
    {
        return await helper.Proxy.GetHomeInfoAsync(timestamp);
    }
}

With ServiceHelper being:

public class ServiceHelper<TServiceClient, TService> : IDisposable
    where TServiceClient : ClientBase<TService>, new()
    where TService : class
{
    protected bool _isInitialized;
    protected TServiceClient _serviceClient;

    public TServiceClient Proxy
    {
        get
        {
            if (!_isInitialized)
            {
                Initialize();
                _isInitialized = true;
            }
            else if (_serviceClient == null)
            {
                throw new ObjectDisposedException("ServiceHelper");
            }

            return _serviceClient;
        }
    }

    protected virtual void Initialize()
    {
        _serviceClient = new TServiceClient();
    }

    // Implement IDisposable.
    // Do not make this method virtual.
    // A derived class should not be able to override this method.
    public void Dispose()
    {
        Dispose(true);

        // Take yourself off the Finalization queue 
        // to prevent finalization code for this object
        // from executing a second time.
        GC.SuppressFinalize(this);
    }

    // Dispose(bool disposing) executes in two distinct scenarios.
    // If disposing equals true, the method has been called directly
    // or indirectly by a user's code. Managed and unmanaged resources
    // can be disposed.
    // If disposing equals false, the method has been called by the 
    // runtime from inside the finalizer and you should not reference 
    // other objects. Only unmanaged resources can be disposed.
    protected virtual void Dispose(bool disposing)
    {
        // If disposing equals true, dispose all managed 
        // and unmanaged resources.
        if (disposing)
        {
            try
            {
                if (_serviceClient != null)
                {
                    if (_serviceClient.State != CommunicationState.Faulted)
                    {
                        _serviceClient.Close();
                    }
                    else
                    {
                        _serviceClient.Abort();
                    }
                }
            }
            catch (CommunicationException)
            {
                _serviceClient.Abort();
            }
            catch (TimeoutException)
            {
                _serviceClient.Abort();
            }
            catch (Exception)
            {
                _serviceClient.Abort();
                throw;
            }
            finally
            {
                _serviceClient = null;
            }
        }
    }
}

Note that the class supports extension; perhaps you need to inherit and provide credentials.

The only possible "gotcha" is that in GetHomeInfoAsync you can't just return the Task you get from the proxy (which would seem natural: why create a new Task when you already have one?). In this case you need to await the proxy Task and only then close (or abort) the ServiceClient; otherwise you'll be closing it right after invoking the service, while bytes are still being sent over the wire!
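
The difference can be seen without WCF. The sketch below uses a hypothetical FakeClient (not a real proxy) whose pending call reports whether the client was disposed before the call finished. It is only an illustration of the ordering, under the assumption that disposing the helper closes the client:

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a WCF client; not part of the original code.
public sealed class FakeClient : IDisposable
{
    public bool Closed { get; private set; }

    public async Task<string> GetAsync()
    {
        await Task.Delay(100);   // simulated time on the wire
        return Closed ? "closed mid-call" : "ok";
    }

    public void Dispose() { Closed = true; }
}

public static class ReturnVersusAwait
{
    // Returns the task directly: Dispose runs as soon as the call has started.
    public static Task<string> WithoutAwait()
    {
        using (var client = new FakeClient())
        {
            return client.GetAsync();
        }
    }

    // Awaits inside the using block: Dispose runs after the call completes.
    public static async Task<string> WithAwait()
    {
        using (var client = new FakeClient())
        {
            return await client.GetAsync();
        }
    }

    public static void Main()
    {
        Console.WriteLine(WithoutAwait().GetAwaiter().GetResult()); // closed mid-call
        Console.WriteLine(WithAwait().GetAwaiter().GetResult());    // ok
    }
}
```

In WithoutAwait the using block exits while the call is still pending, which is exactly the premature close described above.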

OK, we have a way to make it work, but it'd be nice to get an answer from an authoritative source, as Noseratio states.

The environment making calls to the WCF proxy is ASP.NET Web API. - noseratio

I think you need to re-evaluate the need to use async/await and dispose of the OperationContextScope. From what I understand, OperationContextScope relies on a process-wide variable (OperationContext.Current is static, which implies this), so it's not suitable for parallel service calls with different OperationContextScopes. Because of this, async/await is not really compatible with disposing of the operation context, so what you are asking for is a hack, which I don't recommend. - user823959

Some info regarding the stack behaviour of the OperationContextScope: stackoverflow.com/questions/9492085/… - user823959

@Noseratio Rewriting, at the IL level, the state machine the compiler generates when using async/await looks like a fun challenge, and kudos to Tersius if he managed to hack it right; but it's way too risky for production, at least for me. I really laughed when I read Diana's post, talk about plagiarism! Haha. - gabrielmaldi

@Noseratio, I haven't lost interest at all; in fact, I'm about to start a project in which we have to call existing WCF services which rely on data passed through OperationContextScope, and I'd really like to take advantage of async/await on the Web API client. Thanks for the update, I read (and favorited) your post and everything you linked. If Stephen's right, I guess I'm not going to be forced to abandon asynchrony in this new project (unless we rewrite the services so they don't depend on the context; this should be standard practice for any new project). I'll keep myself posted. - gabrielmaldi

8 Answers

37
votes

I think a feasible solution might be to use a custom awaiter to flow the new operation context via OperationContext.Current. The implementation of OperationContext itself doesn't appear to require thread affinity. Here is the pattern:

async Task TestAsync()
{
    using(var client = new WcfAPM.ServiceClient())
    using (var scope = new FlowingOperationContextScope(client.InnerChannel))
    {
        await client.SomeMethodAsync(1).ContinueOnScope(scope);
        await client.AnotherMethodAsync(2).ContinueOnScope(scope);
    }
}

Here is the implementation of FlowingOperationContextScope and ContinueOnScope (only slightly tested):

public sealed class FlowingOperationContextScope : IDisposable
{
    bool _inflight = false;
    bool _disposed;
    OperationContext _thisContext = null;
    OperationContext _originalContext = null;

    public FlowingOperationContextScope(IContextChannel channel):
        this(new OperationContext(channel))
    {
    }

    public FlowingOperationContextScope(OperationContext context)
    {
        _originalContext = OperationContext.Current;
        OperationContext.Current = _thisContext = context;
    }

    public void Dispose()
    {
        if (!_disposed)
        {
            if (_inflight || OperationContext.Current != _thisContext)
                throw new InvalidOperationException();
            _disposed = true;
            OperationContext.Current = _originalContext;
            _thisContext = null;
            _originalContext = null;
        }
    }

    internal void BeforeAwait()
    {
        if (_inflight)
            return;
        _inflight = true;
        // leave _thisContext as the current context
    }

    internal void AfterAwait()
    {
        if (!_inflight)
            throw new InvalidOperationException();
        _inflight = false;
        // ignore the current context, restore _thisContext
        OperationContext.Current = _thisContext;
    }
}

// ContinueOnScope extension
public static class TaskExt
{
    public static SimpleAwaiter<TResult> ContinueOnScope<TResult>(this Task<TResult> @this, FlowingOperationContextScope scope)
    {
        return new SimpleAwaiter<TResult>(@this, scope.BeforeAwait, scope.AfterAwait);
    }

    // awaiter
    public class SimpleAwaiter<TResult> :
        System.Runtime.CompilerServices.INotifyCompletion
    {
        readonly Task<TResult> _task;

        readonly Action _beforeAwait;
        readonly Action _afterAwait;

        public SimpleAwaiter(Task<TResult> task, Action beforeAwait, Action afterAwait)
        {
            _task = task;
            _beforeAwait = beforeAwait;
            _afterAwait = afterAwait;
        }

        public SimpleAwaiter<TResult> GetAwaiter()
        {
            return this;
        }

        public bool IsCompleted
        {
            get 
            {
                // don't do anything if the task completed synchronously
                // (we're on the same thread)
                if (_task.IsCompleted)
                    return true;
                _beforeAwait();
                return false;
            }

        }

        public TResult GetResult()
        {
            return _task.Result;
        }

        // INotifyCompletion
        public void OnCompleted(Action continuation)
        {
            _task.ContinueWith(task =>
            {
                _afterAwait();
                continuation();
            },
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously,
            SynchronizationContext.Current != null ?
                TaskScheduler.FromCurrentSynchronizationContext() :
                TaskScheduler.Current);
        }
    }
}
10
votes

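A simple way is to move the await outside the using block:

public async Task<Document> GetDocumentAsync(string docId)
{
    var docClient = CreateDocumentServiceClient();
    Task<Document> task;

    // Start the call inside the scope, but do not await it here.
    using (new OperationContextScope(docClient.InnerChannel))
    {
        task = docClient.GetDocumentAsync(docId);
    }

    // The scope is already disposed (on the creating thread) when we await.
    return await task;
}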
2
votes

I decided to write my own code to help with this and am posting it in case it helps anyone. There seems to be a little less that can go wrong (unforeseen races, etc.) compared with the SimpleAwaiter implementation above, but you be the judge:

public static class WithOperationContextTaskExtensions
{
    public static ContinueOnOperationContextAwaiter<TResult> WithOperationContext<TResult>(this Task<TResult> @this, bool configureAwait = true)
    {
        return new ContinueOnOperationContextAwaiter<TResult>(@this, configureAwait);
    }

    public static ContinueOnOperationContextAwaiter WithOperationContext(this Task @this, bool configureAwait = true)
    {
        return new ContinueOnOperationContextAwaiter(@this, configureAwait);
    }

    public class ContinueOnOperationContextAwaiter : INotifyCompletion
    {
        private readonly ConfiguredTaskAwaitable.ConfiguredTaskAwaiter _awaiter;
        private OperationContext _operationContext;

        public ContinueOnOperationContextAwaiter(Task task, bool continueOnCapturedContext = true)
        {
            if (task == null) throw new ArgumentNullException("task");

            _awaiter = task.ConfigureAwait(continueOnCapturedContext).GetAwaiter();
        }

        public ContinueOnOperationContextAwaiter GetAwaiter() { return this; }

        public bool IsCompleted { get { return _awaiter.IsCompleted; } }

        public void OnCompleted(Action continuation)
        {
            _operationContext = OperationContext.Current;
            _awaiter.OnCompleted(continuation);
        }

        public void GetResult()
        {
            OperationContext.Current = _operationContext;
            _awaiter.GetResult();
        }
    }

    public class ContinueOnOperationContextAwaiter<TResult> : INotifyCompletion
    {
        private readonly ConfiguredTaskAwaitable<TResult>.ConfiguredTaskAwaiter _awaiter;
        private OperationContext _operationContext;

        public ContinueOnOperationContextAwaiter(Task<TResult> task, bool continueOnCapturedContext = true)
        {
            if (task == null) throw new ArgumentNullException("task");

            _awaiter = task.ConfigureAwait(continueOnCapturedContext).GetAwaiter();
        }

        public ContinueOnOperationContextAwaiter<TResult> GetAwaiter() { return this; }

        public bool IsCompleted { get { return _awaiter.IsCompleted; } }

        public void OnCompleted(Action continuation)
        {
            _operationContext = OperationContext.Current;
            _awaiter.OnCompleted(continuation);
        }

        public TResult GetResult()
        {
            OperationContext.Current = _operationContext;
            return _awaiter.GetResult();
        }
    }
}

Usage (a little manual and nesting is untested...):

    /// <summary>
    /// Make a call to the service
    /// </summary>
    /// <param name="action"></param>
    /// <param name="endpoint"> </param>
    public async Task<ResultCallWrapper<TResult>> CallAsync<TResult>(Func<T, Task<TResult>> action, EndpointAddress endpoint)
    {
        using (ChannelLifetime<T> channelLifetime = new ChannelLifetime<T>(ConstructChannel(endpoint)))
        {
            // OperationContextScope doesn't work with async/await
            var oldContext = OperationContext.Current;
            OperationContext.Current = new OperationContext((IContextChannel)channelLifetime.Channel);

            var result = await action(channelLifetime.Channel)
                .WithOperationContext(configureAwait: false);

            HttpResponseMessageProperty incomingMessageProperty = (HttpResponseMessageProperty)OperationContext.Current.IncomingMessageProperties[HttpResponseMessageProperty.Name];

            string[] keys = incomingMessageProperty.Headers.AllKeys;
            var headersOrig = keys.ToDictionary(t => t, t => incomingMessageProperty.Headers[t]);

            OperationContext.Current = oldContext;

            return new ResultCallWrapper<TResult>(result, new ReadOnlyDictionary<string, string>(headersOrig));
        }
    }
2
votes

Async flow is supported from .NET 4.6.2.

We have an ASP.NET Web API application running on .NET 4.6 where we used the accepted answer. TaskScheduler.FromCurrentSynchronizationContext() caused deadlocks when the current synchronization context was AspNetSynchronizationContext.

I believe the continuation task was queued behind the actual task, so the actual task waited on the continuation while the continuation had to run for the actual task to complete; that is, the two tasks were waiting on each other.

So I fixed the issue by replacing the continuation task with a TaskAwaiter. See: https://blogs.msdn.microsoft.com/lucian/2012/12/11/how-to-write-a-custom-awaiter/
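
For illustration only, here is a rough sketch of what such a change might look like; it is not the exact code from our application. HookedAwaiter is a hypothetical name, and the two Action hooks play the role of BeforeAwait and AfterAwait from the accepted answer. The difference is that the continuation is handed to TaskAwaiter<TResult>.OnCompleted rather than to ContinueWith with a scheduler built from the synchronization context:

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

// Hypothetical awaiter: same hooks as SimpleAwaiter<TResult>, but the
// continuation is scheduled by TaskAwaiter<TResult> instead of ContinueWith.
public sealed class HookedAwaiter<TResult> : INotifyCompletion
{
    private readonly TaskAwaiter<TResult> _awaiter;
    private readonly Action _beforeAwait;
    private readonly Action _afterAwait;

    public HookedAwaiter(Task<TResult> task, Action beforeAwait, Action afterAwait)
    {
        if (task == null) throw new ArgumentNullException("task");
        _awaiter = task.GetAwaiter();
        _beforeAwait = beforeAwait;
        _afterAwait = afterAwait;
    }

    public HookedAwaiter<TResult> GetAwaiter() { return this; }

    // If the task has already completed, no hook runs and execution
    // simply continues on the current thread.
    public bool IsCompleted { get { return _awaiter.IsCompleted; } }

    public void OnCompleted(Action continuation)
    {
        _beforeAwait();

        // TaskAwaiter captures the current SynchronizationContext (if any)
        // and resumes the continuation through it.
        _awaiter.OnCompleted(() =>
        {
            _afterAwait();
            continuation();
        });
    }

    public TResult GetResult() { return _awaiter.GetResult(); }
}

public static class HookedAwaiterDemo
{
    // Records the order in which the hooks and the result are observed.
    public static async Task<string> RunAsync()
    {
        var log = "";
        Task<int> task = Task.Delay(20).ContinueWith(_ => 42);
        int value = await new HookedAwaiter<int>(task, () => log += "before;", () => log += "after;");
        return log + value;
    }
}
```

With a FlowingOperationContextScope, the hooks would be scope.BeforeAwait and scope.AfterAwait. Whether this removes the deadlock in a given application depends on how the synchronization context is used, so treat it as a starting point.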

1
votes

It's been a while on this one, but I'll chime in with my own home-baked solution.

If one doesn't mind doing without OperationContextScope, one might consider something along these lines:

Extension methods

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Security;
using System.ServiceModel;
using System.Text;
using System.Threading.Tasks;

namespace Intexx.ServiceModel
{
    public static class WcfExtensions
    {
        [DebuggerStepThrough]
        public static void Call<TChannel>(this TChannel Client, Action<TChannel> Method) where TChannel : ICommunicationObject
        {
            try
            {
                Method.Invoke(Client);
            }
            finally
            {
                Cleanup(Client);
            }
        }

        [DebuggerStepThrough]
        public static TResult Call<TChannel, TResult>(this TChannel Client, Func<TChannel, TResult> Method) where TChannel : ICommunicationObject
        {
            try
            {
                return Method.Invoke(Client);
            }
            finally
            {
                Cleanup(Client);
            }
        }

        [DebuggerStepThrough]
        public async static Task CallAsync<TChannel>(this TChannel Client, Func<TChannel, Task> Method) where TChannel : ICommunicationObject
        {
            try
            {
                await Method.Invoke(Client);
            }
            finally
            {
                Cleanup(Client);
            }
        }

        [DebuggerStepThrough]
        public async static Task<TResult> CallAsync<TChannel, TResult>(this TChannel Client, Func<TChannel, Task<TResult>> Method) where TChannel : ICommunicationObject
        {
            try
            {
                return await Method.Invoke(Client);
            }
            finally
            {
                Cleanup(Client);
            }
        }

        private static void Cleanup<TChannel>(TChannel Client) where TChannel : ICommunicationObject
        {
            // Nothing to clean up if no client was supplied.
            if (Client == null)
                return;

            try
            {
                // A faulted channel cannot be closed gracefully; abort it instead.
                if (Client.State == CommunicationState.Faulted)
                    Client.Abort();
                else
                    Client.Close();
            }
            catch (Exception ex)
            {
                Client.Abort();

                // Swallow the expected WCF failures; wrap and rethrow anything else.
                if (!(ex is CommunicationException) && !(ex is TimeoutException))
                    throw new Exception(ex.Message, ex);
            }
        }
    }
}

Client class

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Security;
using System.Text;
using System.Threading.Tasks;

namespace Reader
{
    public class Client
    {
        public static CemReaderClient Create()
        {
            Tuple<Channels.Binding, EndpointAddress, double> oService;

            try
            {
                oService = Main.Services(typeof(ICemReader));
                return new CemReaderClient(oService.Item1, oService.Item2);
            }
            catch (KeyNotFoundException ex)
            {
                return null;
            }
        }
    }
}

Usage (in VB, as the code wouldn't convert)

Using oReader As Reader.CemReaderClient = Reader.Client.Create
  If oReader.IsNotNothing Then
    Dim lIsReading = Await oReader.CallAsync(Function(Reader As Reader.CemReaderClient)
                                               Me.ConfigFilePath = If(Me.ConfigFilePath, Reader.GetConfigFilePath)
                                               Me.BackupDrive = If(Me.BackupDrive, Reader.GetBackupDrive)
                                               Me.SerialPort = If(Me.SerialPort, Reader.GetSerialPort)
                                               Me.LogFolder = If(Me.LogFolder, Reader.GetLogFolder)

                                               Return Reader.GetIsReadingAsync
                                             End Function)
  End If
End Using

I've had this running reliably in production under frequency loads of around 15 calls/sec on the client side (that's as fast as serial processing would allow). That was on a single thread, though—this hasn't been rigorously tested for thread safety. YMMV.

In my case, I decided to roll the extension methods into their own private NuGet package. The whole construct has turned out to be pretty handy.

This will have to be reevaluated, of course, if OperationContextScope ever ends up being needed.

The bit with the Tuple in the Client class is for Service Discovery support. If anyone would like to see that code as well, give a shout and I'll update my answer.

0
votes

I am a little confused. I found this blog post: Task-based asynchronous operation in WCF

It shows async WCF communication like this:

[ServiceContract]
public interface IMessage
{
    [OperationContract]
    Task<string> GetMessages(string msg);
}

public class MessageService : IMessage
{
   async Task<string> IMessage.GetMessages(string msg)
   {
      var task = Task.Factory.StartNew(() =>
                                     {
                                         Thread.Sleep(10000);
                                         return "Return from Server : " + msg;
                                     });
     return await task.ConfigureAwait(false);
   }
}

Client:

var client = new Proxy("BasicHttpBinding_IMessage");
var task = Task.Factory.StartNew(() => client.GetMessages("Hello"));
var str = await task;

So is this also a good way??

-1
votes

I ran into the same issue, however it dawned on me that I didn't need to use async/await at all.

Since you are not post-processing the result, there is no need to await the reply. If you do need to process the result, just use an old-fashioned TPL continuation.

public Task<MyDomainModel> GetHomeInfoAsync(DateTime timestamp)
{
    using (var helper = new ServiceHelper<ServiceClient, ServiceContract>())
    {
        return helper.Proxy.GetHomeInfoAsync(timestamp).ContinueWith(antecedent=>processReplay(antecedent.Result));
    }
}
-1
votes

I don't know if this helps, but after seeing this question on my search to answer the same question, I came upon this.

Leading from that, I should think your code should look something like this:

public async Task<HomeInfo> GetHomeInfoAsync(DateTime timestamp)
{
    using (var client = CreateDocumentServiceClient())
    {
        return await client.BeginGetHomeInfoAsync(timestamp);
    }
}

I realise my answer comes rather late :P but it might help someone else.