I'm using masstransit and rabbitmq. My queue has messages of various types delivered to it. Most of these types implement IHaveOrganisationKey. I would like to add a partitioner to the pipeline to ensure that only one message (of any type) with a given organisationkey is processed at the same time. The goal is to limit concurrency problems that occur when multiple messages of the same organisation are processed in parallel, while allowing parallel processing of messages from different organisations.
Configuration code
sbc.ReceiveEndpoint(host, this.queueConfiguration.QueueName, ep =>
{
ep.PrefetchCount = this.busConfiguration.PrefetchCount;
this.queueConfiguration.ConfigureEndpoint(ep);
this.queueConfiguration.SubscribeMessages(worker, ep);
});
In the QueueConfiguration:
public override void ConfigureEndpoint(IRabbitMqReceiveEndpointConfigurator ep)
{
// This is incomplete. Am I on the right track here?
ep.UsePartitioner(1, x => x.TryGetMessage<IHaveOrganisationKey>());
}