I am attempting to use Azure Service Bus Topics to allow services to consume messages using MassTransit. A message is posted to a topic and each service that has a subscription to the topic receives a copy of the message. Each message type gets a topic.
The problem is how to deal with errors once the source of the exception is addressed. Once the message faults and ends up in the _error queue, I would like to move the message back to be processed after the message and/or service is fixed. I cannot move the message from the _error queue to the Topic because every service on that topic would get the message again.
I attempted to create a second queue using the ReceiveEndpoint method called _errorecovery but doing so caused the queue to subscribe to the topic, meaning the _errorrecovery queue gets every message published to that topic.
I am wondering if there is a way that I can set up a queue using MassTransit that will only process messages in that queue without adding the additional subscription.
Here is my current setup to build the topics.
TEvent is the message type and TConsumer is the associated IConsumer implementation for that message type.
public void ConfigureType<TEvent, TConsumer>(IServiceBusBusFactoryConfigurator busConfig, Container container, MessageHandlingOptions options) where TConsumer : class, IConsumer
{
string subName = NameHelper.GetSubscriptionName(@namespace, _serviceName);
var topicName = NameHelper.GetTopicName(@namespace, typeof(TEvent));
busConfig.SubscriptionEndpoint(subName, topicName, configurator =>
{
configurator.ConfigureConsumer(container, typeof(TConsumer));
if (!(options is null))
{
ConfigureRetry(configurator, options);
}
});
}
And to build the _errorrecovery queue. Each event also has an associated IConsumer specifically for handling events that fail.
var subName = NameHelper.GetSubscriptionName(@namespace, _serviceName);
busConfig.ReceiveEndpoint(subName + "_errorrecovery", config =>
{
config.ConfigureConsumer(_simpleContainer, faultConsumers.Select(i => i.GenericType).ToArray());
});
This produces a queue named subname_errorrecovery and a Topic named after the event. The service has a subscription in the Topic but so does the _errorrecovery. So every time a message is sent to the Topic, the consumers for the event and the consumer for the error both receive messages.
So I am looking for a way to have a service wired up to a recovery queue as well as multiple topics without that queue also subscribing to every topic as well.
It is also very possible I am handling this the wrong way. Perhaps I need to just add a check for duplicate messages and just move the message to the topic, allowing the services that originally processed the message successfully to simply ignore it. This is something I plan on doing anyhow but I am hoping for some guidance on error handling with MassTransit.
Any help would be appreciated.