I am trying to create a MassTransit saga using AzureServiceBus and .NET Core.
I have an ASP.NET Core application that is successfully sending messages to the queue. It has this configuration in Startup.cs:
    services.AddSingleton(provider => Bus.Factory.CreateUsingAzureServiceBus(cfg =>
    {
        cfg.Host(
            new Uri("https://zzz.servicebus.windows.net/"),
            h =>
            {
                h.TransportType = TransportType.AmqpWebSockets;
                h.OperationTimeout = TimeSpan.FromSeconds(5);
                h.RetryLimit = 1;
                h.TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider("RootManageSharedAccessKey", "xxx");
            });

        cfg.RequiresSession = true;
    }));

    EndpointConvention.Map<MassTransit.POC.Shared.IPurchasePolicyMessage>(
        new Uri("https://zzz.servicebus.windows.net/masstransitqueue"));

    services.AddSingleton<ISendEndpointProvider>(provider => provider.GetRequiredService<IBusControl>());

and this code in a controller sending the message:
    var sendEndpoint = await _sendEndpointProvider.GetSendEndpoint(
        new Uri("https://zzz.servicebus.windows.net/masstransitqueue"));

    await sendEndpoint.Send<IPurchasePolicyMessage>(
        new
        {
            QuoteNumber = policyNumber,
            CorrelationId = NewId.NextGuid().ToString("D")
        },
        context =>
        {
            context.SetSessionId(context.Message.CorrelationId.ToString());
        }).ConfigureAwait(false);

I've got a separate .NET Core console application that receives the IPurchasePolicyMessage. It has this configuration in Program.cs:
    var bus = Bus.Factory.CreateUsingAzureServiceBus(cfg =>
    {
        var host = cfg.Host(
            new Uri("https://zzz.servicebus.windows.net/"),
            h =>
            {
                h.TransportType = TransportType.AmqpWebSockets;
                h.OperationTimeout = TimeSpan.FromSeconds(5);
                h.RetryLimit = 1;
                h.TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider("RootManageSharedAccessKey", "xxx");
            });

        cfg.RequiresSession = true;

        cfg.ReceiveEndpoint(host, "masstransitqueue", e =>
        {
            e.RequiresSession = true;
            //e.Consumer<PurchasePolicyConsumer>();
            e.Saga<PurchasePolicySaga>(new MessageSessionSagaRepository<PurchasePolicySaga>());
        });
    });

    bus.Start();

and the PurchasePolicySaga is defined as:
    public class PurchasePolicySaga :
        ISaga,
        InitiatedBy<IPurchasePolicyMessage>
    {
        public Guid CorrelationId { get; set; }

        public async Task Consume(ConsumeContext<IPurchasePolicyMessage> context)
        {
            await Console.Out.WriteLineAsync($"Processing policy number {context.Message.QuoteNumber} in saga.");
        }
    }

The Consume method here is never called. There are no errors and no activity related to the incoming message in the logs; nothing happens at all. Can someone please tell me how to find out why?
I strongly suspect the issue relates to sessions, as when I change my saga to a simple consumer, remove the "RequiresSession" flags and delete the queue to allow MassTransit to recreate it, it works. However, as sessions are required for an AzureServiceBus-based MassTransit saga, I'm a bit stuck.
Further Investigation
Looking at the messages in the Azure dashboard, I see that the messages are being sent to the queue. This screenshot shows 3 messages:
However, there are no messages on the topic:
I should expect corresponding messages on the topic, shouldn't I? Also, it doesn't seem right that the subscription here has sessions disabled.


Is bus.Start being called? - Alexey Zimarev