0
votes

I am attempting to use Azure Service Bus Topics to allow services to consume messages using MassTransit. A message is posted to a topic and each service that has a subscription to the topic receives a copy of the message. Each message type gets a topic.

The problem is how to deal with errors once the source of the exception is addressed. Once the message faults and ends up in the _error queue, I would like to move the message back to be processed after the message and/or service is fixed. I cannot move the message from the _error queue to the Topic because every service on that topic would get the message again.

I attempted to create a second queue using the ReceiveEndpoint method called _errorecovery but doing so caused the queue to subscribe to the topic, meaning the _errorrecovery queue gets every message published to that topic.

I am wondering if there is a way that I can set up a queue using MassTransit that will only process messages in that queue without adding the additional subscription.

Here is my current setup to build the topics.

TEvent is the message type and TConsumer is the associated IConsumer implementation for that message type.

  public void ConfigureType<TEvent, TConsumer>(IServiceBusBusFactoryConfigurator busConfig, Container container, MessageHandlingOptions options) where TConsumer : class, IConsumer
        {
            string subName = NameHelper.GetSubscriptionName(@namespace, _serviceName);
            var topicName = NameHelper.GetTopicName(@namespace, typeof(TEvent));

            busConfig.SubscriptionEndpoint(subName, topicName, configurator =>
            {
                configurator.ConfigureConsumer(container, typeof(TConsumer));
                if (!(options is null))
                {
                    ConfigureRetry(configurator, options);
                }
            });
        }

And to build the _errorrecovery queue. Each event also has an associated IConsumer specifically for handling events that fail.

 var subName = NameHelper.GetSubscriptionName(@namespace, _serviceName);

                        busConfig.ReceiveEndpoint(subName + "_errorrecovery", config =>
                        {
                            config.ConfigureConsumer(_simpleContainer, faultConsumers.Select(i => i.GenericType).ToArray());
                        });

This produces a queue named subname_errorrecovery and a Topic named after the event. The service has a subscription in the Topic but so does the _errorrecovery. So every time a message is sent to the Topic, the consumers for the event and the consumer for the error both receive messages.

So I am looking for a way to have a service wired up to a recovery queue as well as multiple topics without that queue also subscribing to every topic as well.

It is also very possible I am handling this the wrong way. Perhaps I need to just add a check for duplicate messages and just move the message to the topic, allowing the services that originally processed the message successfully to simply ignore it. This is something I plan on doing anyhow but I am hoping for some guidance on error handling with MassTransit.

Any help would be appreciated.

2
Do you need to use subscription endpoints, or can you just put your consumers on a receive endpoint and let MassTransit configure the subscription to forward to your receive endpoint? That's the default approach for pub/sub with MassTransit.Chris Patterson

2 Answers

0
votes

Azure Service Bus allows you to set up subscription filter rules for each subscription that you create for a topic (see https://docs.microsoft.com/en-us/azure/service-bus-messaging/topic-filters).

With that you can define filter rules for a subscription so that only messages that correspond that specific rules are put on the queue of that subscriber for processing.

These filter rules can be setup from code, via ARM templates, the Azure Portal or the Azure CLI.

What you can do is to make sure that a certain Message Property is added to the message if the processing failed before it is moved to the _error queue. I don't know MassTransit very well but I guess there will be an option to do so. Or maybe MassTransit itself already provides such kind of origin information so that you can tell from a set message property at what subscription source (queue) the message was processed before it was moved to the _error queue.

So let's consider when your failed message (maybe it failed because there was an exception because the consumer service's database could not be reached temporarily) is moved to the _error queue with some message property indicating where it has been processed. Let's call this property "ErrorOrigin" and you give it a unique value identifying a certain subscription.

So in your case you could define a filter rule for each subscription so that only messages that either have no property called "ErrorOrigin" or if the property is set it's value has to match an identifier name corresponding to that subscription (e.g. "subscriptionXforEventZ")

If you set up your subscription filter rules correctly you subscription consumers will now process all messages originally published to the topic as well as messages from the _error queue that have also been published to that topic. But with the filter rule Azure Service Bus will only put those failed messages on the corresponding subscription queue if the "ErrorOrigin" property of the message corresponds to that subscription.

I have been successfully using this feature in practice in a microservices architecture. It allows adding additional subscribers to a topic very easy but giving these subscribers the option to only receive messages from the topic which are of interest.

Here you can see an example of subscription filtering on Azure Service Bus to get a better idea of how it works: https://github.com/Azure/azure-service-bus/tree/master/samples/DotNet/Microsoft.ServiceBus.Messaging/TopicFilters

I hope this idea can help you solve your specific problem.

0
votes

The out of the box solution to this problem is to add the topic subscription rule. So, whenever you want to resubmit the messages, it will only go into those subscriptions based on the filter applied on the subscription. Learn about Topic SUbscription rule through the article here.