I have a simple IObservable<string> that provides NMEA strings from a serial device:

     var source = Observable.Create<string>(
            observer =>
            {
                var port = new SerialPort("COM4", 4800, Parity.None, 8, StopBits.One);

                string output = string.Empty;

                port.DataReceived += (sender, eventArgs) =>
                {
                    var input = (SerialPort)sender;
                    var buffer = new byte[input.BytesToRead];

                    try
                    {
                        input.Read(buffer, 0, buffer.Length);
                    }
                    catch (Exception exception)
                    {
                        observer.OnError(exception);
                    }

                    var encoding = Encoding.ASCII;
                    var data = encoding.GetString(buffer);

                    if (data.StartsWith("$") && output != string.Empty)
                    {
                        if (output.StartsWith("$") || data.EndsWith(Environment.NewLine))
                        {
                            output = output.TrimEnd(Environment.NewLine.ToCharArray());
                            observer.OnNext(output);
                        }

                        output = data;
                    }
                    else if ((output == string.Empty || data.StartsWith("$")) || output.StartsWith("$"))
                    {
                        output += data;
                    }
                };

                try
                {
                    port.Open();
                }
                catch (Exception error)
                {
                    observer.OnError(error);
                }

                return port;
            }
        );

Now I want to parse these strings into concrete message types that can be filtered later on. Thus the IObservable<string> needs to become a more specialized IObservable<NmeaMessage>.

This can be solved with .Select() statements where the result is reprojected into NmeaMessage (the following is just an example of a simple reprojection):

var selected = source.Select(s => { return new NmeaMessage(s); });

... but what happens if a reprojection can't be made (e.g. an unknown message type, or a parse error for the delivered string)? How should that be handled? I can't call OnError here, because it is an Observable, not an Observer. Should I just suppress the parse error and return nothing in this case? How do I state that the source may not be a valid NMEA source? Should I create a virtual "Device" class (that uses the string source internally) instead of cascading or filtering Observables? The "Device" class could use events, and an Observable could then be created on top of it again (Observable.FromEventPattern<>).

I also want the "observing" parser to be able to subscribe to different IObservable<string> sources. What is the best way to integrate the parser and the reprojection into this scenario?

1 Answer

Wrap your NmeaMessage constructor in a static function like this:

public static NmeaMessage TryParseNmeaMessage(TInputData d)
{
    if (IsValidInput(d))
        return new NmeaMessage(d);
    else
        return null;
}

Then you can do something like this:

var inputData = Observable.Create(...);
var parsed    = inputData.Select(d => TryParseNmeaMessage(d))
                         .Where(d => d != null);

Obviously you need to define IsValidInput() too.

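Instead of null, you could also return a BadNmeaMessage (a subclass of NmeaMessage containing error info) from TryParseNmeaMessage.
Then you can react to the good and bad messages separately: .OfType<BadNmeaMessage>() selects the bad ones. Note that .OfType<NmeaMessage>() would match both, because BadNmeaMessage is a subclass, so the good ones need a filter such as .Where(m => !(m is BadNmeaMessage)).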
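
A minimal sketch of that good/bad split, assuming the System.Reactive NuGet package is referenced. The NmeaMessage and BadNmeaMessage classes, the NmeaParser helper, and the "starts with $" check are all hypothetical stand-ins, since the real types are not shown in the question. The parser is written as an extension method on IObservable<string>, so it can be attached to any string source.

```csharp
using System;
using System.Reactive.Linq;   // assumption: System.Reactive package is referenced

// Hypothetical message types; the real NmeaMessage is not shown in the question.
public class NmeaMessage
{
    public string Raw { get; }
    public NmeaMessage(string raw) { Raw = raw; }
}

public class BadNmeaMessage : NmeaMessage
{
    public string Error { get; }
    public BadNmeaMessage(string raw, string error) : base(raw) { Error = error; }
}

public static class NmeaParser
{
    // Placeholder validity check: a sentence must start with '$'.
    // Invalid input becomes a BadNmeaMessage instead of null or an exception.
    public static NmeaMessage Parse(string s) =>
        !string.IsNullOrEmpty(s) && s.StartsWith("$", StringComparison.Ordinal)
            ? new NmeaMessage(s)
            : new BadNmeaMessage(s, "sentence does not start with '$'");

    // Works on any IObservable<string>, so different sources can share the parser.
    public static IObservable<NmeaMessage> ParseNmea(this IObservable<string> source) =>
        source.Select(Parse);
}

public static class SplitDemo
{
    public static void Main()
    {
        // An in-memory stand-in for the serial port source.
        var lines = new[] { "$GPGGA,123519", "garbage", "$GPRMC,081836" }.ToObservable();
        var parsed = lines.ParseNmea();

        // BadNmeaMessage derives from NmeaMessage, so the good branch must exclude it.
        var bad  = parsed.OfType<BadNmeaMessage>();
        var good = parsed.Where(m => !(m is BadNmeaMessage));

        good.Subscribe(m => Console.WriteLine("ok:  " + m.Raw));
        bad.Subscribe(m => Console.WriteLine("bad: " + m.Raw + " (" + m.Error + ")"));
    }
}
```

Because parse failures travel as ordinary OnNext values here, the sequence is never terminated by a bad sentence.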

Or you could signal OnError (by throwing inside the parse function) and then restart the sequence.
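
The throw-and-restart variant could look roughly like the sketch below, again assuming the System.Reactive package. The throwing NmeaMessage constructor and the RetryDemo helper are hypothetical. An exception thrown inside the Select projection reaches subscribers as OnError, which ends the sequence; Retry then subscribes to the source again.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;   // assumption: System.Reactive package is referenced

// Hypothetical message type whose constructor rejects invalid input.
public class NmeaMessage
{
    public string Raw { get; }

    public NmeaMessage(string raw)
    {
        if (string.IsNullOrEmpty(raw) || !raw.StartsWith("$", StringComparison.Ordinal))
            throw new FormatException("not an NMEA sentence: " + raw);
        Raw = raw;
    }
}

public static class RetryDemo
{
    // Parses the lines, resubscribing on failure, and records what the observer sees.
    // 'attempts' is the total number of subscriptions Retry is allowed to make.
    public static List<string> Run(IObservable<string> lines, int attempts)
    {
        var log = new List<string>();

        lines.Select(s => new NmeaMessage(s))   // a throw here surfaces as OnError
             .Retry(attempts)                   // resubscribe to the source after an error
             .Subscribe(
                 m  => log.Add("ok: " + m.Raw),
                 ex => log.Add("error: " + ex.Message));

        return log;
    }

    public static void Main()
    {
        // An in-memory stand-in for the serial port source.
        var lines = new[] { "$GPGGA,1", "garbage", "$GPRMC,2" }.ToObservable();

        foreach (var entry in Run(lines, 2))
            Console.WriteLine(entry);
    }
}
```

One design point to keep in mind: each resubscription runs the source's subscribe logic again, so with the Observable.Create source from the question the SerialPort would be disposed and a new one opened on every restart. For a source where single bad sentences are expected, filtering them out is usually the gentler option.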

See here for advanced error handling.

There is an excellent article about this sort of error handling in F#: Railway oriented programming.
The same principles can be applied in C# with Rx.