
We are using Microsoft Azure Service Bus to send command messages, and we use the OnMessage approach to receive them from the bus. Inside our implementation of IMessageSessionAsyncHandler.OnMessageAsync, we may find that a message is not yet ready for processing. In that case I want to take the message off the bus and re-add it to the end of the queue. The get/re-add operations must be atomic. How can I achieve this?

This is my current solution (highly abstracted), but I am worried that it is not atomic.

queueClient.RegisterSessionHandlerFactory(new CommandSessionHandlerFactory(queueClient), ...);

internal class CommandSessionHandlerFactory : IMessageSessionAsyncHandlerFactory
{
    private readonly QueueClient _queueClient;
    public CommandSessionHandlerFactory(QueueClient queueClient)
    {
        _queueClient = queueClient;
    }

    public IMessageSessionAsyncHandler CreateInstance(MessageSession session, BrokeredMessage message)
    {
        return new CommandSessionHandlerAsync(_queueClient);
    }
}

internal class CommandSessionHandlerAsync : MessageSessionAsyncHandler
{
    private readonly QueueClient _queueClient;
    public CommandSessionHandlerAsync(QueueClient queueClient)
    {
        _queueClient = queueClient;
    }

    protected override async Task OnMessageAsync(MessageSession session, BrokeredMessage message)
    {
        if (!messageReadyForProcessing)
        {
            // How to get the following code transactional safe?
            var clonedMessage = message.Clone();
            await message.CompleteAsync();
            await _queueClient.SendAsync(clonedMessage);
        }
    }
}

And what about duplicate detection? Do we have to change the MessageId of the cloned message to make sure that Service Bus duplicate detection does not drop the cloned message?

1 Answer


And what about duplicate detection? Do we have to change the MessageId of the cloned message to make sure that Service Bus duplicate detection does not drop the cloned message?

Yes. If duplicate detection is enabled, the re-sent message will be dropped from the queue, because it carries the same MessageId and duplicate detection tracks MessageIds for a given time window (10 minutes by default).

    if (!messageReadyForProcessing)
    {
        // How to get the following code transactional safe?
        var clonedMessage = message.Clone();
        await message.CompleteAsync();
        await _queueClient.SendAsync(clonedMessage);
    }

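You can change it to this:

    if (!messageReadyForProcessing)
    {
        // Keep the clone: a received BrokeredMessage cannot be sent again as-is.
        var clonedMessage = message.Clone();
        // Give the copy a fresh MessageId so duplicate detection does not drop it.
        clonedMessage.MessageId = Guid.NewGuid().ToString();
        await message.CompleteAsync();
        await _queueClient.SendAsync(clonedMessage);
    }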
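To make the effect of the MessageId concrete, here is a toy in-memory model of a duplicate-detection window. It is only a sketch: `DuplicateDetectionModel` and `TrySend` are hypothetical names invented for illustration, not part of the Service Bus SDK, and the real broker's bookkeeping may differ in detail.

```csharp
using System;
using System.Collections.Generic;

// Toy model of a duplicate-detection window. NOT the Service Bus implementation;
// the class and method names are made up for this illustration.
public sealed class DuplicateDetectionModel
{
    private readonly TimeSpan _window;
    private readonly Dictionary<string, DateTime> _seen =
        new Dictionary<string, DateTime>();

    public DuplicateDetectionModel(TimeSpan window)
    {
        _window = window;
    }

    // Returns true if the message is accepted, false if it is dropped
    // because the same MessageId was accepted inside the window.
    public bool TrySend(string messageId, DateTime nowUtc)
    {
        DateTime acceptedAt;
        if (_seen.TryGetValue(messageId, out acceptedAt) &&
            nowUtc - acceptedAt < _window)
        {
            return false; // duplicate within the window: dropped
        }

        _seen[messageId] = nowUtc; // remember when this id was accepted
        return true;
    }
}
```

With a 10-minute window, sending the id "cmd-1" twice one minute apart is accepted the first time and dropped the second time, while a copy with a fresh `Guid.NewGuid().ToString()` id is accepted.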

But there is still a problem. What if the message completes successfully but the send fails? You will lose the message. A retry policy can mitigate this, but it is a workaround and not a 100% guarantee.

Alternatively, you can increase MaxDeliveryCount. Instead of sending the message again, just abandon (release) it so that it is delivered again.

    if (!messageReadyForProcessing)
    {
        await message.AbandonAsync();
    }

This is better. Abandon is a single operation, so the message cannot be lost between a complete and a send. Even if it exceeds the max delivery count, it will go to the dead-letter queue.

But there is still a drawback. What happens to real poison messages? A message that can never be processed will keep hitting your consumers for longer, because we have increased the max delivery count.
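The trade-off can be sketched with a small model of the delivery counter. This is a simplified illustration, assuming that each abandon leads to one more delivery and that the message is dead-lettered once the count reaches the limit; `DeliveryCountModel` and `DeliverAndAbandon` are hypothetical names, not SDK types.

```csharp
using System;

// Simplified model of MaxDeliveryCount handling. NOT the Service Bus
// implementation; names are invented for this illustration.
public sealed class DeliveryCountModel
{
    private readonly int _maxDeliveryCount;

    public int DeliveryCount { get; private set; }
    public bool DeadLettered { get; private set; }

    public DeliveryCountModel(int maxDeliveryCount)
    {
        if (maxDeliveryCount < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxDeliveryCount));
        }
        _maxDeliveryCount = maxDeliveryCount;
    }

    // Models one deliver-then-abandon cycle. Returns true if the message
    // goes back to the queue, false if it is (or already was) dead-lettered.
    public bool DeliverAndAbandon()
    {
        if (DeadLettered)
        {
            return false;
        }

        DeliveryCount++;
        if (DeliveryCount >= _maxDeliveryCount)
        {
            DeadLettered = true; // delivery budget used up
        }
        return !DeadLettered;
    }
}
```

A message that is merely "not ready yet" and a genuinely poisoned message draw on the same budget, so a limit high enough to tolerate the first also lets the second be delivered that many times before it is dead-lettered.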

In short: OnMessage is not a good fit for your scenario. Just receive the message from the queue when you are ready to process it. I think this is the most suitable solution for you.

    if (messageReadyForProcessing)
    {
        // Pull a message explicitly instead of having OnMessage push it.
        var mySession = queueClient.AcceptMessageSession();
        var message = mySession.Receive();
    }

Edit:

You can use TransactionScope with Service Bus. All operations inside the scope must work on the same queue.

P.S.: Abandon is not supported inside a TransactionScope.

So you can apply this:

    if (!messageReadyForProcessing)
    {
        using (TransactionScope scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
        {
            // Send a clone: a received BrokeredMessage cannot be sent again as-is.
            var clonedMessage = message.Clone();
            clonedMessage.MessageId = Guid.NewGuid().ToString();
            await message.CompleteAsync();
            await _queueClient.SendAsync(clonedMessage);
            scope.Complete();
        }
    }

See Brokered Messaging: Transactions.

I'm not sure this will work with async calls. Test it, and if it does not work, check Asynchronous Transactions with Service Bus.

P.S.: Reportedly there was a bug in the .NET Framework concerning TransactionScope and async code. The suggestion is to use 4.5.1 or higher. Check here for more.