3
votes

I am using Rx to query the events that come out of an automation device that has buttons connected to it. I want to be able to tell whether a user has pressed and immediately released a button, or is holding it down for some time. I'm having a problem with the query I use to detect that the button is being held down.

The basic idea is that once the button has been pressed, I would produce a value every half second, until the button was released again. I am also timestamping each value, so that I know how long the button has been pressed.

Here's my code, and how I think this should work:

public IObservable<DigitalInputHeldInfo> Adapt(IObservable<EventPattern<AdsNotificationEventArgs>> messageStream) {
  var startObservable = 
    _digitalInputPressedEventAdapter.Adapt(messageStream);
  var endObservable = 
    _digitalInputReleasedEventAdapter.Adapt(messageStream);

  return from notification in startObservable.Timestamp()

    from interval in 
        Observable.
        Interval(
            500.Milliseconds(),
            _schedulerProvider.ThreadPool).
        Timestamp().
        TakeUntil(endObservable)

    select new DigitalInputHeldInfo(
            interval.Timestamp.Subtract(notification.Timestamp),
            notification.Value);
}

From a given IObservable, I apply a query so that I have an observable startObservable which produces a value every time the button is pressed (state goes from false to true). I also have an observable endObservable, queried from the same source observable, which produces a value when the button is released again (state goes from true to false). When startObservable produces a value, I start an interval observable that ticks every 500 milliseconds. I timestamp these values, and I take from it until endObservable produces a value. I then return an object holding the value from startObservable and how long the button has been held up until now.

I have a unit test for this where I hold the button for 2 seconds (using the TestScheduler), release it and then let the scheduler go on for a couple more seconds. I do this to make sure that no more values are produced after the button was released.

This is the point where my test fails, and the topic of my question. In my test, I am expecting 4 values to be produced (after 0.5s, 1s, 1.5s and 2s). However, there are still events being produced after the button was released, even though I am using TakeUntil(endObservable) (the digitalInputReleasedEventAdapter that produces the endObservable has its own set of tests, and I'm confident that it works the way it should).

I don't think I've constructed my query incorrectly. I suspect it might have something to do with hot vs. cold observables, but as I'm just starting out with Rx, I don't yet have a full understanding of how that might relate to the issue I'm having here.

4
It seems like you're doing it in quite a complex way. Why not just measure the time between the two events? - Dave Hillier
I'm doing it this way because I can't know how long the button will be held down. So the event when the button is released might come after 1 or 1000 seconds. In the meantime, I still want to get intermediate events that tell me how long it has been held down, so the application can react differently. We may do something when the button was pressed for 2 seconds, and then do something else 5 seconds later if it is still being held down. Anyway, I am still investigating it, and it seems like my query is doing what it is supposed to do. I'll post updates here in the next couple of days. - FleaFX
If you have an event with a timestamp when it is started, then you can calculate the duration from the current clock without any extra events. - Dave Hillier

4 Answers

1
votes

Not sure if this does exactly what you're after, but it's "yet another approach":

void Main()
{
    // ButtonPush returns IObservable<bool>
    var buttonPusher = ButtonPush();
    var pushTracker = 
        from pushOn in buttonPusher.Where(p => p).Timestamp()
        let tickWindow = Observable.Interval(TimeSpan.FromMilliseconds(500))
        from tick in tickWindow
            .TakeUntil(buttonPusher.Where(p => !p))
            .Timestamp()
            .Select(t => t.Timestamp)
        select tick;

    Console.WriteLine("Start: {0}", Observable.Return(true).Timestamp().First().Timestamp);
    var s = pushTracker
        .SubscribeOn(NewThreadScheduler.Default)
        .Subscribe(x => Console.WriteLine("tick:{0}", x));
}

IObservable<bool> ButtonPush()
{
    var push = new BehaviorSubject<bool>(false);    
    var rnd = new Random();
    Task.Factory.StartNew(
        () => 
        {
            // "press button" after random # of seconds
            Thread.Sleep(rnd.Next(2, 5) * 1000);
            push.OnNext(true);
            // and stop after another random # of seconds
            Thread.Sleep(rnd.Next(3, 10) * 1000);
            push.OnNext(false);
        });
    return push;
}

0
votes

It sounds like what you want is to only receive time events while the button is down. CombineLatest should help you here.

For example:

using Microsoft.Reactive.Testing;
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace ButtonTest
{
    class Program
    {
        enum State
        {
            KeyDown, KeyUp
        }

        static void Main(string[] args)
        {
            var buttonState = new BehaviorSubject<State>(State.KeyUp);
            var testScheduler = new TestScheduler();
            var events = testScheduler.CreateObserver<long>();

            Observable.Interval(TimeSpan.FromTicks(100), testScheduler)
                .CombineLatest(buttonState, (t,s)=> new { TimeStamp = t, ButtonState = s })
                .Where(t => t.ButtonState == State.KeyDown)
                .Select(t => t.TimeStamp)
                .Subscribe(events);

            testScheduler.AdvanceBy(100);//t=0
            testScheduler.AdvanceBy(100);//t=1

            buttonState.OnNext(State.KeyDown);
            testScheduler.AdvanceBy(100);//t=2

            testScheduler.AdvanceBy(100);//t=3
            buttonState.OnNext(State.KeyUp);

            testScheduler.AdvanceBy(100);//t=4
            testScheduler.AdvanceBy(100);//t=5

            buttonState.OnNext(State.KeyDown);
            testScheduler.AdvanceBy(100);//t=6

            buttonState.OnNext(State.KeyUp);

            testScheduler.AdvanceBy(100);//t=7
            testScheduler.AdvanceBy(100);//t=8

            Debug.Assert(events.Messages.Count == 5);
            Debug.Assert(events.Messages[0].Value.Value == 1);
            Debug.Assert(events.Messages[1].Value.Value == 2);
            Debug.Assert(events.Messages[2].Value.Value == 3);
            Debug.Assert(events.Messages[3].Value.Value == 5);
            Debug.Assert(events.Messages[4].Value.Value == 6);
        }
    }
}

-1
votes

Not sure if this is the bug, but your Timestamp() calls aren't using the test scheduler, so they take their timestamps from the real clock rather than from virtual time. Check every operator that takes an IScheduler parameter and make sure it is passed the test scheduler.
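
As a minimal sketch of that suggestion (not the asker's actual code): Timestamp has an overload that accepts an IScheduler, so the same TestScheduler can drive both the interval and the timestamps. The class and method names below are made up for illustration, and the example assumes the System.Reactive and Microsoft.Reactive.Testing packages are referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

static class TimestampSchedulerSketch
{
    // Hypothetical helper: returns the virtual-time ticks of every
    // timestamped value seen during two virtual seconds.
    public static IList<long> Run()
    {
        var scheduler = new TestScheduler();
        var observer = scheduler.CreateObserver<Timestamped<long>>();

        // Pass the same scheduler to Interval and to Timestamp so the
        // timestamps follow virtual time instead of the wall clock.
        Observable.Interval(TimeSpan.FromMilliseconds(500), scheduler)
            .Timestamp(scheduler)
            .Subscribe(observer);

        scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);

        return observer.Messages
            .Select(m => m.Value.Value.Timestamp.Ticks)
            .ToList();
    }

    static void Main()
    {
        foreach (var ticks in Run())
            Console.WriteLine(ticks);
    }
}
```

With the scheduler passed in, subtracting two timestamps gives a duration in virtual time, which is what a test driven by TestScheduler would need to assert on.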

-1
votes

I have found a solution.

public IObservable<DigitalInputHeldInfo> Adapt(
  IObservable<EventPattern<AdsNotificationEventArgs>> messageStream) {

  var startObservable = _digitalInputPressedEventAdapter.
      Adapt(messageStream).
      Publish();
  var endObservable = _digitalInputReleasedEventAdapter.
      Adapt(messageStream).
      Publish();

  startObservable.Connect();
  endObservable.Connect();

  return from notification in startObservable.Timestamp()
         from interval in Observable.Interval(500.Milliseconds(), 
                                              _schedulerProvider.ThreadPool).
                          Timestamp().
                          TakeUntil(endObservable)
         select new DigitalInputHeldInfo(
                interval.Timestamp.Subtract(notification.Timestamp), 
                notification.Value);
}

I isolated this code into a console application, and constructed the source observable (here called messageStream) from an IEnumerable<> which yielded some true and false values. I saw that this IEnumerable<> was enumerated several times, so the source must have been subscribed to several times. I believe the produced values were being consumed by separate instances of Observable.Interval, and that these were racing against each other to receive the messages indicating the button release. So now I'm calling Publish() and Connect() on the startObservable and endObservable, so that the Observable.Interval instances share the same subscription.
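
The effect described above can be reproduced in isolation. The following is only a sketch under assumptions: the Defer-based source is a hypothetical stand-in for messageStream, and the names are invented for illustration. It counts how often a cold source is subscribed to, first with two plain subscriptions and then with two subscriptions sharing one Publish()/Connect().

```csharp
using System;
using System.Reactive.Linq;

static class PublishSketch
{
    // Returns { source subscriptions without Publish, with Publish }.
    public static int[] CountSubscriptions()
    {
        var sourceSubscriptions = 0;

        // Hypothetical cold source: the factory runs once per subscription.
        var cold = Observable.Defer(() =>
        {
            sourceSubscriptions++;
            return Observable.Return(true);
        });

        // Two independent subscribers each trigger their own run of the source.
        cold.Subscribe(_ => { });
        cold.Subscribe(_ => { });
        var withoutPublish = sourceSubscriptions;

        sourceSubscriptions = 0;

        // Publish() lets both subscribers share a single subscription,
        // which is only made when Connect() is called.
        var shared = cold.Publish();
        shared.Subscribe(_ => { });
        shared.Subscribe(_ => { });
        using (shared.Connect())
        {
        }

        return new[] { withoutPublish, sourceSubscriptions };
    }

    static void Main()
    {
        var counts = CountSubscriptions();
        Console.WriteLine("without Publish: {0}, with Publish: {1}",
            counts[0], counts[1]);
    }
}
```

Here the source is subscribed to twice without Publish() and once with it. Note that in this sketch Connect() is called only after the subscribers are attached; with a source that starts producing immediately, connecting earlier could mean that values are missed.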