28
votes

I'm trying to find a way to pass objects to the Azure Queue. I couldn't find a way to do this.

As I've seen I can pass string or byte array, which is not very comfortable for passing objects.

Is there anyway to pass custom objects to the Queue?

Thanks!

6

6 Answers

31
votes

You can use the following classes as example:

 [Serializable]
    public abstract class BaseMessage
    {
        public byte[] ToBinary()
        {
            BinaryFormatter bf = new BinaryFormatter();
            byte[] output = null;
            using (MemoryStream ms = new MemoryStream())
            {
                ms.Position = 0;
                bf.Serialize(ms, this);
                output = ms.GetBuffer();
            }
            return output;
        }

        public static T FromMessage<T>(CloudQueueMessage m)
        {
            byte[] buffer = m.AsBytes;
            T returnValue = default(T);
            using (MemoryStream ms = new MemoryStream(buffer))
            {
                ms.Position = 0;
                BinaryFormatter bf = new BinaryFormatter();
                returnValue = (T)bf.Deserialize(ms);
            }
            return returnValue;
        }
    }

Then a StdQueue (a Queue that is strongly typed):

   public class StdQueue<T> where T : BaseMessage, new()
    {
        protected CloudQueue queue;

        public StdQueue(CloudQueue queue)
        {
            this.queue = queue;
        }

        public void AddMessage(T message)
        {
            CloudQueueMessage msg =
            new CloudQueueMessage(message.ToBinary());
            queue.AddMessage(msg);
        }

        public void DeleteMessage(CloudQueueMessage msg)
        {
            queue.DeleteMessage(msg);
        }

        public CloudQueueMessage GetMessage()
        {
            return queue.GetMessage(TimeSpan.FromSeconds(120));
        }
    }

Then, all you have to do is to inherit the BaseMessage:

[Serializable]
public class ParseTaskMessage : BaseMessage
{
    public Guid TaskId { get; set; }

    public string BlobReferenceString { get; set; }

    public DateTime TimeRequested { get; set; }
}

And make a queue that works with that message:

CloudStorageAccount acc;
            if (!CloudStorageAccount.TryParse(connectionString, out acc))
            {
                throw new ArgumentOutOfRangeException("connectionString", "Invalid connection string was introduced!");
            }
            CloudQueueClient clnt = acc.CreateCloudQueueClient();
            CloudQueue queue = clnt.GetQueueReference(processQueue);
            queue.CreateIfNotExist();
            this._queue = new StdQueue<ParseTaskMessage>(queue);

Hope this helps!

20
votes

Extension method that uses Newtonsoft.Json and async

    public static async Task AddMessageAsJsonAsync<T>(this CloudQueue cloudQueue, T objectToAdd)
    {
        var messageAsJson = JsonConvert.SerializeObject(objectToAdd);
        var cloudQueueMessage = new CloudQueueMessage(messageAsJson);
        await cloudQueue.AddMessageAsync(cloudQueueMessage);
    }
9
votes

I like this generalization approach but I don't like having to put Serialize attribute on all the classes I might want to put in a message and derived them from a base (I might already have a base class too) so I used...

using System;
using System.Text;
using Microsoft.WindowsAzure.Storage.Queue;
using Newtonsoft.Json;

namespace Example.Queue
{
    public static class CloudQueueMessageExtensions
    {
        public static CloudQueueMessage Serialize(Object o)
        {
            var stringBuilder = new StringBuilder();
            stringBuilder.Append(o.GetType().FullName);
            stringBuilder.Append(':');
            stringBuilder.Append(JsonConvert.SerializeObject(o));
            return new CloudQueueMessage(stringBuilder.ToString());
        }

        public static T Deserialize<T>(this CloudQueueMessage m)
        {
            int indexOf = m.AsString.IndexOf(':');

            if (indexOf <= 0)
                throw new Exception(string.Format("Cannot deserialize into object of type {0}", 
                    typeof (T).FullName));

            string typeName = m.AsString.Substring(0, indexOf);
            string json = m.AsString.Substring(indexOf + 1);

            if (typeName != typeof (T).FullName)
            {
                throw new Exception(string.Format("Cannot deserialize object of type {0} into one of type {1}", 
                    typeName,
                    typeof (T).FullName));
            }

            return JsonConvert.DeserializeObject<T>(json);
        }
    }
}

e.g.

var myobject = new MyObject();
_queue.AddMessage( CloudQueueMessageExtensions.Serialize(myobject));

var myobject = _queue.GetMessage().Deserialize<MyObject>();
0
votes

In case the storage queue is used with WebJob or Azure function (quite common scenario) then the current Azure SDK allows to use POCO object directly. See examples here:

Note: The SDK will automatically use Newtonsoft.Json for serialization/deserialization under the hood.

0
votes

I liked @Akodo_Shado's approach to serialize with Newtonsoft.Json. I updated it for Azure.Storage.Queues and also added a "Retrieve and Delete" method that deserializes the object from the queue.

public static class CloudQueueExtensions
{
    public static async Task AddMessageAsJsonAsync<T>(this QueueClient queueClient, T objectToAdd) where T : class
    {
        string messageAsJson = JsonConvert.SerializeObject(objectToAdd);
        BinaryData cloudQueueMessage = new BinaryData(messageAsJson);
        await queueClient.SendMessageAsync(cloudQueueMessage);
    }

    public static async Task<T> RetreiveAndDeleteMessageAsObjectAsync<T>(this QueueClient queueClient) where T : class
    {

        QueueMessage[] retrievedMessage = await queueClient.ReceiveMessagesAsync(1);
        if (retrievedMessage.Length == 0) return null;
        string theMessage = retrievedMessage[0].MessageText;
        T instanceOfT = JsonConvert.DeserializeObject<T>(theMessage);
        await queueClient.DeleteMessageAsync(retrievedMessage[0].MessageId, retrievedMessage[0].PopReceipt);

        return instanceOfT;
    }
}

The RetreiveAndDeleteMessageAsObjectAsync is designed to process 1 message at time, but you could obviously rewrite to deserialize the full array of messages and return a ICollection<T> or similar.

-12
votes

That is not right way to do it. queues are not ment for storing object. you need to put object in blob or table (serialized). I believe queue messgae body has 64kb size limit with sdk1.5 and 8kb wih lower versions. Messgae body is ment to transfer crutial data for workera that pick it up only.