
We are developing a WCF service that streams large amounts of data, so we have chosen to combine WCF streaming with protobuf-net serialization.

Context:

The general idea is to serialize objects in the service, write them to a stream, and send it. On the other end, the caller receives a Stream object from which it can read all the data.

So currently the service method code looks somewhat like this:

public Result TestMethod(Parameter parameter)
{
    // Create response
    var responseObject = new BusinessResponse { Value = "some very large data"};

    // The response has to be serialized up front into an intermediate MemoryStream
    var stream = new MemoryStream();
    serializer.Serialize(stream, responseObject);
    stream.Position = 0;

    // ResultBody is a stream, Result is a MessageContract
    return new Result {ResultBody = stream};
}

The BusinessResponse object is serialized to a MemoryStream, which is then returned from the method. On the client side, the calling code looks like this:

var parameter = new Parameter();

// Call the service method
var methodResult = channel.TestMethod(parameter);

// The protobuf-net deserializer reads from the stream received from the service.
// While protobuf-net is reading on the client, WCF on the service side is
// reading from the MemoryStream that holds the serialized message.
var result = serializer.Deserialize<BusinessResponse>(methodResult.ResultBody);
return result;

So when serializer.Deserialize() is called, it reads from the stream methodResult.ResultBody, and at the same time, on the service side, WCF is reading from the MemoryStream that was returned from TestMethod.

Problem:

We would like to get rid of the MemoryStream and the up-front serialization of the whole object on the service side. Since we use streaming, we want to avoid holding the entire serialized object in memory before sending it.

Idea:

The perfect solution would be to return an empty, custom-made Stream object from TestMethod() that holds a reference to the object to be serialized (the BusinessResponse object in my example). When WCF calls the Read() method of my stream, I would internally serialize a piece of the object using protobuf-net and return it to the caller without storing it in memory.

This is where the problem lies: what we actually need is a way to serialize an object piece by piece at the moment the stream is read. I understand that this is a totally different way of serializing. Instead of pushing an object into a serializer, I would like to pull the serialized content out piece by piece.

Is that kind of serialization somehow possible using protobuf-net?

Is this one object? Or a series of objects (a collection)? Whether it is worth looking at this much actually depends on your WCF config - in most configurations it will always buffer the entire message in memory anyway - so it may be as easy to not change anything. - Marc Gravell♦
Hi Marc, WCF is configured to not use buffering at all. That is the point of streaming: I want to decrease the memory footprint on the server side. Additionally, if I wanted to serialize a collection of objects, I would call SerializeWithLengthPrefix() each time the client calls Read() and my underlying buffer holds less data than was requested. The issue here is that I'd like to be able to split the serialization of a single object. - Hubert R. Skrzypek
Interesting question. I think this can be generalised, essentially to a spoof Stream that makes Read and Write work as co-routines. If you don't mind having an extra Thread, it could be done with a simple gate, however iirc Jon had some interesting ideas. I'll have to take a look and get back to you. However, I can say without doubt that I don't intend hacking the core of protobuf-net for the purpose :) - Marc Gravell♦
Interesting question. By the way, considering the complexity of WCF and the use of protobuf, this service isn't going to be usable by a client that wasn't written specifically for it. Isn't WCF an unnecessary layer of abstraction in this case? I'm just asking, because I don't see all the benefits right now. Maybe the WS-* on the request part? - jhexp
@jhexp you're quite right that protobuf makes the service less interoperable. However, this is an internal-use service, and clients can choose whether or not to use protobuf serialization, depending on their preference. - Hubert R. Skrzypek

1 Answer


I cooked up some code that is probably along the lines of Marc's gate idea.

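using System;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;

public class PullStream : Stream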
{
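    // The single chunk handed from the writer to the reader.
    private byte[] internalBuffer;

    // Per-instance events: static ones would be shared by every PullStream,
    // so two concurrent responses would wake each other's readers and writers.
    private readonly ManualResetEvent dataAvailable = new ManualResetEvent(false);
    private readonly ManualResetEvent dataEmpty = new ManualResetEvent(true);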

    public override bool CanRead
    {
        get { return true; }
    }

    public override bool CanSeek
    {
        get { return false; }
    }

    public override bool CanWrite
    {
        get { return true; }
    }

    public override void Flush()
    {
        // Nothing is buffered beyond the hand-off chunk, so there is nothing
        // to flush. A no-op is safer than throwing if a caller flushes.
    }

    public override long Length
    {
        get { throw new NotImplementedException(); }
    }

    public override long Position
    {
        get
        {
            throw new NotImplementedException();
        }
        set
        {
            throw new NotImplementedException();
        }
    }

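    public override int Read(byte[] buffer, int offset, int count)
    {
        // Block until the writer has supplied a chunk or signalled the end.
        dataAvailable.WaitOne();

        // An empty chunk is the end-of-stream marker set by End(). Leave the
        // event signalled so that every later Read also returns 0.
        if (internalBuffer.Length == 0)
            return 0;

        if (count >= internalBuffer.Length)
        {
            // The caller can take the whole chunk: copy it to the requested
            // offset, then let the writer continue.
            var retVal = internalBuffer.Length;
            Array.Copy(internalBuffer, 0, buffer, offset, retVal);
            internalBuffer = null;
            dataAvailable.Reset();
            dataEmpty.Set();
            return retVal;
        }
        else
        {
            // Partial read: hand over `count` bytes and keep the remainder.
            Array.Copy(internalBuffer, 0, buffer, offset, count);
            internalBuffer = internalBuffer.Skip(count).ToArray(); // i know
            return count;
        }
    }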

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotImplementedException();
    }

    public override void SetLength(long value)
    {
        throw new NotImplementedException();
    }

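    public override void Write(byte[] buffer, int offset, int count)
    {
        // An empty chunk is reserved as the end-of-stream marker (see End),
        // so a zero-length write must not reach the reader.
        if (count == 0)
            return;

        // Wait until the reader has drained the previous chunk.
        dataEmpty.WaitOne();
        dataEmpty.Reset();

        // Copy only the requested slice, honouring the offset argument.
        internalBuffer = new byte[count];
        Array.Copy(buffer, offset, internalBuffer, 0, count);

        Debug.WriteLine("Writing some data");

        dataAvailable.Set();
    }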

    public void End()
    {
        dataEmpty.WaitOne();
        dataEmpty.Reset();

        internalBuffer = new byte[0];

        Debug.WriteLine("Ending writes");

        dataAvailable.Set();
    }
}

This is a simple Stream subclass that implements only Read and Write (plus End). Read blocks while no data is available, and Write blocks while unread data is still pending, so only one byte buffer is in play at a time. The LINQ copy of the remainder is open for optimization ;-) The End method exists so that a Read does not block forever once no more data will be written.

You have to write to this stream from a separate thread, as shown below:

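    // create a large object
    // (ToSerialize is not shown here; it is assumed to be a protobuf-net
    // contract class with a string member named Test)
    var obj = new List<ToSerialize>();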
    for(int i = 0; i <= 1000; i ++)
        obj.Add(new ToSerialize { Test = "This is my very loooong message" });
    // create my special stream to read from
    var ms = new PullStream();
    new Thread(x =>
    {
        ProtoBuf.Serializer.Serialize(ms, obj);
        ms.End();
    }).Start();
    var buffer = new byte[100];
    // stream to write back to (just to show deserialization is working too)
    var ws = new MemoryStream();
    int read;
    while ((read = ms.Read(buffer, 0, 100)) != 0)
    {
        ws.Write(buffer, 0, read);
        Debug.WriteLine("read some data");
    }
    ws.Position = 0;
    var back = ProtoBuf.Serializer.Deserialize<List<ToSerialize>>(ws);
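
As a possible refinement, here is a sketch of the same hand-off built on a bounded BlockingCollection<byte[]> instead of two events. It avoids re-copying the remainder on partial reads and lets the writer run a few chunks ahead. QueueStream, its capacity parameter and the Demo class are names made up for this illustration, and a plain loop of Write calls stands in for the protobuf-net serializer so that the sample only needs the base class library.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;

// Hypothetical alternative to PullStream: chunks travel through a bounded
// queue, so the writer blocks once `capacity` chunks are waiting to be read.
public class QueueStream : Stream
{
    private readonly BlockingCollection<byte[]> chunks;
    private byte[] current = new byte[0]; // chunk currently being drained
    private int position;                 // read offset inside `current`

    public QueueStream(int capacity)
    {
        chunks = new BlockingCollection<byte[]>(capacity);
    }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return true; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }

    public override void Write(byte[] buffer, int offset, int count)
    {
        if (count == 0) return;
        var chunk = new byte[count];
        Buffer.BlockCopy(buffer, offset, chunk, 0, count);
        chunks.Add(chunk); // blocks while the queue is full
    }

    // Called by the writer when no more data will follow.
    public void End() { chunks.CompleteAdding(); }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (count == 0) return 0;
        if (position == current.Length)
        {
            // Blocks until a chunk arrives; returns false once the queue is
            // completed and drained, which maps to end-of-stream.
            if (!chunks.TryTake(out current, Timeout.Infinite))
            {
                current = new byte[0];
                position = 0;
                return 0;
            }
            position = 0;
        }
        int n = Math.Min(count, current.Length - position);
        Buffer.BlockCopy(current, position, buffer, offset, n);
        position += n;
        return n;
    }
}

public static class Demo
{
    public static void Main()
    {
        var stream = new QueueStream(4);
        var payload = new byte[1000];
        new Random(1).NextBytes(payload);

        // Producer: stands in for ProtoBuf.Serializer.Serialize(stream, obj).
        var writer = new Thread(() =>
        {
            for (int i = 0; i < payload.Length; i += 64)
                stream.Write(payload, i, Math.Min(64, payload.Length - i));
            stream.End();
        });
        writer.Start();

        // Consumer: stands in for WCF pulling the response body.
        var received = new MemoryStream();
        var buffer = new byte[100];
        int read;
        while ((read = stream.Read(buffer, 0, buffer.Length)) != 0)
            received.Write(buffer, 0, read);
        writer.Join();

        Console.WriteLine(received.Length); // 1000
    }
}
```

In the service method from the question, the same pattern would presumably mean returning the stream as ResultBody and running the serializer on a worker thread that calls End() when it finishes. That wiring is an assumption and is not tested here under WCF.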

I hope this solves your problem :-) It was fun to code this anyway.

Regards, Jacco