I have a use case with NServiceBus explained in this question. Essentially, there is a message handler for an ImportProductCommand message. Typically, a collection of these messages is sent to the handler after another handler receives an ImportProductsCommand, which specifies the set of products to import. There is a requirement to know the status of a batch import, so a saga is used to manage the import state. The handler for ImportProductCommand publishes either a ProductImportedEvent or a ProductImportFailedEvent message upon completion. The saga subscribes to those messages and correlates them to the initiating ImportProductsCommand message using an ID assigned to that message when it was first received.

public class ProductImportSaga : NServiceBus.Saga.Saga<ProductImportSagaData>,
    IAmStartedByMessages<ImportProductsCommand>,
    IHandleMessages<IProductImportedEvent>,
    IHandleMessages<IProductImportFailedEvent>
{
    public override void ConfigureHowToFindSaga()
    {
        ConfigureMapping<IProductImportedEvent>(x => x.ImportId, x => x.CorrelationId);
        ConfigureMapping<IProductImportFailedEvent>(x => x.ImportId, x => x.CorrelationId);
    }

    public void Handle(ImportProductsCommand message)
    {
        this.Data.ImportId = message.Id;
        this.Data.Total = message.SKUs.Length;
        foreach (var command in message.SKUs.Select(sku => new ImportProductCommand(message.SupplierId, sku, message.ImportImage, message.Id)))
            this.Bus.SendLocal(command);    // send to local handler shown below
    }

    public void Handle(IProductImportedEvent message)
    {
        this.Data.OnCompleted();
    }

    public void Handle(IProductImportFailedEvent message)
    {
        this.Data.OnFailed();
    }
}

And the handler of individual ImportProductCommand messages looks like this:

// handles messages sent by saga above (or sent individually by WCF service)
public class ImportProductHandler : IHandleMessages<ImportProductCommand>
{
    public IBus Bus { get; set; } 

    public void Handle(ImportProductCommand message)
    {
        // importing logic here and upon success:
        if (success)
        {
            this.Bus.Publish(new ProductImportedEvent(correlationId: message.Id));
        }
        else
        {
            this.Bus.Publish(new ProductImportFailedEvent(correlationId: message.Id));
        }
    }
}

The issue is that when the event messages are published, they are placed on the queue associated with the process that hosts both the individual handler and the saga. At that point there may be a lot of messages in the queue that were originally sent by the saga in response to the ImportProductsCommand. This means that the saga won't receive those events until after the queued ImportProductCommand messages have been handled, and therefore the processing status of the batch import won't be updated in a timely manner. If I host the saga in a different process, it will receive the messages without having to wait for the command queue to be processed.

Is there a way to achieve the same effect while hosting the handler and the saga in the same process? Basically, I would like the event messages to be processed in a different order from the ImportProductCommand messages, even though they are on the same queue, so that the saga can handle those events and update its state accordingly. Is this possible, or is there a better way to achieve this result? I've tried to specify message handler ordering using First<T> to no avail, and it seems overkill to deploy two different hosts for closely related logic.

1 Answer

There is no concept of message priority in NServiceBus, so it is typical to use another endpoint to perform the work. It sounds like what you are after is work distribution, so you may want to take a look at the Distributor. In that model, the saga would maintain the state of the entire unit of work while one or more worker endpoints would handle the actual processing. This would allow you to add additional endpoints dynamically if things were to start to slow down.

If you don't want to implement the full Distributor, then at the very least pushing the actual work to another endpoint would remove any need for priority.
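
To illustrate why a second endpoint helps, here is a toy model of the two layouts. It is only a sketch using plain in-memory queues, not NServiceBus itself: the class name QueueOrderingSketch, both method names, and the "command"/"event" strings are made up for illustration, and it assumes a single-threaded consumer that processes messages strictly in arrival order. Each method returns, after every processing step, how many import results the saga has recorded so far.

```csharp
using System;
using System.Collections.Generic;

// Toy model only: plain in-memory queues, not NServiceBus.
public static class QueueOrderingSketch
{
    // Shared queue: the worker and the saga read from the same queue,
    // so every published event lines up behind the remaining commands.
    public static List<int> RunSharedQueue(int commandCount)
    {
        var queue = new Queue<string>();
        for (int i = 0; i < commandCount; i++)
            queue.Enqueue("command");

        var progress = new List<int>();
        int completed = 0;
        while (queue.Count > 0)
        {
            string message = queue.Dequeue();
            if (message == "command")
                queue.Enqueue("event");   // worker publishes its result to the same queue
            else
                completed++;              // saga handles the event and updates its state
            progress.Add(completed);      // saga state as seen after this step
        }
        return progress;
    }

    // Separate queues: the worker endpoint has its own queue, so the saga's
    // queue only ever contains events and can be drained right away.
    public static List<int> RunSeparateQueues(int commandCount)
    {
        var workerQueue = new Queue<string>();
        var sagaQueue = new Queue<string>();
        for (int i = 0; i < commandCount; i++)
            workerQueue.Enqueue("command");

        var progress = new List<int>();
        int completed = 0;
        while (workerQueue.Count > 0)
        {
            workerQueue.Dequeue();        // worker handles one command
            sagaQueue.Enqueue("event");   // and publishes the result to the saga's queue
            while (sagaQueue.Count > 0)   // saga is not stuck behind pending commands
            {
                sagaQueue.Dequeue();
                completed++;
            }
            progress.Add(completed);
        }
        return progress;
    }
}
```

With three commands, RunSharedQueue(3) yields 0, 0, 0, 1, 2, 3 (the saga sees nothing until every command has been handled), while RunSeparateQueues(3) yields 1, 2, 3 (the status advances after each import). That is the effect you observed when hosting the saga in a different process.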