0
votes

I have been using MassTransit with a Rabbit queue for a while, and have implemented an IMessageTracker similar to the one in this question: How to log failed message in masstransit? to track message retries and provide us with some monitoring when the retry occurs, and where in the retry sequence the message is - allowing us to get some stats on when a third party endpoint was causing us to retry. This meant that I had to store a retry sequence value against each message id and check for it/increment it on each retry and then remove the message id on call to MessageWasReceivedSuccessfully. Additionally, if the message failed after all retries, we could then monitor when the failure occurred (admittedly we didn't need the message tracker for this, but it became sensible when I'd created the tracker for the retries).

I would like to upgrade to MassTransit 3 as the new retry policies are something which would be beneficial in this situation, but I can't find a way to implement my message tracker with the MassTransit 3.

My current approach is to configure my handler with a consume observer and my bus with a receive observer as follows:

sbc.ReceiveEndpoint(host, config.QueueName, ep =>
{
    ep.Consumer(() => _container.Resolve<IEventConsumer>());
    ep.Observer(new EventConsumeObserver());
});        

_busControl.ConnectReceiveObserver(new ReceiveObserver(container.Resolve<IMonitoringPublisher>()));

And the observers as follows:

public class ReceiveObserver : IReceiveObserver
{
    public ReceiveObserver()
    {
    }

    public async Task PreReceive(ReceiveContext context)
    {
        await Console.Out.WriteLineAsync("ReceiveObserver - PreReceive Observed");
    }

    public async Task PostReceive(ReceiveContext context)
    {
        await Console.Out.WriteLineAsync("ReceiveObserver - PostReceive Observed");
    }

    public async Task PostConsume<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType) where T : class
    {
        await Console.Out.WriteLineAsync("ReceiveObserver - PostConsume Observed");
    }

    public async Task ConsumeFault<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType, Exception exception) where T : class
    {
        await Console.Out.WriteLineAsync($"ReceiveObserver - ConsumeFault Observed - {exception.Message}.");
    }

    public async Task ReceiveFault(ReceiveContext context, Exception exception)
    {
        await Console.Out.WriteLineAsync("ReceiveObserver - ReceiveFault Observed");
    }
}

public class EventConsumeObserver : IObserver<ConsumeContext<IEvent>>
{
    public void OnNext(ConsumeContext<IEvent> value)
    {
        Console.WriteLine("EventConsumeObserver - OnNext");
    }

    public void OnError(Exception error)
    {
        Console.WriteLine("EventConsumeObserver - OnError");
    }

    public void OnCompleted()
    {
        Console.WriteLine("EventConsumeObserver - OnCompleted");
    }
}

This does allow me to monitor when a retry has occurred (with a little fettling), but I can't find an equivalent to the MessageWasReceivedSuccessfully method on the message tracker. Additionally, when I run a test which simulates an exception being thrown by a timeout against this, the output doesn't seem to go near the on completed method - trace as follows:

(with timeout simulated)

ReceiveObserver - PreReceive Observed
Request timeout
EventConsumeObserver - OnNext
ReceiveObserver - PostConsume Observed
Request timeout
EventConsumeObserver - OnNext
ReceiveObserver - PostConsume Observed
Request timeout
EventConsumeObserver - OnNext
ReceiveObserver - PostConsume Observed
ReceiveObserver - ConsumeFault Observed - Call timed out

(without timeout simulated)

ReceiveObserver - PreReceive Observed
Request success
EventConsumeObserver - OnNext

I was hoping there would be more obvious calls to consume fault on each retry or that the OnError would be invoked when a retry was taking place and OnComplete after each event, but that doesn't seem to be the case.

Am I missing a type of observer or is there another way to hook into the retry so that I can monitor when they have occurred and how many times the retry was performed before success?

1
The ep.Observer() is creating a new message consumer, so I would remove that one as well. - Chris Patterson
I still get nothing even with the endpoint observer removed. I connected the observer to the endpoint after reading about the consume observers having to be connected to the endpoint - codegur.net/32913996/… - this is the same behaviour I am seeing, using MT 3.0.13 - Ian Cotterill
Oh, you might get to a recent release, I think observers were not working in a much older release. 3.1.2 is the latest. - Chris Patterson
Yeah, that seems to make the observers work! I was under the impression 3.0.13 was the latest, my mistake. Thanks for the help. - Ian Cotterill
There were some missed connections here and there that were not setup which have been fixed. Glad to hear it works now! - Chris Patterson

1 Answers

0
votes

You may want to look at the IConsumeObserver, which is more aligned with actual message consumption than it is message reception. While both carry interesting information, the consume observer should be called for each exception thrown by the consumer. The Pre/Post consume methods are also called for each message type consumed.