I have a workflow in which a controller (in csproj1) publishes an event that is consumed by MySaga (in csproj2). When the saga receives the published event, it sends a new command that should be consumed by CommandConsumer (also in csproj2).
Problem:
The command sent from MySaga is never received by CommandConsumer.
I'm using Azure Service Bus and a consumer saga.
Here is my Startup.cs, which I suspect is not configured correctly:
// Registers MassTransit: consumers from CommandConsumer's namespace, the MySaga saga
// with an in-memory repository, and Azure Service Bus as the transport.
services.AddMassTransit(registration =>
{
    registration.AddConsumersFromNamespaceContaining<CommandConsumer>();
    registration.AddSaga<MySaga>().InMemoryRepository();

    registration.UsingAzureServiceBus((busContext, busConfig) =>
    {
        var connectionString = Configuration["ServiceBus:ConnectionString"];
        busConfig.Host(connectionString);

        // NOTE(review): receive endpoint names are chosen by MassTransit here, not by this code —
        // confirm they match the queue name the saga sends MyCommand to.
        busConfig.ConfigureEndpoints(busContext);
    });
});
Here is MySaga.cs
//...
// Address of the queue that MyCommand is sent to. The queue name is produced by
// KebabCaseEndpointNameFormatter from the type name "MyCommand".
// NOTE(review): nothing in this class guarantees a receive endpoint listens on this queue —
// confirm the name matches the endpoint the command consumer is configured on.
public readonly Uri targetEndpoint = new Uri($"queue:{KebabCaseEndpointNameFormatter.Instance.SanitizeName(nameof(MyCommand))}");
/// <summary>
/// Handles the incoming event by sending a <c>MyCommand</c> to the queue addressed by
/// <c>targetEndpoint</c>, carrying the saga's CorrelationId and the event's Item1/Item2.
/// </summary>
/// <param name="context">Consume context wrapping the received event.</param>
// NOTE(review): the event type is spelled "AceptedEvent" here — confirm it is the intended type name.
public async Task Consume(ConsumeContext<AceptedEvent> context)
{
    var commandEndpoint = await context.GetSendEndpoint(targetEndpoint);
    var receivedEvent = context.Message;

    await commandEndpoint.Send<MyCommand>(new
    {
        CorrelationId,
        receivedEvent.Item1,
        receivedEvent.Item2
    });
}
//...
Here is CommandConsumer.cs
/// <summary>
/// Consumes <c>MyCommand</c> messages: runs the service operation and publishes an
/// <c>AcceptedEvent</c> on success or a <c>RefusedEvent</c> on failure.
/// </summary>
public class CommandConsumer : IConsumer<MyCommand>
{
    public ILogger<MyCommand> _logger { get; }
    public IService _myService { get; }

    /// <summary>
    /// Creates the consumer with its logger and service dependencies.
    /// </summary>
    /// <param name="logger">Logger used to record the correlation id of each received command.</param>
    /// <param name="myService">Service whose <c>DoSomething</c> is invoked for each command.</param>
    // The constructor name must match the class name, and the parameter type must match
    // the IService property type, otherwise the class does not compile.
    public CommandConsumer(ILogger<MyCommand> logger, IService myService)
    {
        _logger = logger;
        _myService = myService;
    }

    /// <summary>
    /// Logs the command's correlation id, calls the service, and publishes the outcome event.
    /// On failure the refused event is published and the original exception is rethrown.
    /// </summary>
    /// <param name="context">Consume context wrapping the received command.</param>
    public async Task Consume(ConsumeContext<MyCommand> context)
    {
        _logger?.LogInformation($"{context.Message.CorrelationId}");
        try
        {
            await _myService.DoSomething();

            // NOTE(review): context.CorrelationId is read through .Value — confirm it is always
            // set on incoming commands, otherwise this access would fail.
            await context.Publish<AcceptedEvent>(new
            {
                CorrelationId = context.CorrelationId.Value,
                State = "NewState"
            });
        }
        catch (Exception ex)
        {
            await context.Publish<RefusedEvent>(new
            {
                CorrelationId = context.CorrelationId.Value,
                State = "Failed",
                Error = ex.Message
            });

            // Rethrow with the original stack trace after reporting the failure.
            throw;
        }
    }
}
If you have any recommendations or ideas, thanks for sharing and challenging me!
Solution:
As @ChrisPatterson mentioned, I switched to the Publish method instead of Send, since I have just one consumer for my command, and it works as expected.
In addition, I fixed my receive endpoint configuration:
// Explicit endpoint configuration: the saga gets its own queue, and the command consumer
// listens on a queue named with the same formatter expression applied to nameof(MyCommand).
var sagaRepository = new InMemorySagaRepository<MySaga>();
x.UsingAzureServiceBus((busContext, busConfig) =>
{
    busConfig.Host(Configuration["ServiceBus:ConnectionString"]);

    // Receive endpoint hosting the saga.
    busConfig.ReceiveEndpoint("queue", sagaEndpoint =>
    {
        sagaEndpoint.Saga<MySaga>(sagaRepository);
    });

    // Receive endpoint hosting the command consumer.
    var commandQueueName = KebabCaseEndpointNameFormatter.Instance.SanitizeName(nameof(MyCommand));
    busConfig.ReceiveEndpoint(commandQueueName, commandEndpoint =>
    {
        commandEndpoint.ConfigureConsumer<CommandConsumer>(busContext);
    });
});