1
votes

I'm trying to notify the user when a message is not received after some time, using MassTransit and RabbitMQ.

From what I read, the timeout is set using the TimeToLive property when the message is published. When that specified time runs out, the message should be automatically added to a Dead Letter queue, named with a "_skipped" at the end.

How do I retrieve messages from Dead Letter queues? In my attempt below, the message is added to the both queues right away, and it never times out.

I think I could do this using sagas, but it seems like a over complicated solution for such a simple problem, so I would like to avoid using it if at all possible.

static void Main(string[] args)
{
    var bus = CreateBus("rabbitmq://localhost/", "guest", "guest", true);

    var msg = new TestMessage("First Message");
    LogMessageSent(msg);
    bus.Publish(msg, c => c.TimeToLive = TimeSpan.FromSeconds(15));

    Console.ReadKey();

    bus.Stop();

    bus = CreateBus("rabbitmq://localhost/", "guest", "guest", false);

    msg = new TestMessage("SecondMessage");
    LogMessageSent(msg);
    bus.Publish(msg, c => c.TimeToLive = TimeSpan.FromSeconds(15));

    Console.ReadKey();

    bus.Stop();
}

private static IBusControl CreateBus(string rabbitUrl, string username, string password, bool enableEndpoint)
{
    var bus = Bus.Factory.CreateUsingRabbitMq(c =>
    {
        var host = c.Host(new Uri(rabbitUrl), h =>
        {
            h.Username(username);
            h.Password(password);
        });

        if (enableEndpoint)
        {
            c.ReceiveEndpoint(host, "TestQueue", x =>
            {
                x.Handler<TestMessage>(e => LogMessageReceived(e.Message, "TestQueue"));
            });
        }

        c.ReceiveEndpoint(host, "TestQueue_skipped", x =>
        {
            x.Handler<TestMessage>(e => LogMessageReceived(e.Message, "TestQueue_skipped"));
        });
    });

    bus.Start();

    return bus;
}

private static void LogMessageSent(TestMessage msg)
{
    Console.WriteLine(string.Format("{0} - Message \"{1}\" sent.", DateTime.Now.ToString("HH:mm:ss"), msg.Content));
}

private static Task LogMessageReceived(TestMessage msg, string queueName)
{
    Console.WriteLine(string.Format("{0} - Message \"{1}\" received on queue \"{2}\".", DateTime.Now.ToString("HH:mm:ss"), msg.Content, queueName));
    return Task.CompletedTask;
}

public class TestMessage
{
    public string Content { get; }

    public TestMessage(string content)
    {
        Content = content;
    }
}
1

1 Answers

1
votes

Because you are calling Publish, the message is sent to every subscriber. Since each receive endpoint is adding the consumer, that creates a subscription (and subsequent exchange binding in RabbitMQ) for that message type. You can disable this by specifying BindMessageExchanges = false on that skipped receive endpoint. You will need to manually remove the exchange binding on the broker.

As to your TimeToLive question, that isn't how it works. TimeToLive is passed to the broker, and if the message expires, it is moved to a broker-specified dead-letter queue, if so configured. It is not moved to the skipped queue which has a different meaning in MassTransit. In MassTransit, the skipped queue is for messages that are delivered to a receive endpoint but there wasn't a consumer configured on that endpoint to consume the message.

For RabbitMQ, you can configure the dead-letter queue in MassTransit by using:

endpoint.BindDeadLetterQueue("dead-letter-queue-name");

This will configure the broker so that messages which reach their TTL are moved to the specified exchange/queue. Then your consumer on that receive endpoint will be able to consume them (again, be sure to set BindMessageExchanges = false on the dead-letter receive endpoint.

For example:

c.ReceiveEndpoint(host, "TestQueue_expired", x =>
{
    x.BindMessageExchanges = false;
    x.Handler<TestMessage>(e => LogMessageReceived(e.Message, "TestQueue_expired"));
});

And then your original receive endpoint:

c.ReceiveEndpoint(host, "TestQueue", x =>
{
    x.BindDeadLetterQueue("TestQueue_expired");
    x.Handler<TestMessage>(e => LogMessageReceived(e.Message, "TestQueue"));
});