4
votes

I'm using masstransit and rabbitmq. My queue has messages of various types delivered to it. Most of these types implement IHaveOrganisationKey. I would like to add a partitioner to the pipeline to ensure that only one message (of any type) with a given organisationkey is processed at the same time. The goal is to limit concurrency problems that occur when multiple messages of the same organisation are processed in parallel, while allowing parallel processing of messages from different organisations.

Configuration code

sbc.ReceiveEndpoint(host, this.queueConfiguration.QueueName, ep =>
{
    ep.PrefetchCount = this.busConfiguration.PrefetchCount;
    this.queueConfiguration.ConfigureEndpoint(ep);
    this.queueConfiguration.SubscribeMessages(worker, ep);
});

In the QueueConfiguration:

public override void ConfigureEndpoint(IRabbitMqReceiveEndpointConfigurator ep)
{
    // This is incomplete. Am I on the right track here?
    ep.UsePartitioner(1, x => x.TryGetMessage<IHaveOrganisationKey>());
}
1

1 Answers

5
votes

In this case, you need to create the partitioner separately, and then pass it to each of the message type configurations.

var p = ep.CreatePartitioner(8);

ep.Consumer<ConsumerA>(x => x.Message<A>(m => m.UsePartitioner(p, c => c.Message.OrgKey)));
ep.Consumer<ConsumerB>(x => x.Message<B>(m => m.UsePartitioner(p, c => c.Message.OrgKey)));

If your consumer has multiple message types, you can specify multiple Message statements in the configuration.