How can I add a message to Service Bus when I already have the message in JSON format? I can add messages using an Azure Function output binding, but none of the message properties show up in Service Bus Explorer or QueueExplorer.

I need to resubmit about 1K messages. There was an error in the messages, so I exported them to a file, fixed them in Notepad++, and created an Azure Function that reads the file and puts the messages on the queue. But when I look at a message, none of the message properties show up in Service Bus Explorer.

run.csx

#r "Newtonsoft.Json"

using System.Net;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Primitives;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Threading.Tasks;
using System.Configuration;

const string QueueName = "commands";
static string FileName = "messages.json";

public static async Task<string> Run(HttpRequest req, ILogger log,
                 ExecutionContext context, ICollector<string> outputSbQueue)
{
    log.LogInformation("Starting processing messages.");

    var filePath = System.IO.Path.Combine(context.FunctionDirectory, FileName);

    log.LogInformation("Path: " + filePath);

    var text = File.ReadAllText(filePath);

    log.LogInformation("Message: " + text);

    JArray messages = JArray.Parse(text);

    log.LogInformation("Number of message: " + messages.Count);

    await SendMessagesAsync(messages,log,outputSbQueue);
    // return req.CreateResponse(HttpStatusCode.OK,
    //                             "Updated",
    //                             "text/plain");
    return "test";
}

static async Task SendMessagesAsync(JArray messages, ILogger log,
                 ICollector<string> outputSbQueue)
{
    log.LogInformation("About to iterate messages");

    foreach (var message in messages)
    {
        log.LogInformation("Sending Message");
        outputSbQueue.Add(message.ToString());
        log.LogInformation("Sent message: " + message);
    }
}

messages.json

[
  {
    "Body": {
      "PaymentPlanId": "2141110b-07da-46b7-a166-ffc7f9f6c5af",
      "InstallmentId": "3bd27b0d-3372-456c-856c-74e09de1413a",
      "Date": "2018-12-05T00:00:00",
      "Amount": 66.89,
      "Attempt": 0,
      "PaymentCorrelationId": "2ae7511e-706f-4d7f-b44b-9690d0fcbf38",
      "CommandId": "a2d5ae26-6289-4cca-bce0-7a1905b64378"
    },
    "ContentType": "text/plain",
    "CorrelationId": null,
    "DeadLetterSource": "commands",
    "DeliveryCount": 1,
    "EnqueuedSequenceNumber": 14684,
    "EnqueuedTimeUtc": "2018-12-06T13:22:37.131Z",
    "ExpiresAtUtc": "9999-12-31T23:59:59.9999999",
    "ForcePersistence": false,
    "IsBodyConsumed": false,
    "Label": "PayDueInstallmentCommand",
    "LockedUntilUtc": null,
    "LockToken": null,
    "MessageId": "a2d5ae26-6289-4cca-bce0-7a1905b64378",
    "PartitionKey": null,
    "Properties": {
      "BodyClrType": "SR.Domain.Commands.PayDueInstallmentCommand, SR.Domain, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null",
      "ParentId": "|Dz4Pxv65XMA=.3975a8a2_32.",
      "RootId": "Dz4Pxv65XMA=",
      "Diagnostic-Id": "|Dz4Pxv65XMA=.3975a8a2_32.1.",
      "DeadLetterReason": "NoCommandInMessage",
      "DeadLetterErrorDescription": "There was no command in the message.",
      "Test":"1"
    },
    "ReplyTo": null,
    "ReplyToSessionId": null,
    "ScheduledEnqueueTimeUtc": "2018-12-06T13:22:36.877Z",
    "SequenceNumber": 14684,
    "SessionId": null,
    "Size": 938,
    "State": 0,
    "TimeToLive": "10675199.02:48:05.4775807",
    "To": null,
    "ViaPartitionKey": null
  }
 ]

function.json

{
  "bindings": [
    {
      "authLevel": "function",
      "name": "req",
      "type": "httpTrigger",
      "direction": "in",
      "methods": [
        "get",
        "post"
      ]
    },
    {
      "name": "$return",
      "type": "http",
      "direction": "out"
    },
    {
      "name": "outputSbQueue",
      "type": "serviceBus",
      "queueName": "deadletter",
      "connection": "ServiceBusConnectionString",
      "direction": "out"
    }
  ],
  "disabled": false
}
When you use an ICollector<string>, the string corresponds to the message body. You'd better use an ICollector<BrokeredMessage> in your case. - Thomas
@Thomas: How can I do that when I cannot add a reference to Microsoft.Azure.ServiceBus or Microsoft.ServiceBus? I actually tried using Microsoft.Azure.ServiceBus.Message, but since I cannot add a reference to either of the above, it fails. - epitka
Are you using an in-portal function? - Thomas
@epitka Add the following assembly references: #r "..\\bin\\Microsoft.Azure.ServiceBus.dll" #r "Microsoft.Azure.WebJobs.ServiceBus" - Roman Kiss
@epitka Does my solution work or do you need further help? - Jerry Liu

1 Answer

The assembly problem is easy to fix, as @Roman has mentioned. Since you have created a v2 function (the default if you haven't changed the runtime version of a new Function app), add the lines below.

#r "..\\bin\\Microsoft.Azure.ServiceBus.dll" 
using Microsoft.Azure.ServiceBus;

The other problem is the structure of your JSON. It is based on BrokeredMessage in Microsoft.ServiceBus.Messaging rather than Message in Microsoft.Azure.ServiceBus. You have to decide which one to use and refactor the JSON if necessary. Note that some of the properties are set by the Azure Service Bus service, so we can't set them on newly created messages.

Take Message as an example: refactor the JSON to match the Message class, keeping all the configurable properties.

[
  {
    "Body": {
      "PaymentPlanId": "2141110b-07da-46b7-a166-ffc7f9f6c5af",
      ...
    },
    "ContentType": "text/plain",
    "Label": "MyLabel",
    "MessageId": "a2d5ae26-6289-4cca-bce0-7a1905b64378",
    "ScheduledEnqueueTimeUtc": "2018-12-06T13:22:36.877Z",
    "TimeToLive": "10675199.02:48:05.4775807",
    "CorrelationId": null,
    "PartitionKey": null,
    "ReplyTo": null,
    "ReplyToSessionId": null,
    "SessionId": null,
    "To": null,
    "ViaPartitionKey": null,
    "UserProperties": {
        "CustomProperty":"test",
        ...
    }
  }
 ]

We can't deserialize the whole object directly with JsonConvert.DeserializeObject, because the message body must be a byte[]. Extract the body first, then deserialize the remaining properties.

    // outputSbQueue must be declared as ICollector<Message> for this to compile.
    foreach (var message in messages)
    {
        // Get the body first, as UTF-8 bytes
        var body = System.Text.Encoding.UTF8.GetBytes(message["Body"].ToString());
        // Empty the Body section so the remaining properties can be deserialized
        message["Body"] = "";
        var SBMessage = JsonConvert.DeserializeObject<Message>(message.ToString());
        SBMessage.Body = body;
        log.LogInformation("Sending Message");
        outputSbQueue.Add(SBMessage);
        log.LogInformation("Sent message: " + SBMessage.MessageId);
    }
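
If restructuring roughly 1K exported messages by hand is impractical, the mapping can also be done in code, leaving the exported file as it is. Below is a minimal sketch of a complete run.csx along those lines, not a definitive implementation. It assumes the output binding in function.json stays unchanged, that every exported entry has a "Body" object, and that the values under "Properties" are simple strings, numbers or booleans. ToMessage is a hypothetical helper introduced here for illustration, and which fields to copy is a judgment call.

```csharp
#r "Newtonsoft.Json"
#r "..\\bin\\Microsoft.Azure.ServiceBus.dll"

using System.IO;
using System.Text;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.ServiceBus;
using Newtonsoft.Json.Linq;

static string FileName = "messages.json";

// The collector is typed as Message (not string), so the properties
// travel with each message instead of only the body.
public static IActionResult Run(HttpRequest req, ILogger log,
    ExecutionContext context, ICollector<Message> outputSbQueue)
{
    var filePath = Path.Combine(context.FunctionDirectory, FileName);
    var exportedMessages = JArray.Parse(File.ReadAllText(filePath));

    foreach (var exported in exportedMessages)
    {
        outputSbQueue.Add(ToMessage((JObject)exported));
    }

    log.LogInformation("Queued messages: " + exportedMessages.Count);
    return new OkObjectResult("Queued " + exportedMessages.Count + " messages");
}

// Hypothetical helper: maps one exported (BrokeredMessage-shaped) entry
// to a new Message. Assumes the entry has a "Body" object.
static Message ToMessage(JObject exported)
{
    // The body has to be a byte[]; here it is the JSON text as UTF-8.
    var body = Encoding.UTF8.GetBytes(exported["Body"].ToString());

    var message = new Message(body)
    {
        Label = (string)exported["Label"],
        ContentType = (string)exported["ContentType"],
        CorrelationId = (string)exported["CorrelationId"]
    };

    // Only set MessageId when the export actually has one.
    var messageId = (string)exported["MessageId"];
    if (!string.IsNullOrEmpty(messageId))
    {
        message.MessageId = messageId;
    }

    // "Properties" in the export corresponds to UserProperties on Message.
    // Nested objects and null values are skipped (assumption: the values
    // that matter are primitives).
    if (exported["Properties"] is JObject properties)
    {
        foreach (var property in properties.Properties())
        {
            var value = (property.Value as JValue)?.Value;
            if (value != null)
            {
                message.UserProperties[property.Name] = value;
            }
        }
    }

    // Fields such as SequenceNumber, EnqueuedTimeUtc, DeliveryCount and
    // LockToken are assigned by the service, so they are not copied.
    return message;
}
```

This copies the dead-letter properties (DeadLetterReason, DeadLetterErrorDescription) along with the rest. Filter them out inside the loop over properties if you don't want them on the resubmitted messages.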