5
votes

I'm currently working on an application that combines many streams of data through equations. What I'd like to be able to do is something like:

var result = (stream1 + stream2) / stream3 + stream4 * 2;

Where result updates whenever any of the streams update. At the moment the only way I can express this in Rx is as:

var result = stream1.CombineLatest(stream2, (x,y) => x+y)
  .CombineLatest(stream3, (x,y) => x / y)
  .CombineLatest(stream4, (x,y) => x + y*2);

Which isn't nearly as clear.

My current idea is as follows:

public class ArithmeticStream : IObservable<double>
{
    public static ArithmeticStream operator +(ArithmeticStream xx, ArithmeticStream yy)
    {
        return Observable.CombineLatest(xx,yy, (x,y) => x + y);
    }
    ...
}

The problem is that CombineLatest returns an IObservable<double> instead of an ArithmeticStream.

Two possible questions:

How can I transparently convert an IObservable<double> into an ArithmeticStream?

Is there an alternative route that will get me the result I want?

4
Assuming stream1 is an ArithmeticStream, you should change 'var result =' to 'ArithmeticStream result ='. - IAbstract
@IAbstract, no. That is the code I'd like to write. My current attempt to get that line to work is by using ArithmeticStreams, but if someone has a way to do it with IObservable<double>s or something else then that'd work too. - Matthew Finlay

4 Answers

2
votes

Gonna add this as a new answer, since it's quite different...

So, if you are committed to the operator-overloading route, here's one way to do it. To be honest, though, I'm not fond of this approach: it does make the queries far more terse, but the DSL approach is similarly terse and clearer, since it doesn't rely on operator overloading.

public static class ArithmeticStreamExt
{
    public static ArithmeticStream Wrap(this IObservable<double> src)
    {
        return new ArithmeticStream(src);
    }
    public static ArithmeticStream Const(this double constValue)
    {
        return new ArithmeticStream(Observable.Return(constValue));
    }
}
public class ArithmeticStream 
{
    private IObservable<double> _inner;

    public ArithmeticStream(IObservable<double> source)
    {
        _inner = source;
    }

    public IObservable<double> Source {get { return _inner; }}

    public static ArithmeticStream operator +(
       ArithmeticStream left, 
       ArithmeticStream right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right._inner, (l, r) => l + r));
    }
    public static ArithmeticStream operator -(
       ArithmeticStream left, 
       ArithmeticStream right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right._inner, (l, r) => l - r));
    }
    public static ArithmeticStream operator *(
       ArithmeticStream left, 
       ArithmeticStream right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right._inner, (l, r) => l * r));
    }
    public static ArithmeticStream operator /(
       ArithmeticStream left, 
       ArithmeticStream right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right._inner, (l, r) => l / r));
    }

    public static ArithmeticStream operator +(
        ArithmeticStream left, 
        IObservable<double> right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right, (l, r) => l + r));
    }
    public static ArithmeticStream operator -(
        ArithmeticStream left, 
        IObservable<double> right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right, (l, r) => l - r));
    }
    public static ArithmeticStream operator *(
        ArithmeticStream left, 
        IObservable<double> right)
    {
        return new ArithmeticStream(
            left._inner.CombineLatest(right, (l, r) => l * r));
    }
    public static ArithmeticStream operator /(
        ArithmeticStream left, 
        IObservable<double> right)
    {
        return new ArithmeticStream(
             left._inner.CombineLatest(right, (l, r) => l / r));
    }
}

And a test rig:

void Main()
{
    var s1 = new Subject<double>();
    var s2 = new Subject<double>();
    var s3 = new Subject<double>();
    var s4 = new Subject<double>();

    var result = (s1.Wrap() + s2) / s3 + (s4.Wrap() * 2.0.Const());
    using(result.Source.Subscribe(Console.WriteLine))
    {
        s1.OnNext(1.0);
        s2.OnNext(2.0);
        s3.OnNext(3.0);
        s4.OnNext(4.0);
    }
}
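
If the Const() call on the literal bothers you, one possible variation is to let the wrapper implement IObservable<double> itself and add overloads that take a plain double. This is only a sketch, not the class as posted above: it assumes the System.Reactive package is referenced, only the + , / and * overloads needed for the sample expression are shown, and the names ArithmeticStreamSketchExt, Demo and Run are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Sketch: a wrapper that is itself an IObservable<double>.
public sealed class ArithmeticStream : IObservable<double>
{
    private readonly IObservable<double> _inner;

    public ArithmeticStream(IObservable<double> source) { _inner = source; }

    // Delegating Subscribe lets the wrapper be used wherever an IObservable<double> is expected.
    public IDisposable Subscribe(IObserver<double> observer) => _inner.Subscribe(observer);

    // Because ArithmeticStream implements IObservable<double>, these overloads
    // also cover the ArithmeticStream-with-ArithmeticStream case.
    public static ArithmeticStream operator +(ArithmeticStream l, IObservable<double> r) =>
        new ArithmeticStream(l._inner.CombineLatest(r, (a, b) => a + b));

    public static ArithmeticStream operator /(ArithmeticStream l, IObservable<double> r) =>
        new ArithmeticStream(l._inner.CombineLatest(r, (a, b) => a / b));

    // Constant on the right: project each value instead of combining with Observable.Return.
    public static ArithmeticStream operator *(ArithmeticStream l, double k) =>
        new ArithmeticStream(l._inner.Select(a => a * k));
}

public static class ArithmeticStreamSketchExt
{
    public static ArithmeticStream Wrap(this IObservable<double> src) => new ArithmeticStream(src);
}

public static class Demo
{
    // Pushes one value through each subject and returns everything the result emitted.
    public static List<double> Run()
    {
        var s1 = new Subject<double>();
        var s2 = new Subject<double>();
        var s3 = new Subject<double>();
        var s4 = new Subject<double>();
        var seen = new List<double>();

        var result = (s1.Wrap() + s2) / s3 + s4.Wrap() * 2.0;
        using (result.Subscribe(x => seen.Add(x)))
        {
            s1.OnNext(1.0);
            s2.OnNext(2.0);
            s3.OnNext(3.0);
            s4.OnNext(4.0); // all four have a value now: (1 + 2) / 3 + 4 * 2 = 9
        }
        return seen;
    }

    public static void Main()
    {
        foreach (var v in Run()) Console.WriteLine(v);
    }
}
```

The left-most operand of each sub-expression still needs a Wrap(), because C# does not allow user-defined conversions from an interface type, so there is no way to declare an implicit conversion from IObservable<double> to ArithmeticStream.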
2
votes

Hmm...I think you could do it DSL-style relatively easily (no fiddling about with operators):

public static class Ext
{
    public static IObservable<double> Const(this double constant)
    {
        return Observable.Return(constant);
    }

    public static IObservable<double> Plus(this IObservable<double> left, IObservable<double> right)
    {
        return left.CombineLatest(right, (l,r) => l + r);
    }
    public static IObservable<double> Minus(this IObservable<double> left, IObservable<double> right)
    {
        return left.CombineLatest(right, (l,r) => l - r);
    }
    public static IObservable<double> Times(this IObservable<double> left, IObservable<double> right)
    {
        return left.CombineLatest(right, (l,r) => l * r);
    }
    public static IObservable<double> Over(this IObservable<double> left, IObservable<double> right)
    {
        return left.CombineLatest(right,(l,r) => l / r);
    }
}

So your query would be:

// s4 is doubled before it is added, matching (s1 + s2) / s3 + s4 * 2
var result = s1.Plus(s2).Over(s3)
        .Plus(s4.Times(2.0.Const()));

Or, for a very chatty variant:

var verboseResult = 
    s1.Do(Console.WriteLine).Plus(s2.Do(Console.WriteLine))
    .Over(s3.Do(Console.WriteLine))
    .Plus(s4.Do(Console.WriteLine).Times(2.0.Const()))
    .Do(x => Console.WriteLine("(s1 + s2) / s3 + s4 * 2 = " + x));
1
votes

Consider creating versions of CombineLatest that take 3, 4, 5, etc. IObservables and a result selector function with matching arity. This will let you express the arithmetic operation as a plain old operation on doubles, which is very simple and clean.

If you need help implementing those just say so and I'll give an example.

Edit

The CombineLatest overloads I'm referring to already exist and just aren't documented.
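
For what it's worth, here is a minimal sketch of the four-source overload driven by Subjects, so you can see when values come out. It assumes the System.Reactive package; CombineLatestDemo and Run are illustrative names only.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CombineLatestDemo
{
    // Returns every value the combined stream emitted during the run.
    public static List<double> Run()
    {
        var s1 = new Subject<double>();
        var s2 = new Subject<double>();
        var s3 = new Subject<double>();
        var s4 = new Subject<double>();
        var seen = new List<double>();

        // The selector is ordinary arithmetic on the latest value of each source.
        var result = s1.CombineLatest(s2, s3, s4,
            (w, x, y, z) => (w + x) / y + z * 2);

        using (result.Subscribe(v => seen.Add(v)))
        {
            s1.OnNext(1.0);
            s2.OnNext(2.0);
            s3.OnNext(3.0);
            s4.OnNext(4.0); // first output: (1 + 2) / 3 + 4 * 2 = 9
            s4.OnNext(5.0); // any later update re-evaluates: (1 + 2) / 3 + 5 * 2 = 11
        }
        return seen;
    }

    public static void Main()
    {
        foreach (var v in Run()) Console.WriteLine(v);
    }
}
```

Nothing is emitted until every source has produced at least one value; after that, each update from any source produces a new result.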

0
votes

This is not as nice as operator overloading, but I'm pretty sure you cannot do the kind of operator overloading you want, as there is no support for operator overloading in extension methods.

var stream1 = Observable.Generate(1.0, x => x < 1000.0, x => x + 0.5, x => x);
var stream2 = Observable.Generate(1.0, x => x < 1000.0, x => x + 0.25, x => x);
var stream3 = Observable.Generate(1.0, x => x < 1000.0, x => x + 0.125, x => x);
var stream4 = Observable.Generate(1.0, x => x < 1000.0, x => x + 0.0625, x => x);
var result = stream1.CombineLatest(stream2, stream3, stream4, (w, x, y, z) => (w + x) / y + z * 2);

Otherwise, JerKimball gave a nice answer.