
We use NServiceBus 6 with the MSMQ and RabbitMQ transports. When we ran MSMQ with distributed transactions, messages were never dispatched before the distributed transaction was committed. However, after we turned distributed transactions off and wrapped the work in a transaction scope manually, we see that NServiceBus starts sending messages before the handler has finished executing. Our code looks like this:

public Task Handle(ICommand1 message, IMessageHandlerContext context)
{
   using (var tx = _session.BeginTransaction(IsolationLevel.ReadCommitted))
   {
       HandleMessage1(message);

       _session.Flush();
       tx.Commit();
   }

   return Task.CompletedTask;
}

private void HandleMessage1(ICommand1 message1)
{
    // Updating database
    ...
    // Sending other Command2 to separate handler
    bus.Send<ICommand2>(x =>
    {
       ...
    });
}

From the logs I can see that the ICommand2 handler starts before the ICommand1 handler has committed its changes to the database, so it reads the 'old' data from before the Command1 handler's update.

I was under the impression that we wouldn't have this sort of problem, since NServiceBus 6 provides batched message dispatch. That doesn't seem to be the case for us, and I wonder why and how we can fix it. I tried running it under both MSMQ (without distributed transactions) and RabbitMQ, and the result is the same.

The Command2 handler works on the changes made by the Command1 handler, so how can we make them run sequentially?


1 Answer


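Messages sent from inside a handler should be sent through the context passed into the handler. When you call bus.Send(), you are likely using IMessageSession rather than IMessageHandlerContext. A send made that way is not part of the handler's batched dispatch, so it is equivalent to immediate dispatch: Command2 can go out before the Command1 handler has committed its changes.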

Changing your code to the following should resolve it:

public async Task Handle(ICommand1 message, IMessageHandlerContext context)
{
   using (var tx = _session.BeginTransaction(IsolationLevel.ReadCommitted))
   {    
       await HandleMessage1(message, context);

       _session.Flush();
       tx.Commit();
    }   
}

private async Task HandleMessage1(ICommand1 message1, IMessageHandlerContext context)
{
    // Updating database
    ...
    // Sending other Command2 to separate handler
    await context.Send<ICommand2>(x =>
    {
       ...
    });
}
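
To illustrate why the ordering changes, here is a toy model of the two behaviours. It is only a sketch: ToyHandlerContext, SendImmediately and RunHandler are hypothetical stand-ins and not NServiceBus types. It assumes that sends made through the handler context are buffered until the handler returns, while a send made outside the context goes out straight away.

```csharp
using System;
using System.Collections.Generic;

// Toy model only: none of these types are NServiceBus types.
static class BatchedDispatchSketch
{
    // Records the order in which things happen.
    public static readonly List<string> Log = new List<string>();

    // Hypothetical stand-in for a handler context: sends are buffered.
    sealed class ToyHandlerContext
    {
        public readonly List<string> Pending = new List<string>();
        public void Send(string message) => Pending.Add(message);
    }

    // Hypothetical stand-in for a send made outside the handler context.
    static void SendImmediately(string message) => Log.Add("dispatched " + message);

    // Hypothetical stand-in for the pipeline: run the handler,
    // then flush whatever the handler buffered on its context.
    static void RunHandler(Action<ToyHandlerContext> handler)
    {
        var context = new ToyHandlerContext();
        handler(context);
        foreach (var message in context.Pending)
            Log.Add("dispatched " + message);
    }

    public static void Main()
    {
        RunHandler(context =>
        {
            SendImmediately("Command2 (outside context)"); // goes out now
            context.Send("Command2 (via context)");        // buffered
            Log.Add("committed Command1 changes");         // tx.Commit()
        });

        // Prints:
        // dispatched Command2 (outside context)
        // committed Command1 changes
        // dispatched Command2 (via context)
        Console.WriteLine(string.Join("\n", Log));
    }
}
```

In this model only the send made through the context is dispatched after the commit, which matches the ordering you want between the Command1 and Command2 handlers.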