We use Microsoft Azure Service Bus to send command messages and receive them with the OnMessage approach. Inside our implementation of IMessageSessionAsyncHandler.OnMessageAsync, we may find that a message is not yet ready for processing. In that case I want to take the message off the bus and re-add it to the end of the queue. The get/re-add operations must be atomic. How can I achieve this?
This is my current solution (highly abstracted), but I am worried that it is not atomic.
queueClient.RegisterSessionHandlerFactory(new CommandSessionHandlerFactory(queueClient), ...);

internal class CommandSessionHandlerFactory : IMessageSessionAsyncHandlerFactory
{
    private readonly QueueClient _queueClient;

    public CommandSessionHandlerFactory(QueueClient queueClient)
    {
        _queueClient = queueClient;
    }

    public IMessageSessionAsyncHandler CreateInstance(MessageSession session, BrokeredMessage message)
    {
        return new CommandSessionHandlerAsync(_queueClient);
    }
}

internal class CommandSessionHandlerAsync : MessageSessionAsyncHandler
{
    private readonly QueueClient _queueClient;

    public CommandSessionHandlerAsync(QueueClient queueClient)
    {
        _queueClient = queueClient;
    }

    protected override async Task OnMessageAsync(MessageSession session, BrokeredMessage message)
    {
        if (!messageReadyForProcessing)
        {
            // How can the following code be made transactionally safe?
            // If SendAsync fails after CompleteAsync succeeds, the message is lost.
            var clonedMessage = message.Clone();
            await message.CompleteAsync();
            await _queueClient.SendAsync(clonedMessage);
        }
    }
}
And what about duplicate detection? Do we have to change the MessageId of the cloned message to make sure that Service Bus duplicate detection does not drop it?