I have an Enterprise Service Bus implemented using MassTransit / RabbitMQ for use in a web project. I do RPC over MQ using the request/response pattern of MassTransit.

I create multiple ReceiveEndpoints on a bus, each handling a different type of message. The ESB uses a custom configuration file to create multiple buses as well, so I can implement a logical QoS and even distribute consumers over the network onto separate servers to increase performance.

Recently I noticed blocking on one of my consumers. If I send some messages to a long-running consumer (each takes about 5 seconds of work), it looks like a single thread responds to my requests, and concurrently sent messages wait for the previous messages to be consumed.

So far I have set UseConcurrencyLimit(64), which changed nothing, and tried to increase the PrefetchCount to 50, but RabbitMQ shows it as 0 in the queue details.

Why do the consumers process a single message at a time?

MassTransit v3.5.7

Edit: I found this thread later. It looks like the same issue to me.

UPDATE: What is different from Chris's sample is that I use RabbitMQ and use reflection to create the consumer, so that I can manage the ESB with a configuration file. I am still seeing a prefetch count of 0 in the RabbitMQ management console.

var busControl = Bus.Factory.CreateUsingRabbitMq(x =>
{
    x.UseConcurrencyLimit(64);
    IRabbitMqHost host = x.Host(new Uri(Config.host), h =>
    {
        h.Username("userName");
        h.Password("password");
    });

    // Consumer type is resolved from the configuration file via reflection
    var obj = Activator.CreateInstance([SomeExistingType]);

    x.ReceiveEndpoint(host, "queueName", e =>
    {
        e.PrefetchCount = 64; // config.prefetchCount;
        e.PurgeOnStartup = true;
        if (config.retryCount > 0)
        {
            e.UseRetry(retry => retry.Interval(config.retryCount, config.retryInterval));
        }
        e.SetQueueArgument("x-expires", config.timeout * 1000 /* seconds to milliseconds */);
        e.SetQueueArgument("temporary", config.temporary);

        e.Consumer([SomeExistingType], t => obj);
    });
});

await busControl.StartAsync();

UPDATE 2: I had been setting the prefetch count to 1 to let MassTransit balance the workload, since I have several consumer servers and a RabbitMQ cluster. However, when I sent many messages to the queue, they kept all the threads busy, and new requests waited in the sender queue to be picked up by a free thread. By increasing the prefetch count, I ended up with more free threads for new requests. I set the prefetch count while configuring the receive endpoint, as Chris advised. Thanks. I will mark Chris's reply as the answer as soon as he confirms.
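For reference, a sketch of how the endpoint part of my configuration looks after the change (consumerType stands in for the type resolved via reflection from my configuration file):

```csharp
x.ReceiveEndpoint(host, "queueName", e =>
{
    // Setting PrefetchCount here, on the receive endpoint, is what
    // finally showed up on the consumer in the RabbitMQ management console.
    e.PrefetchCount = config.prefetchCount;

    // Creating the instance inside the factory lambda gives each
    // message its own consumer instance, instead of sharing one.
    e.Consumer(consumerType, t => Activator.CreateInstance(consumerType));
});
```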

In your example, you are using a single consumer for all messages. Typically not the best choice, but if you stick with it make sure your consumer doesn't have any instance variables as they will be shared by all concurrent messages. - Chris Patterson
You could fix that by moving the Activator.CreateInstance call into the lambda method of the e.Consumer() call. - Chris Patterson
@ChrisPatterson thanks a lot, I read that critique previously in your responses. For the example I felt I needed to simplify things for this specific question. In reality, I create many ReceiveEndpoints, one for each message type. Besides, I create several buses to group some messages together, since some of them use the request/response pattern while the others use publish/subscribe. - Emrah GOZCU

1 Answer

If you're seeing 0 in the RabbitMQ console for prefetch count, you may be configuring it in the wrong place. You should have something along the lines of:

cfg.ReceiveEndpoint("my-queue", x =>
{
    x.PrefetchCount = 64;
    x.Consumer<MyConsumer>();
});

This will configure the receive endpoint consumer to a prefetch count of 64, which should show up in the consumer on the RabbitMQ management console.

To the other point, if your consumer is blocking the thread using something like Thread.Sleep(), I've seen cases where that can limit the concurrency of the thread pool.
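To illustrate that last point, a long-running consumer can await its work instead of blocking the thread. A minimal sketch, assuming hypothetical MyRequest/MyResponse message contracts that are not part of the question:

```csharp
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contracts, for illustration only
public interface MyRequest { string Value { get; } }
public interface MyResponse { string Result { get; } }

public class MyConsumer : IConsumer<MyRequest>
{
    public async Task Consume(ConsumeContext<MyRequest> context)
    {
        // Simulate ~5 seconds of work. Thread.Sleep(5000) here would
        // pin a thread-pool thread for the full duration and can limit
        // how many messages actually run concurrently; awaiting
        // releases the thread back to the pool while waiting.
        await Task.Delay(5000);

        await context.RespondAsync<MyResponse>(new { Result = context.Message.Value });
    }
}
```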