0 votes

Using MassTransit with Azure Service Bus as the transport. I'm forced to always set PrefetchCount to 1, or else the consumer pulls the same message off the queue multiple times, at which point I get a message lock exception. Why would my consumer pull the same message and try to process it multiple times? How can I avoid this behaviour so that my consumer can process multiple messages (just not the same message) concurrently?

2020-01-03 10:30:31 ERR Exception on Receiver sb://**queueuri**/create-access-points during "RenewLock"
Microsoft.Azure.ServiceBus.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. Reference:2e0c6c96-a0e1-40e2-9f95-1930aad9744b, TrackingId:37d9c13c-5bb4-4c19-9853-21f9ef06c7dd_B31, SystemTracker:qesi-local-queue:Queue:create-access-points, Timestamp:2020-01-03T16:31:08

Forced to set PrefetchCount in my Consumer

public class CreateAccessPointCommandHandlerDef : ConsumerDefinition<CreateAccessPointCommandHandler>
{
    public CreateAccessPointCommandHandlerDef()
    {
        EndpointName = "create-access-points";
        ConcurrentMessageLimit = 4;
    }

    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<CreateAccessPointCommandHandler> consumerConfigurator
    )
    {
        // apply Azure Service Bus-specific lock settings when the endpoint is hosted on ASB
        if (endpointConfigurator is IServiceBusEndpointConfigurator serviceBus)
        {
            serviceBus.LockDuration = TimeSpan.FromMinutes(5);
            serviceBus.MaxAutoRenewDuration = TimeSpan.FromMinutes(30);
            serviceBus.PrefetchCount = 1;
        }
    }
}

Here are my logs showing that I get the lock exception shortly after the message is received. The process eventually completes successfully, and then the same message is processed again.

2020-01-06 10:19:38 DBG Received CreateAccessPoints message (ID: 14020000-5d08-0015-ac23-08d792c43254 for ProviderNetworkId 17 and ProviderNetworkVersionId 1014.
info: Worker[0] Worker running at: 01/06/2020 10:19:52 -06:00
2020-01-06 10:19:52 INF Worker running at: 01/06/2020 10:19:52 -06:00
info: Worker[0] Worker running at: 01/06/2020 10:20:12 -06:00
2020-01-06 10:20:13 INF Worker running at: 01/06/2020 10:20:12 -06:00
fail: MassTransit[0]
  Exception on Receiver sb://**azuresb** during RenewLock
Microsoft.Azure.ServiceBus.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. Reference:9470f437-5c5f-4280-aa32-0f1d97ac7c50, TrackingId:89f135d1-69ce-4c08-b80d-247d3c17eb76_B1, SystemTracker:**azuresb**:Queue:create-access-points, Timestamp:2020-01

Here's the log from my startup; does any of this seem wrong?

2020-01-06 10:43:05 DBG Topic: "qesi.Core.ProviderNetwork.Commands/CreateAccessPointsCommand" ("")
2020-01-06 10:43:05 DBG Queue: "create-access-points" ("auto-delete: 29247y1M2w2h48m5s477ms")
2020-01-06 10:43:05 DBG Subscription "create-access-points" ("Commands/CreateAccessPointsCommand" -> "sb://**azuresb**/create-access-points")
2020-01-06 10:43:05 DBG Creating message receiver for sb://qesi-local-queue.servicebus.windows.net/create-access-points
2020-01-06 10:43:06 DBG Creating queue "QA1271_****_bus_nobyyyn7byybmbdabdm3ft598r"
2020-01-06 10:43:07 DBG Queue: "QA1271_****_bus_nobyyyn7byybmbdabdm3ft598r" ("dead letter, auto-delete: 5m")

And finally here is my Bus creation code:

        return Bus.Factory.CreateUsingAzureServiceBus(cfg =>
        {
            cfg.Host(_config.ServiceBusUri, host => {
                // authenticate against the namespace with a shared access signature
                host.SharedAccessSignature(s =>
                {
                    s.KeyName = _config.KeyName;
                    s.SharedAccessKey = _config.SharedAccessKey;
                    s.TokenTimeToLive = TimeSpan.FromDays(1);
                    s.TokenScope = TokenScope.Namespace;
                });
            });
            cfg.SetLoggerFactory(_logFactory);
            // create receive endpoints from the registered consumer definitions
            cfg.ConfigureEndpoints(_provider);
            cfg.UseMessageData(_messageDataRepository);
            // retry each message twice, immediately, before it faults
            cfg.UseMessageRetry(retry => retry.Immediate(2));
        });
I'm still getting the lock error even with prefetch count set to 1. I'm not sure why I keep getting this lock error and why messages are consumed multiple times. – Ryan Langton
Can you update the question with the final result? There was a fix to MassTransit to throw exceptions back to Azure Functions, and a couple of other changes on the timeouts, if I recall. – Chris Patterson

2 Answers

1 vote

You're confusing several things here that are not related to how MassTransit works. Azure Service Bus delivers messages to competing consumers, and each delivered message is locked for a certain duration. If a message is not completed, abandoned, or dead-lettered within that period, its lock expires and the message becomes available to another competing consumer for reprocessing. If a message loses its lock repeatedly, specifically once its delivery count exceeds MaxDeliveryCount, the broker dead-letters it automatically. Because you're assigning 30 minutes to MaxAutoRenewDuration, MassTransit will keep renewing the lock and extend the processing time up to 30 minutes. Beyond that, the same thing happens - the lock is lost and the message is handed to another competing consumer. Check how long your message processing is taking.
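
To act on that last point, here's a minimal sketch of one way to log how long each message takes, so the elapsed time can be compared against LockDuration (5m) and MaxAutoRenewDuration (30m). This is not the poster's actual consumer; the message type, the injected logger, and the placeholder work inside Consume are assumptions for illustration.

using System.Diagnostics;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.Logging;

public class CreateAccessPointCommandHandler : IConsumer<CreateAccessPointsCommand>
{
    private readonly ILogger<CreateAccessPointCommandHandler> _logger;

    public CreateAccessPointCommandHandler(ILogger<CreateAccessPointCommandHandler> logger)
    {
        _logger = logger;
    }

    public async Task Consume(ConsumeContext<CreateAccessPointsCommand> context)
    {
        var stopwatch = Stopwatch.StartNew();

        // ... actual access-point creation work goes here ...
        await Task.CompletedTask;

        stopwatch.Stop();

        // if this regularly approaches the lock/renewal windows, locks will be lost
        _logger.LogInformation("Processed message {MessageId} in {Elapsed}",
            context.MessageId, stopwatch.Elapsed);
    }
}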

PrefetchCount is a different setting. It controls how many messages are prefetched so that the client doesn't have to go to the broker for every single message. In your case, that's one additional message prefetched whenever a message is requested/received by MassTransit. This setting improves the overall endpoint throughput; it does not control concurrency, which you set to 4 with ConcurrentMessageLimit.
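
To make the distinction concrete, here is a hedged sketch of the poster's consumer definition with the two knobs commented. The PrefetchCount value of 8 is purely illustrative, not a recommendation.

public class CreateAccessPointCommandHandlerDef : ConsumerDefinition<CreateAccessPointCommandHandler>
{
    public CreateAccessPointCommandHandlerDef()
    {
        EndpointName = "create-access-points";

        // concurrency: at most 4 messages are processed at the same time
        ConcurrentMessageLimit = 4;
    }

    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<CreateAccessPointCommandHandler> consumerConfigurator)
    {
        if (endpointConfigurator is IServiceBusEndpointConfigurator serviceBus)
        {
            // throughput: buffer a few messages locally so the client doesn't
            // fetch them one at a time; this does not change concurrency
            serviceBus.PrefetchCount = 8;
        }
    }
}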

0 votes

It turns out this issue was caused by running our .NET Core Generic Host inside a continuous WebJob. There seem to be connectivity issues with Azure Service Bus when running in the context of a WebJob: it works most of the time, then blows up and loses the Azure Service Bus locks. This took a lot of trial and error to track down. As soon as we deployed the Generic Host in a Docker container, all of our Service Bus lock problems went away.
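
For context, a minimal sketch of the kind of hosting wiring this refers to: the bus is registered as IBusControl in DI and started/stopped by a BackgroundService under the Generic Host, so the same host runs unchanged inside a Docker container. The class name and registration are illustrative assumptions, not the poster's actual code.

using System.Threading;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.Hosting;

public class BusHostedService : BackgroundService
{
    private readonly IBusControl _bus;

    public BusHostedService(IBusControl bus)
    {
        _bus = bus;
    }

    public override async Task StartAsync(CancellationToken cancellationToken)
    {
        // start receiving messages when the Generic Host starts
        await _bus.StartAsync(cancellationToken);
        await base.StartAsync(cancellationToken);
    }

    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        await base.StopAsync(cancellationToken);
        // stop the bus cleanly on host shutdown
        await _bus.StopAsync(cancellationToken);
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
        => Task.CompletedTask;
}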