1
votes

I'm sending messages to a Service Bus Queue from a Worker Role. I noticed that randomly some messages are lost.

When I debug, I set a breakpoint after the Send method and log into my Azure Panel to check if the message queue increased. What I found out is that strangely, messages are not added to the queue some times. But it is not randomly. The pattern is: One message is added correctly and the next is lost, then again next is ok, and the next is lost.

I've implemented a retry pattern like the following but apparently not all the messages are sent correctly.

My code is:

var baseAddress = RoleEnvironment.GetConfigurationSettingValue("namespaceAddress");
var issuerName = RoleEnvironment.GetConfigurationSettingValue("issuerName");
var issuerKey = RoleEnvironment.GetConfigurationSettingValue("issuerKey");
var retryStrategy = new FixedInterval(5, TimeSpan.FromSeconds(2));
var retryPolicy = new RetryPolicy<ServiceBusTransientErrorDetectionStrategy>(retryStrategy);
Uri namespaceAddress = ServiceBusEnvironment.CreateServiceUri("sb", baseAddress, string.Empty);

this.namespaceManager = new NamespaceManager(namespaceAddress, TokenProvider.CreateSharedSecretTokenProvider(issuerName, issuerKey));
this.messagingFactory = MessagingFactory.Create(namespaceAddress, TokenProvider.CreateSharedSecretTokenProvider(issuerName, issuerKey));
//namespaceManager.GetQueue("chatmessage");
QueueClient client = messagingFactory.CreateQueueClient("chatmessage");
APPService.Model.MessageReceived messageReceived = new Model.MessageReceived();
messageReceived.From= e.From;
messageReceived.Op = e.Operator;
messageReceived.Message = e.Body;
BrokeredMessage msg = null;

// Use a retry policy to execute the Send action in an asynchronous and reliable fashion.
retryPolicy.ExecuteAction
(
    (cb) =>
    {
        // A new BrokeredMessage instance must be created each time we send it. Reusing the original BrokeredMessage instance may not 
        // work as the state of its BodyStream cannot be guaranteed to be readable from the beginning.
        msg = new BrokeredMessage(messageReceived);

        // Send the event asynchronously.
        client.BeginSend(msg, cb, null);
    },
    (ar) =>
    {
        try
        {
            // Complete the asynchronous operation. This may throw an exception that will be handled internally by the retry policy.
            client.EndSend(ar);
        }
        finally
        {
            // Ensure that any resources allocated by a BrokeredMessage instance are released.
            if (msg != null)
            {
                msg.Dispose();
                msg = null;
            }
        }
    },
    ()=>{},
    (ex) =>
    {
        // Always dispose the BrokeredMessage instance even if the send operation has completed unsuccessfully.
        if (msg != null)
        {
            msg.Dispose();
            msg = null;
        }

        // Always log exceptions.
        Trace.TraceError(ex.Message);
    }
);

What could be wrong?

1
I've been finding that messages don't send if I Dispose them after await qc.SendAsyncmcintyre321

1 Answers

2
votes

Just looking at your code I am unable to notice anything that cloud lead up to the issue. I suppose the counter on the portal is not updated in real time so I would not consider it as a test result.

I know I am not helping right now, but If I had the doubt you are expressing, I would build a end-to-end test to send and receive the messages, comparing the messageId.

Do you already use Service Bus Explorer by Paolo Salvatori on MSDN? You can use it to test the entities. It is a reliable tool to explore and test Service Bus.