
I am moving a calculation engine from an Azure worker role into Azure Service Fabric. It works by listening for messages on a Service Bus that it then processes based on the message content.

Currently, the calculation works correctly, but if the compute takes longer than a minute or so, the message is not removed from the queue after completion. In the worker role, we solved this by increasing the "AutoRenewTimeout":

var options = new OnMessageOptions { AutoComplete = true, AutoRenewTimeout = TimeSpan.FromMinutes(3) };
_queueClient.OnMessage(OnMessage, options);

However, using the "ServiceFabric.ServiceBus" NuGet package, I cannot work out where to set this. I have used the demo project as a reference to set up a stateless service that actually runs the compute. Below is an extract from CalculateService.cs where the stateless service is initialised.

internal sealed class CalculateService : StatelessService
{
    public CalculateService(StatelessServiceContext context)
        : base(context)
    { }

    /// <summary>
    /// Optional override to create listeners (e.g., TCP, HTTP) for this service replica to handle client or user requests.
    /// </summary>
    /// <returns>A collection of listeners.</returns>
    protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
    {
        string serviceBusQueueName = CloudConfigurationManager.GetSetting("QueueName");
        yield return new ServiceInstanceListener(context => new ServiceBusQueueCommunicationListener(new Handler(this), context, serviceBusQueueName), "StatelessService-ServiceBusQueueListener");
    }
}


internal sealed class Handler : AutoCompleteServiceBusMessageReceiver
{
    // The owning service, used only for event-source logging.
    private readonly StatelessService _service;

    public Handler(StatelessService service)
    {
        _service = service;
    }

    protected override Task ReceiveMessageImplAsync(BrokeredMessage message, CancellationToken cancellationToken)
    {
        ServiceEventSource.Current.ServiceMessage(_service, $"Handling queue message {message.MessageId}");
        var computeRole = new ExcelCompute();
        // Deserialize the message body directly; no placeholder instance is needed.
        var rMessage = message.GetBody<RangeMessage>();
        // Returns true if the compute was successful (which it currently always is).
        var result = computeRole.OnMessage(rMessage, message.MessageId);
        return Task.FromResult(result);
    }
}

I did try calling message.Complete() on the BrokeredMessage, but that threw a message lock error.

1 Answer

  1. Get the latest version of the package (>= v3.5.0). Set the property 'MessageLockRenewTimeSpan' on your CommunicationListener to a value that is smaller than your lock duration (e.g. 50s where the lock duration is 60s). The gap allows for some clock skew. Note: by default this property is null, which means no automatic lock renewal is done.

This option works well when processing a batch that takes more time than the lock duration allows.
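As a rough sketch of the first option, the listener from the question could be created with an object initializer that sets the property. The constructor arguments are copied from the question's code and may differ in the package version you install; the 50-second value assumes a 60-second lock duration on the queue, and the property is assumed to accept a TimeSpan.

```csharp
// Sketch only: goes inside CalculateService from the question.
protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
{
    string serviceBusQueueName = CloudConfigurationManager.GetSetting("QueueName");
    yield return new ServiceInstanceListener(
        context => new ServiceBusQueueCommunicationListener(new Handler(this), context, serviceBusQueueName)
        {
            // Assumed values: renew after 50 seconds, before a 60-second lock expires.
            MessageLockRenewTimeSpan = TimeSpan.FromSeconds(50)
        },
        "StatelessService-ServiceBusQueueListener");
}
```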

or

  2. You can use BrokeredMessage.RenewLock to periodically extend your message lock when processing takes more time than the lock duration. You could do that on a separate thread if needed.

This option works well when processing a single message (batch size 1) takes more time than the lock duration allows.
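One possible shape for the second option is a small helper that keeps renewing the lock while the work runs. This is a sketch, not part of the ServiceFabric.ServiceBus package: `LockRenewal` and `RunWithRenewalAsync` are hypothetical names, and the renewal call is passed in as a delegate so the helper does not depend on any Service Bus type.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LockRenewal
{
    // Runs `work` and calls `renewLock` every `interval` until the work finishes.
    // If a renewal fails, its exception surfaces once the work has ended.
    public static async Task RunWithRenewalAsync(
        Func<Task> work,
        Func<Task> renewLock,
        TimeSpan interval,
        CancellationToken cancellationToken)
    {
        using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
        {
            // Start the renewal loop first so it runs alongside the work.
            Task renewalLoop = RenewUntilCancelledAsync(renewLock, interval, cts.Token);
            try
            {
                await work().ConfigureAwait(false);
            }
            finally
            {
                // Stop renewing as soon as the work ends, whether it succeeded or failed.
                cts.Cancel();
                await renewalLoop.ConfigureAwait(false);
            }
        }
    }

    private static async Task RenewUntilCancelledAsync(
        Func<Task> renewLock, TimeSpan interval, CancellationToken token)
    {
        try
        {
            while (true)
            {
                await Task.Delay(interval, token).ConfigureAwait(false);
                await renewLock().ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the work completes or the service is shutting down.
        }
    }
}
```

Inside ReceiveMessageImplAsync this might be called as `LockRenewal.RunWithRenewalAsync(() => Task.Run(() => computeRole.OnMessage(rMessage, message.MessageId)), message.RenewLockAsync, TimeSpan.FromSeconds(50), cancellationToken)`, again assuming a 60-second lock duration. Wrapping the synchronous compute in Task.Run keeps the receiving thread free while renewals continue on the thread pool.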