I've looked through the documentation on scheduling with Azure Service Bus, but it isn't clear to me how to send a scheduled message from a "disconnected" (send-only) bus.

Here's how I've configured the service that processes messages on the server:

builder.AddMassTransit(mt =>
{
    mt.AddConsumers(cqrsAssembly);

    mt.AddBus(context => Bus.Factory.CreateUsingAzureServiceBus(x =>
    {
        x.RequiresSession = true;
        x.MaxConcurrentCalls = 500;
        x.MessageWaitTimeout = TimeSpan.FromMinutes(5);
        x.UseRenewLock(TimeSpan.FromMinutes(4));
        x.UseServiceBusMessageScheduler();

        var host = x.Host(serviceUri, h =>
        {
            h.SharedAccessSignature(s =>
            {
                s.KeyName = "key-name";
                s.SharedAccessKey = "access-key";
                s.TokenTimeToLive = TimeSpan.FromDays(1);
                s.TokenScope = TokenScope.Namespace;
            });

            h.OperationTimeout = TimeSpan.FromMinutes(2);
        });

        x.ReceiveEndpoint(host, "mt.myqueue", ep =>
        {
            ep.RequiresSession = true;
            ep.MaxConcurrentCalls = 500;
            ep.RemoveSubscriptions = true;

            ep.UseMessageRetry(r =>
            {
                r.Interval(4, TimeSpan.FromSeconds(30));
                r.Handle<TransientCommandException>();
            });

            ep.ConfigureConsumers(context);
        });
    });
});

I've explicitly called UseServiceBusMessageScheduler().

The project that creates and sends messages to the queue runs in a different context, so its bus is configured as send-only:

var bus = Bus.Factory.CreateUsingAzureServiceBus(x =>
{
    x.RequiresSession = true;
    x.MessageWaitTimeout = TimeSpan.FromMinutes(5);
    x.UseRenewLock(TimeSpan.FromMinutes(4));
    x.Send<ICommand>(s => s.UseSessionIdFormatter(ctx => ctx.Message.SessionId ?? Guid.NewGuid().ToString()));


    var host = x.Host(serviceUri, h =>
    {
        h.SharedAccessSignature(s =>
        {
            s.KeyName = "key-name";
            s.SharedAccessKey = "key"; 
            s.TokenTimeToLive = TimeSpan.FromDays(1);
            s.TokenScope = TokenScope.Namespace;
        });
        h.OperationTimeout = TimeSpan.FromMinutes(2);
    });

    EndpointConvention.Map<ICommand>(new Uri($"{serviceUri.ToString()}mt.myqueue"));
    EndpointConvention.Map<Command>(new Uri($"{serviceUri.ToString()}mt.myqueue"));
});

Now, to send a scheduled message, we do this:

var dest = "what?";
await bus.ScheduleSend(dest, scheduledEnqueueTimeUtc.Value, message);

I am unsure what needs to be passed as the destinationAddress.

I've tried:

- serviceUri
- $"{serviceUri}mt.myqueue"

But when I check the queues, I don't see my message in the base queue, the skipped queue, or the error queue.

Am I missing some other configuration, and if not, how does one determine the destination queue?

I'm using MassTransit 5.5.4, and every overload of ScheduleSend() requires a destination address.

1 Answer

First of all, yes, your Uri format is correct. After formatting, you need something like this:

new Uri(@"sb://yourdomain.servicebus.windows.net/yourapp/your_message_queue")
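As a minimal sketch of how that address could be assembled from the base service Uri and the queue name: BuildQueueAddress is a hypothetical helper (not part of MassTransit), and the host, path, and queue name are the placeholder values from the line above. It only uses System.Uri and normalises the slash between the two parts, so it does not matter whether serviceUri ends with one.

```csharp
using System;

// Hypothetical helper, not a MassTransit API: joins the namespace base
// address and the queue name with exactly one slash between them.
static Uri BuildQueueAddress(Uri serviceUri, string queueName)
{
    var baseText = serviceUri.AbsoluteUri.TrimEnd('/');
    var queueText = queueName.TrimStart('/');
    return new Uri(baseText + "/" + queueText);
}

// Placeholder values; substitute your own namespace and queue name.
var serviceUri = new Uri("sb://yourdomain.servicebus.windows.net/yourapp/");
var destination = BuildQueueAddress(serviceUri, "your_message_queue");
Console.WriteLine(destination);
```

The resulting Uri is what would be passed as the destinationAddress argument of ScheduleSend.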

Also make sure you added the following when you configured your endpoint (see the documentation link below):

configurator.UseServiceBusMessageScheduler();

If you follow the MassTransit documentation, scheduling is done from a ConsumeContext. See MassTransit Azure Scheduling:

public class ScheduleNotificationConsumer :
    IConsumer<AssignSeat>
{
    Uri _schedulerAddress;
    Uri _notificationService;

    public async Task Consume(ConsumeContext<AssignSeat> context)
    {
        if(context.Message.ReservationTime - DateTime.Now < TimeSpan.FromHours(8))
        {
            // assign the seat for the reservation
        }
        else
        {
            // seats can only be assigned eight hours before the reservation
            // await the call so a scheduling failure surfaces in this consumer
            await context.ScheduleMessage(context.Message.ReservationTime - TimeSpan.FromHours(8), context.Message);
        }
    }
}

However, in a use case we faced this week, we needed to schedule from outside a ConsumeContext (or simply didn't want to pass the context down to where we scheduled). When using IBusControl.ScheduleSend, no error is reported, but nothing actually gets scheduled either. After looking at what MassTransit does, it turns out that from an IBusControl it creates a new scheduling provider, whereas from the context it uses the ServiceBusScheduleMessageProvider.

So, until we clean this up, we are calling the ServiceBusScheduleMessageProvider directly:

await new ServiceBusScheduleMessageProvider(_busControl).ScheduleSend(
    destinationUri,
    scheduleDateTime.UtcDateTime,
    Task.FromResult<T>(message),
    Pipe.Empty<SendContext>(),
    default);

Hope it makes sense and helps a bit.