I have a simple IObservable that provides NMEA strings from a serial device:
var source = Observable.Create<string>(
observer =>
{
var port = new SerialPort("COM4", 4800, Parity.None, 8, StopBits.One);
string output = string.Empty;
port.DataReceived += (sender, eventArgs) =>
{
var input = (SerialPort)sender;
var buffer = new byte[input.BytesToRead];
try
{
input.Read(buffer, 0, buffer.Length);
}
catch (Exception exception)
{
observer.OnError(exception);
}
var encoding = Encoding.ASCII;
var data = encoding.GetString(buffer);
if (data.StartsWith("$") && output != string.Empty)
{
if (output.StartsWith("$") || data.EndsWith(Environment.NewLine))
{
output = output.TrimEnd(Environment.NewLine.ToCharArray());
observer.OnNext(output);
}
output = data;
}
else if ((output == string.Empty || data.StartsWith("$")) || output.StartsWith("$"))
{
output += data;
}
};
try
{
port.Open();
}
catch (Exception error)
{
observer.OnError(error);
}
return port;
}
);
Now I want to parse these strings to concrete message types that can be filtered later on. Thus the IObservable needs to be more specialized as IObservable.
This can be solved by .Select() statements where the Result is reprojected into NMEAMessage... (the following is just an example for a simple reprojection)
var selected = source.Select(s => { return new NmeaMessage(s); });
... but what happens if a reprojection can't be made (e.g. unknown message type or parse error for the delivered string). How to handle that? I can't call OnError here (because it is an Observable not an Observer. Just suppress the parse error and return nothing in this case? How to state that the source may be not a valid NMEA source? Should I create a virtual "Device" class (that uses the string source inside) instead of cascading or filtering Observables? The "Device" class could use events and then an Observable on top (Observable.FromEventPattern<>) could be created again.
I also want the "observing" Parser to be able to subscribe to different sources of IObservable. What is the best way to integrate the parser and reprojection into this scenario?