I am using RX to query the events that come out of an automation device, which has buttons connected to it. I want to be able to tell the difference when a user has just pressed and immediately released the button, or if he is holding the button down for some time. I'm having a problem with the query I use to see if he's holding the button down.
The basic idea is that once the button has been pressed, I would produce a value every half second, until the button was released again. I am also timestamping each value, so that I know how long the button has been pressed.
Here's my code, and how I think this should work:
public IObservable<DigitalInputHeldInfo> Adapt(IObservable<EventPattern<AdsNotificationEventArgs>> messageStream) {
var startObservable =
_digitalInputPressedEventAdapter.Adapt(messageStream);
var endObservable =
_digitalInputReleasedEventAdapter.Adapt(messageStream);
return from notification in startObservable.Timestamp()
from interval in
Observable.
Interval(
500.Milliseconds(),
_schedulerProvider.ThreadPool).
Timestamp().
TakeUntil(endObservable)
select new DigitalInputHeldInfo(
interval.Timestamp.Subtract(notification.Timestamp),
notification.Value);
}
From a given IObservable, I am applying a query on it so that I have an observable startObservable which produces a value everytime the button is pressed (state goes from false to true). I also have an observable endObservable, queried from the same source observable, which produces a value when the button is released again (state goes from true to false). When startObservable produces a value, I start an observable interval at every 500 milliseconds. I timestamp these values, and I take from it until endObservable produces a value. I then return an object holding the value of the startObservable and how long is has been held up until now.
I have a unit test for this where I hold the button for 2 seconds (using the TestScheduler), release it and then let the scheduler go on for a couple more seconds. I do this to make sure that no more values are produced after the button was released.
This is the point where my test fails, and the topic of my question. In my test, I am expecting 4 values to be produced (after 0.5s, 1s, 1.5s and 2s). However, there are still events being produced after the button was released, even though I am using TakeUntil(endObservable) (the digitalInputReleasedEventAdapter that produces the endObservable has its own set of tests, and I'm confident that it works the way it should).
I don't think I've constructed my query incorrectly. I suspect it might have something to do with hot vs. cold observables. But as I'm just starting out with RX, I don't quite have a full understanding of how that might relate to the issue I'm having here.