1
votes

I have an Azure Durable Function that contains a number of activity functions. It is triggered by a message on a Service Bus topic. I want to be able to manually complete, defer, or dead-letter the Service Bus message based on the results of these activity functions. However, I cannot find a way to access the MessageReceiver from the ServiceBusTrigger in the trigger function.

I have tried passing the MessageReceiver as input to the orchestration function, but this causes an error when retrieving the value via context.GetInput<MessageReceiver>(), because MessageReceiver cannot be serialized to JSON.

Function 'ImportTransactionOrchestrator (Orchestrator)' failed with an error. Reason: Newtonsoft.Json.JsonSerializationException: Unable to find a constructor to use for type Microsoft.Azure.ServiceBus.Core.MessageReceiver. A class should either have a default constructor, one constructor with arguments or a constructor marked with the JsonConstructor attribute.

Here is a summary of what I have:

Trigger Function

[FunctionName(nameof(ServiceBusTrigger))]
public async Task ServiceBusTrigger(
    [ServiceBusTrigger(topicName: "topic", subscriptionName: "sub",
        Connection = "connection")]
    Message serviceBusMessage,
    MessageReceiver messageReceiver,
    [DurableClient] IDurableOrchestrationClient starter,
    ILogger log)
{
    var instanceId = await starter
        .StartNewAsync("ImportTransactionOrchestrator", null, (messageReceiver, serviceBusMessage));
}

Orchestration Function

 [FunctionName(nameof(ImportTransactionOrchestrator))]
 public async Task RunOrchestrator([OrchestrationTrigger] IDurableOrchestrationContext context)
 {
     var (messageReceiver, serviceBusMessage ) = 
          context.GetInput<(MessageReceiver, Message)>(); // error occurs here
     {...}
 }

My searching didn't turn up much, so I suspect I am trying to do something odd, maybe. Any help would be appreciated!

2 Answers

1
votes

Your orchestrator needs to be responsible for deciding whether or not the messages get completed. I believe messageReceiver can't be serialized and passed as a parameter to your child activities.

You need to set the autoComplete property to false in your host.json:

{
    "version": "2.0",
    "extensions": {
        "serviceBus": {
            "prefetchCount": 100,
            "messageHandlerOptions": {
                "autoComplete": false,
                "maxConcurrentCalls": 32,
                "maxAutoRenewDuration": "00:55:00"
            },
            "sessionHandlerOptions": {
                "autoComplete": false,
                "messageWaitTimeout": "00:00:30",
                "maxAutoRenewDuration": "00:55:00",
                "maxConcurrentSessions": 16
            }
        }
    }
}

and to complete the message:

await messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
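
As a rough sketch of where that call could live, assuming the in-process model with Microsoft.Azure.ServiceBus and Durable Functions 2.x (the same types the question uses): only a serializable value is handed to the orchestrator, while MessageReceiver stays in the trigger function and settles the message there. The class name, the ProcessTransaction activity, and treating the body as UTF-8 text are assumptions made for illustration.

```csharp
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;

public class ImportTransactionFunctions
{
    [FunctionName("ServiceBusTrigger")]
    public async Task Run(
        [ServiceBusTrigger(topicName: "topic", subscriptionName: "sub",
            Connection = "connection")]
        Message serviceBusMessage,
        MessageReceiver messageReceiver,
        [DurableClient] IDurableOrchestrationClient starter,
        ILogger log)
    {
        // Assumption: the payload is UTF-8 text. Only this serializable
        // value is passed on; MessageReceiver never leaves this function.
        string body = Encoding.UTF8.GetString(serviceBusMessage.Body);

        string instanceId = await starter
            .StartNewAsync<string>("ImportTransactionOrchestrator", null, body);
        log.LogInformation("Started orchestration {InstanceId}", instanceId);

        // With autoComplete set to false, the message must be settled explicitly.
        await messageReceiver.CompleteAsync(
            serviceBusMessage.SystemProperties.LockToken);
    }

    [FunctionName("ImportTransactionOrchestrator")]
    public async Task RunOrchestrator(
        [OrchestrationTrigger] IDurableOrchestrationContext context)
    {
        // A plain string round-trips through JSON without problems.
        string body = context.GetInput<string>();
        await context.CallActivityAsync("ProcessTransaction", body);
    }

    // Hypothetical activity, included only so the sketch is complete.
    [FunctionName("ProcessTransaction")]
    public void ProcessTransaction([ActivityTrigger] string body, ILogger log)
    {
        log.LogInformation("Processing payload of {Length} characters", body.Length);
    }
}
```

Note that in this sketch the message is completed as soon as the orchestration has been scheduled, not when it finishes. Settling based on the orchestration's outcome means waiting for it while the message lock is still held.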

0
votes

The main function should be responsible for the dead-lettering.

Sample code looks like this:

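string instanceId = await starter.StartNewAsync...
// GetStatusAsync returns a snapshot of the status at the time of the call;
// the orchestration may not have finished yet.
var orchestrationStatus = await starter.GetStatusAsync(instanceId);
var status = orchestrationStatus.RuntimeStatus.ToString().ToUpper();

// Abandon the message if the orchestrator reported failure through its output.
if (!(bool)orchestrationStatus.Output)
{
   await messageReceiver.AbandonAsync(lockToken);
}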
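
Because GetStatusAsync returns a snapshot, a call made right after StartNewAsync will usually find the orchestration still pending and without output. One way to act on the actual result is to poll until a terminal state is reached. The helper below is only a sketch: the names MessageSettlement and SettleFromOrchestrationAsync, the two-second poll interval, and the convention that the orchestrator returns a bool are all assumptions. The message lock also has to remain valid for the whole wait.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Newtonsoft.Json.Linq;

public static class MessageSettlement
{
    // Hypothetical helper: waits for the orchestration to finish,
    // then settles the Service Bus message according to the outcome.
    public static async Task SettleFromOrchestrationAsync(
        IDurableOrchestrationClient starter,
        string instanceId,
        MessageReceiver messageReceiver,
        Message message)
    {
        string lockToken = message.SystemProperties.LockToken;
        DurableOrchestrationStatus status;

        // Poll until the orchestration reaches a terminal state.
        do
        {
            await Task.Delay(TimeSpan.FromSeconds(2)); // assumed interval
            status = await starter.GetStatusAsync(instanceId);
        }
        while (status == null
            || status.RuntimeStatus == OrchestrationRuntimeStatus.Pending
            || status.RuntimeStatus == OrchestrationRuntimeStatus.Running
            || status.RuntimeStatus == OrchestrationRuntimeStatus.ContinuedAsNew);

        if (status.RuntimeStatus != OrchestrationRuntimeStatus.Completed)
        {
            // Failed, Terminated or Canceled: dead-letter the message.
            await messageReceiver.DeadLetterAsync(
                lockToken,
                "OrchestrationFailed",
                $"Orchestration ended with status {status.RuntimeStatus}.");
        }
        else if (status.Output != null
            && status.Output.Type == JTokenType.Boolean
            && (bool)status.Output)
        {
            // Orchestrator returned true: the message is done.
            await messageReceiver.CompleteAsync(lockToken);
        }
        else
        {
            // Orchestrator returned false (or no bool): release the lock
            // so the message becomes available for redelivery.
            await messageReceiver.AbandonAsync(lockToken);
        }
    }
}
```

From the trigger function this could be called as `await MessageSettlement.SettleFromOrchestrationAsync(starter, instanceId, messageReceiver, serviceBusMessage);` right after StartNewAsync. The trade-off is that the trigger function keeps running, and holding the lock, for as long as the orchestration takes.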