I am trying to use MassTransit for request/response communication over an Azure Service Bus queue. The sender is an Azure Web App; the consumer is a Windows service installed on an on-premises machine.

Everything works fine at low message volumes. However, as soon as I start sending more than ~20 msg/sec, I see severe (1-2 sec) delays in responses from the consumer. My telemetry tells me the delay occurs at the point where the consumer picks up messages from the queue.

One strange, but I think important, part of the behavior: under the current load, the number of unread messages in the queue is constant on average, at about 25. If I send twice as many messages, I see about 50 messages in the queue on average. With delays on the consumption side I would expect the queue to GROW, but it stays constant, so it is definitely something inside the code that throttles the connection.

Quick info:

  • There are no hardware problems on the machine. CPU and memory usage are not high.
  • I tried adjusting the UseConcurrencyLimit, MaxConcurrentCalls, and PrefetchCount settings on the consumer side. It did not help.
  • My sender and consumer code is close to the classic examples.

Consumer: .NET Framework 4.7.2 and MassTransit.Azure.ServiceBus.Core 5.5.2

Here's my listener class with all business logic removed:

public class QueueListener
{
    private IBusControl Bus { get; set; }

    public QueueListener()
    {
        Bus = MassTransit.Bus.Factory.CreateUsingAzureServiceBus(serviceBusFactoryConfigurator =>
        {
            var host = serviceBusFactoryConfigurator.Host(SettingsHelper.AzureServiceBusConnectionString,
                (config) =>
                {
                    config.OperationTimeout = TimeSpan.FromSeconds(60);
                    config.TransportType = TransportType.AmqpWebSockets;
                });
            serviceBusFactoryConfigurator.ReceiveEndpoint(host, SettingsHelper.CouponQueryQueueName, e =>
            {
                e.Handler<JToken>(HandleMessage);

                e.UseConcurrencyLimit(16);
                e.MaxConcurrentCalls = 16;
                e.PrefetchCount = 32;
            });
            serviceBusFactoryConfigurator.EnableBatchedOperations = true;
            serviceBusFactoryConfigurator.DefaultMessageTimeToLive = TimeSpan.FromSeconds(60);
        });
    }

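    private async Task HandleMessage(ConsumeContext context)
    {
        // Simulates the ~800 ms of async dependency calls made by the real handler.
        await Task.Delay(800);

        // Respond only if the message carries an expiration time that has not passed yet.
        if (context.ExpirationTime > SystemDateTime.Now)
        {
            await context.RespondAsync(new CouponUsedList { CouponsUsed = new List<CouponCurrentUsed>() });
        }
    }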

    public Task LaunchAsync()
    {
        return Bus.StartAsync();
    }

    public Task StopAsync()
    {
        return Bus.StopAsync();
    }
}
You can benchmark your environment using github.com/MassTransit/MassTransit-Benchmark - it might give you some ideas. Also, if you post your configuration, that would make it easier to give you feedback. You shouldn't need any filters, and you only need to use MaxConcurrentCalls and PrefetchCount on the receive endpoint to tune performance with ASB. - Chris Patterson
@ChrisPatterson, how do I run the benchmark? I started it; it created a queue and pushed 10K messages to it, but nothing happened after that :) Is there some error? - Alexander Capone
@ChrisPatterson I have updated my question with the listener's code. Please take a look. Note that I added an 800 ms delay to simulate the async dependency calls that will happen there. - Alexander Capone
So, math: 16 concurrent calls at 0.8 seconds each is roughly 20 messages a second. So what you're seeing is what you will get unless you increase MaxConcurrentCalls. You can also remove UseConcurrencyLimit; it isn't needed. - Chris Patterson
@ChrisPatterson, it turned out I was missing one setting on the consumer side. By the way, how do I launch the benchmark? I was not able to do it. - Alexander Capone
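
The estimate in the comments can be written out as a throughput ceiling. This is only a rough sketch: it assumes each handler call holds one of the 16 concurrency slots for the full 0.8 s simulated delay and ignores broker round-trip time. The symbols N and T are introduced here for illustration only.

```latex
\[
  \text{max throughput} \approx \frac{N}{T}
  = \frac{16\ \text{concurrent calls}}{0.8\ \text{s per call}}
  = 20\ \text{msg/s}
\]
```

Under the same assumptions, raising MaxConcurrentCalls to 32 would lift the ceiling to about 40 msg/s.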

1 Answer

It seems that, once again, it was just one missing setting. Everything you write inside ReceiveEndpoint configures the consumer's listener queue, whereas the settings you provide directly in CreateUsingAzureServiceBus configure the consumer's response queue.

All I needed was to add one line inside the receive endpoint configuration. Without this setting, all prefetched messages are handled gradually:

e.EnableBatchedOperations = true;
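
For context, here is a minimal sketch of where that line might sit in the listener setup from the question. It is not the exact production code: the ListenerBusFactory class and its Create method are hypothetical names, and the connectionString and queueName parameters stand in for the SettingsHelper values. UseConcurrencyLimit is left out, following the suggestion in the comments.

```csharp
using System;
using MassTransit;
using MassTransit.Azure.ServiceBus.Core;
using Microsoft.Azure.ServiceBus;
using Newtonsoft.Json.Linq;

// Hypothetical helper class, introduced only for this illustration.
public static class ListenerBusFactory
{
    // connectionString and queueName replace the SettingsHelper values from the question.
    public static IBusControl Create(
        string connectionString,
        string queueName,
        MessageHandler<JToken> handler)
    {
        return Bus.Factory.CreateUsingAzureServiceBus(cfg =>
        {
            var host = cfg.Host(connectionString, h =>
            {
                h.OperationTimeout = TimeSpan.FromSeconds(60);
                h.TransportType = TransportType.AmqpWebSockets;
            });

            cfg.ReceiveEndpoint(host, queueName, e =>
            {
                e.Handler<JToken>(handler);

                // Same values as in the question; tune them for your own load.
                e.MaxConcurrentCalls = 16;
                e.PrefetchCount = 32;

                // The setting that was missing on the receive endpoint itself.
                e.EnableBatchedOperations = true;
            });
        });
    }
}
```

The only difference that matters is that EnableBatchedOperations is set on the receive endpoint configurator (e) rather than only on the bus-level configurator.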