9
votes

I'm having an intermittent problem with the Azure Service bus. Sporadically, placing a message on the bus causes the following exception:

TYPE: InvalidOperationException

MESSAGE: The operation cannot be performed because the brokered message '723eab13dab34351a78bb687d0923b89' has already been consumed. Please use a new BrokeredMessage instance for the operation.

STACKTRACE

at Microsoft.ServiceBus.Messaging.MessagingUtilities.ValidateAndSetConsumedMessages(IEnumerable`1 messages)
at Microsoft.ServiceBus.Messaging.MessageSender.Send(TrackingContext trackingContext, IEnumerable`1 messages, TimeSpan timeout)
at Microsoft.Practices.TransientFaultHandling.RetryPolicy.<>c__DisplayClass1.<ExecuteAction>b__0()
at Microsoft.Practices.TransientFaultHandling.RetryPolicy.ExecuteAction[TResult](Func`1 func)
at IQ.IR.Core.ServiceBus.AzureBus`1.Enqueue(T message) in c:\BuildAgent\work\cc0c51104c02a4e9\IQ.IR.Core\ServiceBus\AzureBus.cs:line 69
...Rest of stacktrace snipped as it's within my app

The offending code from AzureBus is:

public void Enqueue(T message)
{
    using (var brokeredMessage = new BrokeredMessage(message) { Label = message.GetType().FullName, TimeToLive = _timeToLive })
    {
        _retryPolicy.ExecuteAction(() => _sender.Send(brokeredMessage));
    }
}

Where T message being passed in is

[Serializable]
public class ValidationMessage
{
    public string ValidationToken { get; set;}
}

And _retryPolicy is a

RetryPolicy<ServiceBusTransientErrorDetectionStrategy>

_timeToLive is a 12 hour timespan

Any ideas?

4

4 Answers

5
votes

The error indicates that the message was "already sent" but an error occurred in the process. Unfortunately there is no easy way to know this by inspecting the message and the message cannot be reused again as it is considered consumed. We are working on some improvements such as allowing you to query the state of such a message and throwing a MessagingException instead of InvalidOperation. Finally the ability to clone a message will help to make recovery easier from such failures.

6
votes

.... and to pile on to Abhishek's answer: Right now you need to construct a new BrokeredMessage for each retry, so your retry policy scope needs to be one level further up. Mind that if you put in a stream, we will use that stream as-is inside the brokered message and pull straight fro it onto the wire, so you will need to make copies of the stream ahead of time for a retry loop.

2
votes

I recently ran into this and found another cause: if you are constructing a BrokeredMessage and set a break point prior to sending it with your MessageSender, and inspect the properties on the Message, it will attempt to access the Queue and throw this exception. Simply sending the message first without inspecting its properties in the IDE will not cause this problem.

1
votes

Is it possible that the message could have been sent, even though there was an error? Should the message be idempotent before a retry is attempted in this case?