6
votes

I'm trying to make an IObservable<bool> that emits true if a UDP message has been received in the last 5 seconds and emits false if a timeout occurs.

So far I have this:

public IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP)
{
    var udp = BaseComms.UDPBaseStringListener(localEP)
        .Where(msg => msg.Data.Contains("running"))
        .Select(s => true);

    return Observable
        .Timeout(udp, TimeSpan.FromSeconds(5))
        .Catch(Observable.Return(false));
}

The issues with this are:

  • Once a false is returned, the sequence stops
  • I only really need true or false on state changes.

I could use a Subject<T> but I need to be careful to dispose of the UDPBaseStringListener observable when there are no more subscribers.

Update

Every time I get a UDP message I would like it to return a true. If I haven't received a UDP message in the last 5 seconds, I would like it to return a false.

5
FYI, Timeout has an overload that takes an alternate observable to switch to when the timeout occurs, rather than "throwing" and needing Catch. - Gideon Engelberth
Readers may also be interested in 1, 2, and 3. - Whymarrh

5 Answers

3
votes

As pointed out by Bj0, the solution with BufferWithTime will not return the data point as soon as it is received and the buffer timeout is not reset after receiving a data point.

With Rx 2.0, you can solve both problems with a new Buffer overload that accepts both a timeout and a size:

static IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP)
{
    return BaseComms
        .UDPBaseStringListener(localEP)
        .Where(msg => msg.Data.Contains("running"))
        .Buffer(TimeSpan.FromSeconds(5), 1)
        .Select(s => s.Count > 0)
        .DistinctUntilChanged();
}
2
votes

The problem with buffer is that the "timeout" interval doesn't get reset when you get a new value, the buffer windows are just slices of time (5s in this case) that follow each other. This means that, depending on when you receive your last value, you may have to wait for almost double the timeout value. This can also miss timeouts:

               should timeout here
                         v
0s         5s         10s        15s
|x - x - x | x - - - - | - - - x -| ...
          true        true       true
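
To make the diagram concrete, here is a small model in plain C# (no Rx involved) that compares back-to-back windows with a timer that resets on every event. It is only a sketch of the timing argument, not of how Buffer or Throttle are implemented. The class and method names are made up for illustration, and the event times are one possible reading of the diagram.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model, for illustration only. Times are in seconds and
// `events` is assumed to be sorted in ascending order.
public static class TimeoutModel
{
    // Fixed, back-to-back windows: one bool per window,
    // true if at least one event fell inside that window.
    public static List<bool> FixedWindows(IReadOnlyList<double> events, double window, double end)
    {
        var result = new List<bool>();
        for (double start = 0; start < end; start += window)
        {
            double stop = start + window;
            result.Add(events.Any(t => t >= start && t < stop));
        }
        return result;
    }

    // Resetting timer: a timeout fires `timeout` seconds after an event
    // unless another event arrives first. Returns the times at which it fires.
    public static List<double> ResettingTimeouts(IReadOnlyList<double> events, double timeout, double end)
    {
        var result = new List<double>();
        for (int i = 0; i < events.Count; i++)
        {
            double deadline = events[i] + timeout;
            double next = i + 1 < events.Count ? events[i + 1] : double.PositiveInfinity;
            if (deadline < next && deadline <= end)
                result.Add(deadline);
        }
        return result;
    }
}
```

With events at 0.5, 2.5, 4.5, 5.5 and 13.5 s, a 5 s window and a 15 s run, FixedWindows reports true for all three windows, while ResettingTimeouts reports a single timeout at 10.5 s, which is the gap the fixed windows never notice.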

Throttle, however, resets its timer each time a new value comes in and only emits a value (the last incoming one) once the timespan has elapsed with no further values. This can be used as a timeout and merged with the source observable to insert "timeout" values into the stream:

var obs = BaseComms.UDPBaseStringListener(localEP)
            .Where(msg => msg.Data.Contains("running"))
            .Select(msg => true); // project to bool so both merged streams share a type

return obs.Merge( obs.Throttle( TimeSpan.FromSeconds(5) )
                        .Select( x => false ) )
            .DistinctUntilChanged();

A working LINQPad example:

var sub = new Subject<int>();

var script = sub.Timestamp()
    .Merge( sub.Throttle(TimeSpan.FromSeconds(2)).Select( i => -1).Timestamp())
    .Subscribe( x => 
{
    x.Dump("val");
});


Thread.Sleep(1000);

sub.OnNext(1);
sub.OnNext(2);

Thread.Sleep(10000);

sub.OnNext(5);

A -1 is inserted into the stream after a 2s timeout.

1
votes

I would suggest avoiding the use of Timeout - it causes exceptions and coding with exceptions is bad.

Also, it seems to only make sense that your observable stops after one value. You might need to explain more as to what you want the behaviour to be.

My current solution to your problem is:

public IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP)
{
    return Observable.Create<bool>(o =>
    {
        var subject = new AsyncSubject<bool>();
        return new CompositeDisposable(
            Observable.Amb(
                BaseComms
                    .UDPBaseStringListener(localEP)
                    .Where(msg => msg.Data.Contains("running"))
                    .Select(s => true),
                Observable
                    .Timer(TimeSpan.FromMilliseconds(10.0))
                    .Select(_ => false)
            ).Take(1).Subscribe(subject), subject.Subscribe(o));
    });
}

Does that help?

1
votes

If you don't want the sequence to stop, just wrap it in Defer + Repeat:

Observable.Defer(() => GettingUDPMessages(endpoint))
    .Repeat();
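
A fuller sketch of the same idea, combined with the Timeout overload mentioned in the comments on the question, might look like the following. The class name, the Alive helper and its parameters are hypothetical and not taken from any answer here. It also assumes the source can be resubscribed safely, since Repeat subscribes again after every timeout, and that the source itself never completes (a completed source would be resubscribed immediately, over and over).

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class HeartbeatExample
{
    // Hypothetical helper: true when a message arrives, false once `timeout`
    // passes without one. The sequence keeps running after a timeout.
    public static IObservable<bool> Alive<T>(IObservable<T> messages, TimeSpan timeout)
    {
        return Observable
            .Defer(() => messages
                .Select(_ => true)
                // On timeout, switch to a one-element fallback sequence
                // instead of raising a TimeoutException.
                .Timeout(timeout, Observable.Return(false)))
            // The fallback completes after emitting false, so Repeat
            // resubscribes to the source and the timer starts again.
            .Repeat()
            // Report state changes only.
            .DistinctUntilChanged();
    }
}

// Usage, with a Subject standing in for the UDP listener:
//
//   var messages = new Subject<string>();
//   var subscription = HeartbeatExample
//       .Alive(messages, TimeSpan.FromSeconds(5))
//       .Subscribe(Console.WriteLine);
//   messages.OnNext("running");
```

Because each timeout resubscribes to the source, a cold source (one that opens a new listener per subscription) would reopen its listener every time. Whether that is acceptable depends on how UDPBaseStringListener is implemented.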
0
votes

Based on the answer here, I have created the following extension method:

/// <summary>
/// Emits a value every interval until a value is emitted on the TakeUntil observable. The TakeUntil and Repeat
/// operators work together to create a resetting timer that is reset every time a value is emitted on the
/// TakeUntil observable.
/// </summary>
/// <param name="observable">The observable to observe for timeouts</param>
/// <param name="timeout">The period without values after which a timeout value is emitted</param>
/// <param name="timeoutValue">The value to emit when the observable times out</param>
public static IObservable<T> SelectTimeout<T>(this IObservable<T> observable, TimeSpan timeout, T timeoutValue)
{
    IObservable<T> timeoutObservable = Observable
        .Interval(timeout)
        .Select(_ => timeoutValue)
        .TakeUntil(observable)
        .Repeat();

    return observable.Merge(timeoutObservable);
}

You will need the following using directives:

using System;
using System.Reactive.Linq;

Note that the timeout value is emitted after every timeout. If you wish to emit only one timeout value you could either use the DistinctUntilChanged method or look at the linked answer. I preferred the constantly emitting timeout since it is simpler and easier to understand.