0
votes

I am using Peek() method to view the message from Azure Service Bus Queue.

I then "Defer" the message based on my application level condition. Below is the snippet of my operation:

msgClient = clientMessagingFactory.CreateQueueClient(QueueClient.FormatDeadLetterPath(sourceQueue.Name), ReceiveMode.PeekLock);
message = msgClient.Peek(); // Got a message
if (message.MessageId == "xxx")
{
   if(message.State == MessageState.Active)
   {
      BrokeredMessage _message = null;
      _message = msgClient.Receive(); // Getting a different message though there are no more than 1 listener :(
      if (_message != null){
         _message.Defer();
         _message.Abandon();
      }
   }
   else
   {
     // Perform some operation with deferred state message
   }
}

Here, msgClient.Peek() and msgClient.Receive() returns different messages. So I am pushed into a situation of deferring a wrong message. How do i solve it?

1
I guess what you want is this: "In PeekLock mode, the receive operation becomes two-stage,[...]. When Service Bus receives the request, it finds the next message to be consumed, locks it to prevent other consumers from receiving it, and then returns it to the application. After the application finishes processing the message, it completes the second stage of the receive process by calling CompleteAsync on the received message..." ? - Receive modesFildor
You're right! Getting the message in Receive operation does the trick. But can you tell me why does it return different messages in Peek and receive methods?Deepak
That would be speculation but do you happen to have more than 1 consumer?Fildor
Yes there are more than 1 active listeners in the queue. So locking it and then processing won't be a good idea. That's why i wished to peek first and then receive and defer if it matches the condition. Is there a way to achieve this?Deepak
If you just peek, that means it is free to be consumed from the queue by other clients. Like this: Client1: Peek on Message1, Client2: Peek on Message1, Client1: Recv Message1, Client2: Recv (now Message1 is not there any more) Message2 ... so you either lock the message (which is not locking the queue) or receive it from the start. As is, you have a racing condition.Fildor

1 Answers

1
votes

Most probable reason is due to multiple concurrent receivers where between peek and receive, another consumer calls receive() rendering current receive to pickup the next message instead. Your code worked as expected for me when using a single listener with a non-partitioned queue. Another reason can be the queue is 'partitioned' where peek and receive picked message from different fragments.

Modifying your code a bit like below should solve your problem in concurrent competing consumers scenario. Find inline comments to explain.

var msgClient = clientMessagingFactory.CreateQueueClient(QueueClient.FormatDeadLetterPath(sourceQueue.Name), ReceiveMode.PeekLock);
var message = msgClient.Receive();
if (message != null)
{
   // we got here means, there was one active message found and now message is locked from others view
   if (message.MessageId == "xxx")
   {
      //with id 'xxx', defer
      message.Defer();
   }
   // regardless of condition match, abandoning   
   message.Abandon();
}
else
{
   // got here means there was no active Message above
   // now see if we have deferred Message
   var peeked = msgClient.Peek();
   // The reason of checking state here again, in case there was new message arrived in the queue by now
   if (peeked != null && peeked.MessageId == "xxx" && peeked.State == MessageState.Deferred)
   {
     try
     {
       // now receive it to lock 
       // yes, you can receive 'deferred' message by sequence number, but not 'active' message :)
       message = msgClient.Receive(peeked.SequenceNumber);

       // Perform some operation with deferred state message

       message.Complete(); // don't forget it
     }
     catch (MessageNotFoundException)
     {
       // just in case message was peeked by competing consumer and received
     }
   }
}

Note: The peek operation on a non-partitioned entity always returns the oldest message, but not on a partitioned entity. Instead, it returns the oldest message in one of the partitions whose message broker responded first. There is no guarantee that the returned message is the oldest one across all partitions.