I have an Enterprise Service Bus implemented using MassTransit / RabbitMQ for use in a web project. I do RPC over MQ using MassTransit's Request/Response pattern.
I create multiple ReceiveEndpoints in a bus, each handling a different type of message. The ESB also uses a custom configuration file to create multiple buses, so I can implement a logical QoS and even distribute consumers over the network onto separate servers to improve performance.
Recently I have seen blocking on one of my consumers. If I send several messages to a long-running consumer (each takes about 5 seconds of work), it looks like a single thread responds to my requests, and messages sent concurrently wait for the previous messages to be consumed.
So far I have set UseConcurrencyLimit(64), which changed nothing, and tried to increase PrefetchCount to 50; however, RabbitMQ shows it as 0 in the queue details.
Why do the consumers process a single message at a time?
MassTransit v3.5.7
Edit: I found this thread later. It looks like the same issue to me.
UPDATE: What differs from Chris's sample is that I use RabbitMQ and create the consumer via reflection, so that I can manage the ESB with a configuration file. I am still seeing Prefetch Count 0 in the RabbitMQ management console.
    var busControl = Bus.Factory.CreateUsingRabbitMq(x =>
    {
        x.UseConcurrencyLimit(64);
        IRabbitMqHost host = x.Host(new Uri(Config.host), h =>
        {
            h.Username("userName");
            h.Password("password");
        });
        // A single consumer instance is created once, up front, via reflection.
        var obj = Activator.CreateInstance([SomeExistingType]);
        x.ReceiveEndpoint(host, "queueName", e =>
        {
            e.PrefetchCount = 64; // config.prefetchCount;
            e.PurgeOnStartup = true;
            if (config.retryCount > 0)
            {
                e.UseRetry(retry => retry.Interval(config.retryCount, config.retryInterval));
            }
            e.SetQueueArgument("x-expires", config.timeout * 1000 /* seconds to milliseconds */);
            e.SetQueueArgument("temporary", config.temporary);
            // The factory lambda returns the same pre-built instance every time.
            e.Consumer(consumer, f => obj);
        });
    });
    busControl.StartAsync();
UPDATE 2: I had been setting the prefetch count to 1 to let MassTransit handle the workload, since I have several consumer servers and a RabbitMQ cluster. However, when I sent many messages to the queue and they kept all the threads busy, new requests waited in the sender queue to be picked up by a free thread. I increased the prefetch count, and that way I end up with more free threads for new requests. I set the prefetch count while configuring the receive endpoint, as Chris advised. Thanks. I will mark Chris's reply as the answer as soon as Chris confirms.
Activator.CreateInstance call into the lambda method of the e.Consumer() call. - Chris Patterson
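One reading of the comment above is that the consumer should be created inside the factory lambda passed to e.Consumer(), so that each message gets its own instance instead of sharing one object built up front. A minimal sketch of that idea follows, written against the MassTransit 3.x API used in the question. The SlowRequest and SlowConsumer types, the queue name, the host URI and the credentials are all hypothetical placeholders, and it assumes the consumer type has a public parameterless constructor. It only illustrates where the Activator.CreateInstance call goes; it is not a claim about what resolved the blocking.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransit.RabbitMqTransport;

// Hypothetical message type, for illustration only.
public class SlowRequest
{
    public string Text { get; set; }
}

// Hypothetical long-running consumer (about 5 seconds of work per message).
public class SlowConsumer : IConsumer<SlowRequest>
{
    public async Task Consume(ConsumeContext<SlowRequest> context)
    {
        await Task.Delay(TimeSpan.FromSeconds(5));
        await Console.Out.WriteLineAsync("Handled: " + context.Message.Text);
    }
}

public static class Program
{
    public static void Main()
    {
        // In the question this type comes from a configuration file.
        Type consumerType = typeof(SlowConsumer);

        var busControl = Bus.Factory.CreateUsingRabbitMq(x =>
        {
            // Assumed local broker and default credentials.
            IRabbitMqHost host = x.Host(new Uri("rabbitmq://localhost/"), h =>
            {
                h.Username("guest");
                h.Password("guest");
            });

            x.ReceiveEndpoint(host, "slow-queue", e =>
            {
                // Prefetch is set on the receive endpoint, as in UPDATE 2.
                e.PrefetchCount = 64;

                // The instance is created inside the factory lambda, so the
                // factory builds a new consumer each time it is invoked.
                e.Consumer(consumerType, type => Activator.CreateInstance(type));
            });
        });

        busControl.Start();
        Console.WriteLine("Bus started. Press Enter to stop.");
        Console.ReadLine();
        busControl.Stop();
    }
}
```

Compared with the code in the question, the only structural change is that no consumer object exists outside the lambda; the reflection-based type lookup from the configuration file can stay as it is.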