I am running MassTransit 7.1.8 in Azure Functions v3 with Service Bus as transport. When the consumer throws an exception for the first time, two topics are created in my Service Bus namespace: masstransit~fault (without any subscriptions) and masstransit~fault--mynamespace~myevent-- (with one subscription Fault-MassTransit).

Azure Function Startup.cs

public override void Configure(IFunctionsHostBuilder builder)
{
    builder.Services.AddMassTransitForAzureFunctions(cfg =>
    {
        cfg.AddConsumersFromNamespaceContaining<MyConsumer>();
    });
}

Azure Function

[FunctionName("MyFunction")]
public async Task Run(
    [ServiceBusTrigger("my-topic", "my-subscription", Connection = "AzureWebJobsServiceBus")] Message message, ILogger log)
{
    try
    {
        await _receiver.HandleConsumer<MyConsumer>("my-topic", "my-subscription", message, default(CancellationToken));
    }
    catch (System.Exception ex)
    {
    }
}

Consumer

public class MyConsumer : IConsumer<MyEvent>
{
    public async Task Consume(ConsumeContext<MyEvent> context)
    {
        throw new System.Exception("Very bad things happened inside the consumer.");
    }
}

I also had to manually create a subscription on masstransit~fault to start receiving messages there.

My questions:

  1. MyFunction is swallowing the exception to prevent the message being sent to DLQ. Is this ok? Feels funky to swallow an exception, but on the other hand MassTransit sends the fault to the error queue so it's not lost.
  2. The documentation talks about an _error queue (prefixed appropriately) being created, but I don't see any such queue or topic, only masstransit~fault and masstransit~fault--mynamespace~myevent--. I am guessing masstransit~fault--mynamespace~myevent-- is the Service Bus equivalent of the RabbitMQ _error queue?
  3. masstransit~fault--mynamespace~myevent-- has messages being sent to it, but when I look at the subscription it is empty. Subscription has a default filter that accepts everything. Do I have to do some additional configuration on the topic?
  4. Is there a way to rename the error topics above (perhaps something changed in the meantime)?

1 Answer

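With Azure Functions, MassTransit is not really acting as the transport: the Functions runtime receives the message from Service Bus and hands it to MassTransit. However, it is possible to configure certain aspects to mimic the same behavior. Setting MaxDeliveryCount = 1 on the queue (or, as in your case, the subscription) will prevent Azure from redelivering the message. Those redeliveries are what cause the five retries (the default value is 5).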
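
As a rough illustration of the delivery-count setting mentioned above, the sketch below updates the subscription from the question so that Service Bus attempts delivery only once. It assumes the Azure.Messaging.ServiceBus package (its administration client), which is not what the question uses, and a connection string with Manage rights; the class and method names are hypothetical. The same value can also be set in the Azure portal.

```csharp
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;

// Hypothetical helper, not part of MassTransit.
public static class SubscriptionSetup
{
    // Assumption: connectionString grants Manage rights on the namespace.
    public static async Task LimitDeliveryAttemptsAsync(string connectionString)
    {
        var admin = new ServiceBusAdministrationClient(connectionString);

        // Topic and subscription names are taken from the question.
        var response = await admin.GetSubscriptionAsync("my-topic", "my-subscription");
        SubscriptionProperties subscription = response.Value;

        // After one failed delivery the message is dead-lettered instead of redelivered.
        subscription.MaxDeliveryCount = 1;

        await admin.UpdateSubscriptionAsync(subscription);
    }
}
```

With a delivery count of 1, any retrying has to happen inside MassTransit (via a retry policy) rather than through Service Bus redelivery.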

In response to the various questions:

  1. That's the delivery count, plus MassTransit publishing the faults.
  2. You can configure a retry policy (as shown below).
  3. There is no _error queue when using Azure Functions, since MassTransit isn't the transport.
  4. Unless you configure a consumer for the fault, it is not consumed.
  5. You can change the fault topic using an entity name formatter (also shown below).
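
To make the point about consuming faults concrete, here is a minimal sketch of a fault consumer. It assumes the MyEvent type from the question; the consumer name and the logging are hypothetical. Fault<T>, its Exceptions collection, and FaultedMessageId are part of MassTransit's fault contract.

```csharp
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.Logging;

// Hypothetical consumer for the faults published when MyConsumer throws.
public class MyEventFaultConsumer : IConsumer<Fault<MyEvent>>
{
    readonly ILogger<MyEventFaultConsumer> _logger;

    public MyEventFaultConsumer(ILogger<MyEventFaultConsumer> logger)
    {
        _logger = logger;
    }

    public Task Consume(ConsumeContext<Fault<MyEvent>> context)
    {
        // context.Message is the fault; context.Message.Message is the original MyEvent.
        foreach (ExceptionInfo exception in context.Message.Exceptions)
        {
            _logger.LogError("MyEvent {MessageId} faulted: {ExceptionType}: {ExceptionMessage}",
                context.Message.FaultedMessageId, exception.ExceptionType, exception.Message);
        }

        return Task.CompletedTask;
    }
}
```

The consumer still has to be registered (AddConsumersFromNamespaceContaining picks it up if it lives in the same namespace as MyConsumer), and a function bound to the fault topic's subscription would need to pass the message to HandleConsumer<MyEventFaultConsumer>, in the same way MyFunction does for MyConsumer.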

To configure bus retry for an Azure Function:

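builder.Services.AddMassTransitForAzureFunctions(cfg =>
{
    cfg.SetKebabCaseEndpointNameFormatter();

    // ConsumerNamespace stands in for any type in the namespace that holds your consumers
    cfg.AddConsumersFromNamespaceContaining<ConsumerNamespace>();
}, (context, cfg) =>
{
    // Wrap the default formatter so that fault topics get cleaner names
    cfg.MessageTopology.SetEntityNameFormatter(new CleanEntityNameFormatter(cfg.MessageTopology.EntityNameFormatter));

    // Retry inside MassTransit at these intervals (in milliseconds) before faulting
    cfg.UseMessageRetry(r => r.Intervals(200, 500, 1000, 2000));
});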

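With Azure Functions, once the retries are exhausted and the exception reaches the function host, Azure will move the message to the dead-letter queue when the delivery count is used up (the dead-letter queue is a logical sub-queue of the queue or subscription itself, visible in the Azure portal).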

The entity name formatter specified will change the topic names for faults. It must be used on every bus instance that is going to produce/consume faults to ensure the names match.

using System;
using System.Linq;
using MassTransit;
using MassTransit.Internals.Extensions;
using MassTransit.Topology;

public class CleanEntityNameFormatter :
    IEntityNameFormatter
{
    readonly IEntityNameFormatter _entityNameFormatter;

    public CleanEntityNameFormatter(IEntityNameFormatter entityNameFormatter)
    {
        _entityNameFormatter = entityNameFormatter;
    }

    public string FormatEntityName<T>()
    {
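        // For Fault<TMessage>, build the name from TMessage's own entity name
        if (typeof(T).ClosesType(typeof(Fault<>), out Type[] types))
        {
            // Call FormatEntityName<TMessage>() on the wrapped formatter via reflection
            var name = (string) typeof(IEntityNameFormatter)
                .GetMethod("FormatEntityName")
                .MakeGenericMethod(types)
                .Invoke(_entityNameFormatter, Array.Empty<object>());

            // typeof(T).Name is "Fault`1"; keep only the part before the backtick
            var suffix = typeof(T).Name.Split('`').First();

            return $"{name}-{suffix}";
        }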

        return _entityNameFormatter.FormatEntityName<T>();
    }
}