According to the documentation at https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus-trigger?tabs=csharp#usage (emphasis mine), MessageReceiver is one of the allowed parameter types:

The following parameter types are available for the queue or topic message:

  • string - If the message is text.
  • byte[] - Useful for binary data.
  • A custom type - If the message contains JSON, Azure Functions tries to deserialize the JSON data.
  • BrokeredMessage - Gives you the deserialized message with the BrokeredMessage.GetBody() method.
  • MessageReceiver - Used to receive and acknowledge messages from the message container (required when autoComplete is set to false)

These parameter types are for Azure Functions version 1.x; for 2.x and higher, use Message instead of BrokeredMessage.

My function works fine with type Microsoft.Azure.ServiceBus.Message as the first parameter and I'm able to see the expected SessionId within the myQueueItem object:

using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

namespace DataUpdateNotification.AzureFunctions
{
    public static class PersonIdFunction
    {
        [FunctionName("PersonIdFunction")]
        public static void Run([ServiceBusTrigger("personid", Connection = "AzureWebJobsServiceBus", IsSessionsEnabled = true)]Message myQueueItem, ILogger log)
        {
            log.LogInformation($"C# ServiceBus queue trigger function processed message: {myQueueItem}");
        }
    }
}

but when I try to use Microsoft.Azure.ServiceBus.Core.MessageReceiver as the first parameter, it throws: Exception binding parameter 'myQueueItem'. System.Private.DataContractSerialization: Type 'Microsoft.Azure.ServiceBus.Core.MessageReceiver' cannot be serialized.

using Microsoft.Azure.ServiceBus.Core;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

namespace DataUpdateNotification.AzureFunctions
{
    public static class PersonIdFunction
    {
        [FunctionName("PersonIdFunction")]
        public static void Run([ServiceBusTrigger("personid", Connection = "AzureWebJobsServiceBus", IsSessionsEnabled = true)]MessageReceiver myQueueItem, ILogger log)
        {
            log.LogInformation($"C# ServiceBus queue trigger function processed message: {myQueueItem}");
        }
    }
}

Background

I have a system that periodically sends messages to a session-enabled Azure Service Bus queue (not a topic or subscription). The sender always sets the SessionId so that I can group similar messages together. I would like to use the MessageReceiver object (which the documentation promises) to immediately call messageReceiver.CloseAsync() to remove/complete all messages related to that session. The overall goal is essentially a SELECT DISTINCT, so that I can process/remove/complete all the related messages in a single call.

Environment

AzureFunctions.csproj

<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <TargetFramework>netcoreapp3.1</TargetFramework>
    <AzureFunctionsVersion>v3</AzureFunctionsVersion>
  </PropertyGroup>
  <ItemGroup>
    <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.ServiceBus" Version="4.2.0" />
    <PackageReference Include="Microsoft.NET.Sdk.Functions" Version="3.0.11" />
  </ItemGroup>
  <ItemGroup>
    <None Update="host.json">
      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
    </None>
    <None Update="local.settings.json">
      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
      <CopyToPublishDirectory>Never</CopyToPublishDirectory>
    </None>
  </ItemGroup>
</Project>

host.json

{
    "version": "2.0",
    "logging": {
        "applicationInsights": {
            "samplingExcludedTypes": "Request",
            "samplingSettings": {
                "isEnabled": true
            }
        }
    }
}

local.settings.json

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "AzureWebJobsServiceBus": "Endpoint=sb://redacted.servicebus.windows.net/;SharedAccessKeyName=RedactedAzureFunctionKey;SharedAccessKey=UkVEQUNURUQgQkVOIFdVWiBIRVJFIFJFREFDVEVE"
  }
}

Complete error message:

Azure Functions Core Tools
Core Tools Version:       3.0.2996 Commit hash: c54cdc36323e9543ba11fb61dd107616e9022bba
Function Runtime Version: 3.0.14916.0


Functions:

        PersonIdFunction: serviceBusTrigger

For detailed output, run func with --verbose flag.
[2020-12-08T14:00:26.451Z] Executing 'PersonIdFunction' (Reason='(null)', Id=51217a44-b2b0-4629-91d5-3035ece95155)
[2020-12-08T14:00:26.451Z] Executing 'PersonIdFunction' (Reason='(null)', Id=763a7222-277f-4fd3-8fcf-36042523b924)
[2020-12-08T14:00:26.454Z] Trigger Details: MessageId: d6d4b0895632465183f1c6aa8b84cb6f, SequenceNumber: 18, DeliveryCount: 1, EnqueuedTimeUtc: 2020-12-08T14:00:11.7240000Z, LockedUntilUtc: 9999-12-31T23:59:59.9999999Z, SessionId: 753
[2020-12-08T14:00:26.455Z] Trigger Details: MessageId: b21b8df0452e4df0bac8f67a058a5931, SequenceNumber: 17, DeliveryCount: 1, EnqueuedTimeUtc: 2020-12-08T14:00:11.7240000Z, LockedUntilUtc: 9999-12-31T23:59:59.9999999Z, SessionId: 159
[2020-12-08T14:00:27.060Z] Executed 'PersonIdFunction' (Failed, Id=51217a44-b2b0-4629-91d5-3035ece95155, Duration=705ms)
[2020-12-08T14:00:27.060Z] Executed 'PersonIdFunction' (Failed, Id=763a7222-277f-4fd3-8fcf-36042523b924, Duration=705ms)
[2020-12-08T14:00:27.062Z] System.Private.CoreLib: Exception while executing function: PersonIdFunction. Microsoft.Azure.WebJobs.Host: Exception binding parameter 'myQueueItem'. System.Private.DataContractSerialization: Type 'Microsoft.Azure.ServiceBus.Core.MessageReceiver' cannot be serialized. Consider marking it with the DataContractAttribute attribute, and marking all of its members you want serialized with the DataMemberAttribute attribute. Alternatively, you can ensure that the type is public and has a parameterless constructor - all public members of the type will then be serialized, and no attributes will be required.
[2020-12-08T14:00:27.064Z] System.Private.CoreLib: Exception while executing function: PersonIdFunction. Microsoft.Azure.WebJobs.Host: Exception binding parameter 'myQueueItem'. System.Private.DataContractSerialization: Type 'Microsoft.Azure.ServiceBus.Core.MessageReceiver' cannot be serialized. Consider marking it with the DataContractAttribute attribute, and marking all of its members you want serialized with the DataMemberAttribute attribute. Alternatively, you can ensure that the type is public and has a parameterless constructor - all public members of the type will then be serialized, and no attributes will be required.
[2020-12-08T14:00:27.102Z] Message processing error (Action=UserCallback, ClientId=QueueClient1personid, EntityPath=personid, Endpoint=redacted.servicebus.windows.net)
[2020-12-08T14:00:27.102Z] Message processing error (Action=UserCallback, ClientId=QueueClient1personid, EntityPath=personid, Endpoint=redacted.servicebus.windows.net)
[2020-12-08T14:00:27.105Z] System.Private.CoreLib: Exception while executing function: PersonIdFunction. Microsoft.Azure.WebJobs.Host: Exception binding parameter 'myQueueItem'. System.Private.DataContractSerialization: Type 'Microsoft.Azure.ServiceBus.Core.MessageReceiver' cannot be serialized. Consider marking it with the DataContractAttribute attribute, and marking all of its members you want serialized with the DataMemberAttribute attribute. Alternatively, you can ensure that the type is public and has a parameterless constructor - all public members of the type will then be serialized, and no attributes will be required.
[2020-12-08T14:00:27.111Z] System.Private.CoreLib: Exception while executing function: PersonIdFunction. Microsoft.Azure.WebJobs.Host: Exception binding parameter 'myQueueItem'. System.Private.DataContractSerialization: Type 'Microsoft.Azure.ServiceBus.Core.MessageReceiver' cannot be serialized. Consider marking it with the DataContractAttribute attribute, and marking all of its members you want serialized with the DataMemberAttribute attribute. Alternatively, you can ensure that the type is public and has a parameterless constructor - all public members of the type will then be serialized, and no attributes will be required.
Hi, any update? :) - 1_1
@BowmanZhu No update yet. I haven't had a chance to try it. I plan to try it in the next few days. It looks like the documentation could use some clarification, as a MessageReceiver must be used in addition to one of the other types. - Benrobot

1 Answer

but when I try to use Microsoft.Azure.ServiceBus.Core.MessageReceiver as the first parameter, it throws: Exception binding parameter 'myQueueItem'. System.Private.DataContractSerialization: Type 'Microsoft.Azure.ServiceBus.Core.MessageReceiver' cannot be serialized.

You can use MessageReceiver as a parameter without any problem, but not as the parameter that carries the ServiceBusTrigger attribute. Declare it as an additional parameter next to the message. One more important point: when sessions are enabled, messageReceiver.CloseAsync() does not close the session (and it does not close the queue client either). It simply has no effect; the Functions runtime does not seem to implement any logic for it. You do not need to call CloseAsync() to close the session at all. Closing the session is governed by messageWaitTimeout: the function keeps waiting on the session until that timeout elapses.

I have a system that periodically sends messages to a session-enabled Azure Service Bus queue (not a topic or subscription). The sender always sets the SessionId so that I can group similar messages together. I would like to use the MessageReceiver object (which the documentation promises) to immediately call messageReceiver.CloseAsync() to remove/complete all messages related to that session. The overall goal is essentially a SELECT DISTINCT, so that I can process/remove/complete all the related messages in a single call.

You can use a Service Bus trigger like the following to achieve what you want:

.cs file

using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

namespace FunctionApp45
{
    public static class Function1
    {
        [FunctionName("Function1")]
        public static async Task RunAsync(
            // The trigger attribute goes on the Message parameter...
            [ServiceBusTrigger("myqueue", Connection = "str", IsSessionsEnabled = true)] Message myQueueItem,
            // ...and MessageReceiver is a separate parameter without an attribute.
            MessageReceiver messageReceiver,
            ILogger log)
        {
            log.LogInformation($"C# ServiceBus queue trigger function processed message: {Encoding.UTF8.GetString(myQueueItem.Body)}");

            // Some logic here.

            // autoComplete is false in host.json, so complete the message explicitly.
            await messageReceiver.CompleteAsync(myQueueItem.SystemProperties.LockToken);
        }
    }
}

host.json

{
    "version": "2.0",
    "extensions": {
        "serviceBus": {
            "prefetchCount": 100,
            "messageHandlerOptions": {
                "autoComplete": false,
                "maxConcurrentCalls": 32,
                "maxAutoRenewDuration": "00:05:00"
            },
            "sessionHandlerOptions": {
                "autoComplete": false,
                "messageWaitTimeout": "00:00:30",
                "maxAutoRenewDuration": "00:55:00",
                "maxConcurrentSessions": 1
            }
        }
    }
}

With the above code and host.json file, the function is triggered by one message and then processes the related messages in the same session. (After the latest message has been processed, the function waits 30 seconds; if no further message with the same session ID arrives, the listener for that session closes.)
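
Because autoComplete is false here, the function is responsible for settling every message itself. The binding documentation states that if the function throws without calling any MessageReceiver method, the message lock simply remains in place. The following is a minimal sketch of a variant of Function1 (meant to replace it, not run alongside it) that abandons the message when processing fails. It has not been run against a live queue. ProcessRecipeStep is a hypothetical placeholder for your own logic, and the queue name and connection setting are assumed to be the same as above.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

namespace FunctionApp45
{
    public static class Function1
    {
        [FunctionName("Function1")]
        public static async Task RunAsync(
            [ServiceBusTrigger("myqueue", Connection = "str", IsSessionsEnabled = true)] Message myQueueItem,
            MessageReceiver messageReceiver,
            ILogger log)
        {
            string lockToken = myQueueItem.SystemProperties.LockToken;

            try
            {
                ProcessRecipeStep(Encoding.UTF8.GetString(myQueueItem.Body), log);
            }
            catch (Exception ex)
            {
                // Processing failed: release the message so it can be delivered again.
                log.LogError(ex, "Processing failed for session {SessionId}", myQueueItem.SessionId);
                await messageReceiver.AbandonAsync(lockToken);
                return;
            }

            // Processing succeeded: remove the message from the queue.
            await messageReceiver.CompleteAsync(lockToken);
        }

        // Hypothetical placeholder (not part of any library); put the real work here.
        private static void ProcessRecipeStep(string body, ILogger log)
        {
            log.LogInformation("Processing message body: {Body}", body);
        }
    }
}
```

Abandoning increases the message's delivery count, so a message that keeps failing ends up in the dead-letter queue once the queue's maximum delivery count is reached.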

I used the console app below to test:

using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using Newtonsoft.Json;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp17
{
    public class Program 
    {
        string connectionString = "Endpoint=sb://bowman1012.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=REDACTED";
        string SessionQueueName = "myqueue";
        QueueClient queueClient;
        public async Task Run()
        {
            Console.WriteLine("Press any key to exit the scenario");

            await Task.WhenAll(
                this.SendMessagesAsync(Guid.NewGuid().ToString(), connectionString, SessionQueueName),
                this.SendMessagesAsync(Guid.NewGuid().ToString(), connectionString, SessionQueueName),
                this.SendMessagesAsync(Guid.NewGuid().ToString(), connectionString, SessionQueueName),
                this.SendMessagesAsync(Guid.NewGuid().ToString(), connectionString, SessionQueueName));

            queueClient = new QueueClient(connectionString, SessionQueueName);
            RegisterOnMessageHandlerAndReceiveMessages();

            // Keep the session handler running until a key is pressed. Closing the
            // client right away would stop it before it had received anything.
            Console.ReadKey();

            await queueClient.CloseAsync();
        }

        async Task SendMessagesAsync(string sessionId, string connectionString, string queueName)
        {
            var client = new QueueClient(connectionString, queueName);
            dynamic data = new[]
            {
                new {step = 1, title = "Shop"},
                new {step = 2, title = "Unpack"},
                new {step = 3, title = "Prepare"},
                new {step = 4, title = "Cook"},
                new {step = 5, title = "Eat"},
            };

            for (int i = 0; i < data.Length; i++)
            {
                var message = new Message(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data[i])))
                {
                    SessionId = sessionId,
                    ContentType = "application/json",
                    Label = "RecipeStep",
                    MessageId = i.ToString(),
                    TimeToLive = TimeSpan.FromMinutes(2)
                };
                await client.SendAsync(message);
                lock (Console.Out)
                {
                    Console.ForegroundColor = ConsoleColor.Yellow;
                    Console.WriteLine("Message sent: Session {0}, MessageId = {1}", message.SessionId, message.MessageId);
                    Console.ResetColor();
                }
            }

            // Release the connection held by this sender.
            await client.CloseAsync();
        }

        void RegisterOnMessageHandlerAndReceiveMessages()
        {
            queueClient.RegisterSessionHandler(
                processMessage,
                new SessionHandlerOptions(LogMessageHandlerException)
                {
                    MaxConcurrentSessions = 16,
                    AutoComplete = false,
                    MessageWaitTimeout = TimeSpan.FromSeconds(30),
                    MaxAutoRenewDuration = TimeSpan.FromMinutes(55),
                });
        }

        private async Task processMessage(IMessageSession session, Message message, CancellationToken cancellationToken)
        {
            var body = message.Body;

            dynamic recipeStep = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(body));
            lock (Console.Out)
            {
                Console.ForegroundColor = ConsoleColor.Cyan;
                Console.WriteLine(
                    "\t\t\t\tMessage received:  \n\t\t\t\t\t\tSessionId = {0}, \n\t\t\t\t\t\tMessageId = {1}, \n\t\t\t\t\t\tSequenceNumber = {2}," +
                    "\n\t\t\t\t\t\tContent: [ step = {3}, title = {4} ]",
                    message.SessionId,
                    message.MessageId,
                    message.SystemProperties.SequenceNumber,
                    recipeStep.step,
                    recipeStep.title);
                Console.ResetColor();
            }

            // AutoComplete is false, so complete each message explicitly.
            await session.CompleteAsync(message.SystemProperties.LockToken);

            if (recipeStep.step == 5)
            {
                // Last step of the recipe, so this session is finished.
                await session.CloseAsync();
            }
        }

        private Task LogMessageHandlerException(ExceptionReceivedEventArgs e)
        {
            Console.WriteLine("Exception: \"{0}\" {1}", e.Exception.Message, e.ExceptionReceivedContext.EntityPath);
            return Task.CompletedTask;
        }

        public static int Main(string[] args)
        {
            try
            {
                var app = new Program();
                app.Run().GetAwaiter().GetResult();
            }
            catch (Exception e)
            {
                Console.WriteLine(e.ToString());
                return 1;
            }
            return 0;
        }
    }
}