I am trying to use MassTransit for request/response communication over an Azure Service Bus queue. The sender is an Azure Web App; the consumer is a Windows service installed on an on-premises machine.
Everything works fine at low message volumes. However, as soon as I start sending more than ~20 msg/sec, I see severe (1-2 sec) delays in responses from the consumer. My telemetry shows that the delay occurs at the point where the consumer picks messages up from the queue.
One strange but, I think, important detail: at the current load, the number of unread messages in the queue stays roughly constant at about 25 on average. If I send twice as many messages, I see about 50 messages in the queue on average. With delays on the consumer side I would expect the queue to GROW, but it stays constant, so it must be something inside the code that throttles the connection.
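As a rough sanity check on these numbers (a back-of-the-envelope estimate only, assuming the system is in a steady state so that Little's law applies, and assuming the current load is roughly 20 msg/sec), the average queue length L, arrival rate λ, and average time a message waits in the queue W are related by:

```latex
L = \lambda W
\quad\Longrightarrow\quad
W = \frac{L}{\lambda}
  \approx \frac{25\ \text{msg}}{20\ \text{msg/s}}
  = 1.25\ \text{s},
\qquad
\frac{50\ \text{msg}}{40\ \text{msg/s}} = 1.25\ \text{s}
```

Under those assumptions, a queue length that scales with the send rate corresponds to a roughly fixed wait of about 1.25 sec per message, which is in line with the 1-2 sec delays I am seeing.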
Quick info:
- There are no hardware problems on the machine; CPU and memory usage are not high.
- I tried adjusting the UseConcurrencyLimit, MaxConcurrentCalls, and PrefetchCount settings on the consumer side. It did not help.
- The sender and consumer code closely follow the classic examples.
Consumer: .NET Framework 4.7.2 and MassTransit.Azure.ServiceBus.Core 5.5.2
Here's my listener class with all business logic removed:
    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using MassTransit;
    using Microsoft.Azure.ServiceBus;   // TransportType
    using Newtonsoft.Json.Linq;         // JToken

    public class QueueListener
    {
        private IBusControl Bus { get; set; }

        public QueueListener()
        {
            Bus = MassTransit.Bus.Factory.CreateUsingAzureServiceBus(serviceBusFactoryConfigurator =>
            {
                // Connect to the namespace over AMQP via WebSockets.
                var host = serviceBusFactoryConfigurator.Host(SettingsHelper.AzureServiceBusConnectionString,
                    (config) =>
                    {
                        config.OperationTimeout = TimeSpan.FromSeconds(60);
                        config.TransportType = TransportType.AmqpWebSockets;
                    });

                // Receive endpoint for the request queue, with the concurrency settings I tried.
                serviceBusFactoryConfigurator.ReceiveEndpoint(host, SettingsHelper.CouponQueryQueueName, e =>
                {
                    e.Handler<JToken>(HandleMessage);
                    e.UseConcurrencyLimit(16);
                    e.MaxConcurrentCalls = 16;
                    e.PrefetchCount = 32;
                });

                serviceBusFactoryConfigurator.EnableBatchedOperations = true;
                serviceBusFactoryConfigurator.DefaultMessageTimeToLive = TimeSpan.FromSeconds(60);
            });
        }

        private async Task HandleMessage(ConsumeContext context)
        {
            // Fixed delay left in place of the removed business logic.
            await Task.Delay(800);

            // Respond only if the request has not expired yet.
            if (context.ExpirationTime > SystemDateTime.Now)
            {
                await context.RespondAsync(new CouponUsedList { CouponsUsed = new List<CouponCurrentUsed>() });
            }
        }

        public Task LaunchAsync()
        {
            return Bus.StartAsync();
        }

        public Task StopAsync()
        {
            return Bus.StopAsync();
        }
    }