
At work we have some code in an Azure WebJob where we use RabbitMQ and Akka.NET.

The basic workflow is this:

  • A message arrives on a RabbitMQ queue
  • We have a message handler for the incoming message
  • Within the message handler we start a top level (user) supervisor actor where we "ask" it to handle the message

The supervisor actor hierarchy is like this:

[Image: supervisor actor hierarchy diagram]

And the relevant top level code is something like this (this is the WebJob code)

static void Main(string[] args)
{
    try
    {
        //Bootstrap akka IoC resolver well ahead of any actor usages
        new AutoFacDependencyResolver(ContainerOperations.Instance.Container, ContainerOperations.Instance.Container.Resolve<ActorSystem>());

        var system = ContainerOperations.Instance.Container.Resolve<ActorSystem>();

        var busQueueReader = ContainerOperations.Instance.Container.Resolve<IBusQueueReader>();
        var dateTime = ContainerOperations.Instance.Container.Resolve<IDateTime>();
        busQueueReader.AddHandler<ProgramCalculationMessage>("RabbitQueue", x =>
        {

            //This is code that gets called whenever we have a RabbitMQ message arrive

            try
            {
                //SupervisorActor is a singleton
                var supervisorActor = ContainerOperations.Instance.Container.ResolveNamed<IActorRef>("SupervisorActor");
                var actorMessage = new SomeActorMessage();
                var supervisorRunTask = supervisorActor.Ask(actorMessage, TimeSpan.FromMinutes(25));

                //we want to wait this guy out
                var supervisorRunResult = supervisorRunTask.GetAwaiter().GetResult();
                switch (supervisorRunResult)
                {
                    case CompletedEvent completed:
                    {
                        break;
                    }
                    case FailedEvent failed:
                    {
                        throw failed.Exception;
                    }

                }
            }
            catch (Exception ex)
            {
                _log.Error(ex, "Error found in Webjob");
                //throw it for the actual RabbitMqQueueReader Handler so message gets NACK
                throw;
            }

        });
        Thread.Sleep(Timeout.Infinite);
    }
    catch (Exception ex)
    {
        _log.Error(ex, "Error found");
        throw;
    }
}

And this is the relevant IOC code (we are using Autofac + Akka.NET DI for Autofac)

builder.RegisterType<SupervisorActor>();

_actorSystem = new Lazy<ActorSystem>(() =>
{
    var akkaconf = ActorUtil.LoadConfig(_akkaConfigPath).WithFallback(ConfigurationFactory.Default());
    return ActorSystem.Create("WebJobSystem", akkaconf);
});

builder.Register<ActorSystem>(cont => _actorSystem.Value);

builder.Register(cont =>
                    {
                        var system = cont.Resolve<ActorSystem>();
                        return system.ActorOf(system.DI().Props<SupervisorActor>(),"SupervisorActor");
                    })
        .SingleInstance()
        .Named<IActorRef>("SupervisorActor");

The problem

So the code works fine and does what we want, apart from the Akka.NET "ask" timeout shown above in the WebJob code.

Annoyingly, this seems to work fine when I run the WebJob locally, where I can simulate an "ask" timeout by providing a new supervisorActor that simply never responds with a message back to the "Sender".

This works perfectly on my machine, but when we run this code in Azure we do NOT see a timeout for the "ask", even though one of our workflow runs exceeded the "ask" timeout by a mile.

I just don't know what could be causing this behavior. Does anyone have any ideas?

Could there be some Azure-specific config value for the WebJob that I need to set?

Comments

What version of Akka.NET are you using? Also, what happens if you change your code to await the ask or use ContinueWith() on the ask instead of GetAwaiter().GetResult()? - Nicholas Reynolds

Sadly I can't use ContinueWith or await, as this is inside RabbitMQ consumer code which is not async/await. OK, I could wedge it in, but the official C# RabbitMQ client isn't really an async/await API. - sacha barber

@NicholasReynolds going to try this (which I had missed): gigi.nullneuron.net/gigilabs/… - sacha barber

You could probably take a look at the Akka.Streams.Amqp connector and then use Akka.Streams as the integration point. - Bartosz Sypytkowski

I've used Akka Streams (nice), but the async RabbitMQ API from the link I added seems to have done the trick. - sacha barber

1 Answer

The answer to this was to use the async RabbitMQ consumer handlers, which apparently arrived in v5.0 of the C# RabbitMQ client. Sadly, the official docs still show the sync usage.

This article is quite good: https://gigi.nullneuron.net/gigilabs/asynchronous-rabbitmq-consumers-in-net/

Once we did this, all was good.
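
For anyone landing here later, this is a rough sketch of what the async consumer wiring can look like. It is not our production code. It assumes the 5.x/6.x RabbitMQ.Client API (`DispatchConsumersAsync`, `AsyncEventingBasicConsumer`); later major versions changed these names. The class `AsyncConsumerSketch`, its method names, the placeholder message types, and the choice to NACK without requeue are all made up for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Akka.Actor;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Placeholder message types standing in for the ones used in the question.
public sealed class SomeActorMessage { }
public sealed class CompletedEvent { }
public sealed class FailedEvent
{
    public FailedEvent(Exception exception) { Exception = exception; }
    public Exception Exception { get; }
}

public static class AsyncConsumerSketch
{
    // Hypothetical helper: only a CompletedEvent reply counts as success.
    public static bool ShouldAck(object result)
    {
        return result is CompletedEvent;
    }

    // Wires an async consumer to the queue. The caller owns (and should dispose) the returned connection.
    public static IConnection StartConsuming(IActorRef supervisorActor, string hostName, string queueName)
    {
        // Without DispatchConsumersAsync = true the async consumer is not dispatched to.
        var factory = new ConnectionFactory { HostName = hostName, DispatchConsumersAsync = true };
        var connection = factory.CreateConnection();
        var channel = connection.CreateModel();

        var consumer = new AsyncEventingBasicConsumer(channel);
        consumer.Received += async (sender, ea) =>
        {
            try
            {
                // Awaiting the Ask instead of blocking on GetAwaiter().GetResult().
                object result = await supervisorActor.Ask<object>(
                    new SomeActorMessage(), TimeSpan.FromMinutes(25));

                if (ShouldAck(result))
                {
                    channel.BasicAck(ea.DeliveryTag, multiple: false);
                }
                else
                {
                    // Assumption: failed runs are NACKed without requeue.
                    channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false);
                }
            }
            catch (Exception ex)
            {
                // Reached if the Ask times out or the actor call faults.
                Console.Error.WriteLine("Error found in Webjob: " + ex);
                channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false);
            }
        };

        channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
        return connection;
    }
}
```

In the WebJob you would call something like `AsyncConsumerSketch.StartConsuming(supervisorActor, "localhost", "RabbitQueue")` once at startup and keep the process alive as before. The host name here is just an example value.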