0
votes

I have a workflow publishing an event from controller(belongs to csproj1) which'll be consumed by MySaga(belongs to .csproj2), then the Saga when receiving the published message it'll send a new Command that'll be consumed from CommandConsumer (belongs to csproj2).

Problem:

Can not receive command in CommandConsumer while sending it from MySaga.

I'm using Azure Service Bus and Consumer Saga

Here is my startup.cs which I think is not well configured:


        services.AddMassTransit(x =>
        {                
            x.AddConsumersFromNamespaceContaining<CommandConsumer>();            
            x.AddSaga<MySaga>().InMemoryRepository();

            x.UsingAzureServiceBus((context, cfg) =>
            {
                cfg.Host(Configuration["ServiceBus:ConnectionString"]);
                cfg.ConfigureEndpoints(context);
            });
        });

Here is MySaga.cs

//...

    public readonly Uri targetEndpoint = new Uri($"queue:{KebabCaseEndpointNameFormatter.Instance.SanitizeName(nameof(MyCommand))}");

    public async Task Consume(ConsumeContext<AceptedEvent> context)
    {
        var sendEndpoint = await context.GetSendEndpoint(targetEndpoint);
        await sendEndpoint.Send<MyCommand>(new
        {
            CorrelationId,
            context.Message.Item1,
            context.Message.Item2
        });
    }
//...

Here is CommandConsumer.cs


public class CommandConsumer: IConsumer<MyCommand>
{
    public ILogger<MyCommand> _logger { get; }
    public IService _myService { get; }
    public MyCommandConsumer(ILogger<MyCommand> logger, Iservice myService)
    {
        _logger = logger;
        _myService = myService;
    }
    

    public async Task Consume(ConsumeContext<MyCommand> context)
    {
        _logger?.LogInformation($"{context.Message.CorrelationId}");            
        try
        {
            await _myService.DoSomething();
            await context.Publish<AcceptedEvent>(new
            {
                CorrelationId = context.CorrelationId.Value,
                State = "NewState"
            });
        }
        catch (Exception ex)
        {
            await context.Publish<RefusedEvent>(new
            {
                CorrelationId = context.CorrelationId.Value,
                State = "Failed",
                Error = ex.Message
            });
            throw;
        }
     }
}

If you have any recommendation ideas thanks for sharing and challenging me!

Solution:

As @ChrisPatterson mentioned, I used the Publish method since I have just one consumer for my command and it works as expected.

More than that I fixed my configuration for ReceiveEndpoint:


var repository = new InMemorySagaRepository<MySaga>();
        x.UsingAzureServiceBus((context, cfg) =>
        {
            cfg.Host(Configuration["ServiceBus:ConnectionString"]);
            cfg.ReceiveEndpoint("queue", cfgEndpoint =>
            {
                cfgEndpoint.Saga<MySaga>(repository);
            });
            cfg.ReceiveEndpoint(KebabCaseEndpointNameFormatter.Instance.SanitizeName(nameof(MyCommand)),
                cfgEndpoint =>
                {
                    cfgEndpoint.ConfigureConsumer<CommandConsumer>(context);
                });

        });

1

1 Answers

0
votes

The producer is sending to this receive endpoint:

public readonly Uri targetEndpoint = 
new Uri($"queue:{KebabCaseEndpointNameFormatter.Instance.SanitizeName(nameof(MyCommand))}");

Which is going to end up being "my-command" based upon how you're using it.

Whereas ConfigureEndpoints is going to use the consumer class name for the endpoint, which would be the equivalent of:

KebabCaseEndpointNameFormatter.Instance.Consumer<CommandConsumer>()

Which ends up being "command" – so the send endpoint is not sending to the correct address for the consumer.

If that is the only consumer for that command, you could just use Publish instead, so that you don't need to know the address of the consumer. Otherwise, you'll need to know the correct destination address.