
Given two hot observables, t1 and t2, how would I GroupJoin them so that I get all events from t2 that occur within x seconds before and y seconds after each event in t1?

Given:

t1 -----A-----B-----C

t2 --1--2--3--4--5--6

If t1 events are 2 seconds apart, t2 events are 1 second apart, and we are looking for t2 events within 1 second either side of each t1 event, the following would be the result.

Result:

{ A, [1,2,3] }

{ B, [3,4,5] }

{ C, [5,6] }

Here is the real scenario that needs a solution to the problem above. We have a stream of emails and another stream of text messages. We need to emit a result stream that pairs each email with the text messages that occurred within 1 minute before or after the email's sent time.
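To pin down the intended semantics (a window inclusive at both ends), the expected result above can be reproduced offline with plain LINQ to Objects. This is a batch sketch, not an Rx operator; it assumes the timings read off the marble diagram (t2 event n at n seconds; A, B and C at 2, 4 and 6 seconds), and `WindowJoin` is a hypothetical helper name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: for each left event, collect the right events whose time falls in
// [left - before, left + after], inclusive at both ends. Batch (non-reactive) reference only.
List<(TLeft Key, List<TRight> Items)> WindowJoin<TLeft, TRight>(
    IEnumerable<(TLeft Value, TimeSpan Time)> left,
    IEnumerable<(TRight Value, TimeSpan Time)> right,
    TimeSpan before,
    TimeSpan after) =>
    left.Select(l => (Key: l.Value, Items: right
            .Where(r => r.Time >= l.Time - before && r.Time <= l.Time + after)
            .Select(r => r.Value)
            .ToList()))
        .ToList();

// Assumed timings from the marble diagram: t1 at 2, 4, 6 s; t2 event n at n s.
var t1 = new[] { ('A', TimeSpan.FromSeconds(2)), ('B', TimeSpan.FromSeconds(4)), ('C', TimeSpan.FromSeconds(6)) };
var t2 = Enumerable.Range(1, 6).Select(n => (n, TimeSpan.FromSeconds(n))).ToArray();

var result = WindowJoin(t1, t2, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));

// prints { A, [1,2,3] }, { B, [3,4,5] } and { C, [5,6] } on separate lines
foreach (var (key, items) in result)
    Console.WriteLine($"{{ {key}, [{string.Join(",", items)}] }}");
```

The reactive answers below have to achieve the same grouping without knowing the future, which is where the difficulty lies.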


2 Answers


The problem here (as Shlomo mentioned) is that we need to open the window in t2 BEFORE the t1 event occurs. Unfortunately, this isn't possible: by the time the t1 event arrives, we are already past the point where the t2 window should have opened.

What we can do instead is shift t2 forward in time using Delay(). If we offset it by x (the before time), we can reframe the question as "get the events in the delayed t2 that occur in the window opening at t1 and closing at t1 + x + y". We can use a GroupJoin to solve that.
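The reframing is the original window condition with x added to every term. Writing t_1 and t_2 for the times of a t1 event and a t2 event:

```latex
t_1 - x \le t_2 \le t_1 + y
\quad\Longleftrightarrow\quad
t_1 \le t_2 + x \le t_1 + x + y
```

So a t2 event delayed by x lands in a window that opens at t_1 and closes at t_1 + x + y, a window that can be opened when the t1 event arrives rather than before it.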

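using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

var scheduler = new HistoricalScheduler();

var t1 = Observable.Interval(TimeSpan.FromMilliseconds(200), scheduler)
    .Select(l => (char)('A' + l));
var t2 = Observable.Interval(TimeSpan.FromMilliseconds(100), scheduler)
    .Select(l => l + 1);                    //number t2 events 1, 2, 3... as in the question

var x = TimeSpan.FromMilliseconds(100);     //before time
var y = TimeSpan.FromMilliseconds(100);     //after time

//shift t2 forward by x so a window opened at t1 still sees t2 events from t1 - x
var delayedT2 = t2.Delay(x, scheduler);

var g = t1.GroupJoin(delayedT2,
    _ => Observable.Timer(x + y, scheduler),  //each t1 window stays open for x + y
    _ => Observable.Empty<Unit>(scheduler),   //t2 windows close immediately: each t2 event joins only the t1 windows open when it arrives
    (a, b) => new { a, b }
);

//print each group once its window closes
g.SelectMany(p => p.b.ToList().Select(list => $"{{ {p.a}, [{string.Join(",", list)}] }}"))
    .Subscribe(Console.WriteLine);

//Interval never completes, so scheduler.Start() would never return; advance the virtual clock instead
scheduler.AdvanceBy(TimeSpan.FromMilliseconds(800));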

This gives the result:

{ A, [1,2] }
{ B, [3,4] }
{ C, [5,6] }

This result still isn't quite what you were expecting. This is because in your example the t2 events occur at exactly the same instants as the t1 events. In that case the timer that closes the window (t1 + y in the original timeline) is processed first and closes the window before the coinciding t2 event can be included. This means we are effectively getting (t1 - 01:00) <= t2 < (t1 + 01:00). E.g. the window for A (at 02:00) runs from 01:00 up to but not including 03:00, which is why 3, occurring at 03:00, is not included.

The window can be made inclusive at its end by adding a single tick to y:

var y = TimeSpan.FromMilliseconds(100).Add(TimeSpan.FromTicks(1)); 
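One tick is enough because TimeSpan and DateTimeOffset have a resolution of one tick (100 ns): for tick-valued times, the half-open window [start, end + 1 tick) contains exactly the same instants as the closed window [start, end]. The sketch below checks this at the boundary using plain .NET types rather than Rx; it models the GroupJoin window as half-open, as described above, in the original (undelayed) timeline, and the helper names are hypothetical.

```csharp
using System;

// Half-open window [start, end): what the GroupJoin effectively gives when the closing
// timer fires before a t2 event arriving at the same instant.
bool InHalfOpen(TimeSpan instant, TimeSpan start, TimeSpan end) => instant >= start && instant < end;

// Closed window [start, end]: what the question asks for.
bool InClosed(TimeSpan instant, TimeSpan start, TimeSpan end) => instant >= start && instant <= end;

var a = TimeSpan.FromSeconds(2);               // time of A
var x = TimeSpan.FromSeconds(1);               // before time
var y = TimeSpan.FromSeconds(1);               // after time
var yPlusTick = y.Add(TimeSpan.FromTicks(1));  // y widened by one tick
var three = TimeSpan.FromSeconds(3);           // time of t2 event 3

Console.WriteLine(InHalfOpen(three, a - x, a + y));          // False: 3 is excluded
Console.WriteLine(InHalfOpen(three, a - x, a + yPlusTick));  // True: the extra tick includes it
Console.WriteLine(InClosed(three, a - x, a + y));            // True: the intended result
```

An instant one tick past a + y is still excluded, so the fix does not admit anything the closed window would reject.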

Code dump answer (using 100 milliseconds as a substitute for 1 second):

using System;
using System.Reactive.Linq;

var t1 = Observable.Interval(TimeSpan.FromMilliseconds(200))
    .Select(l => (char)('A' + l))
    .Delay(TimeSpan.FromMilliseconds(200));
var t2 = Observable.Interval(TimeSpan.FromMilliseconds(100))
    .Delay(TimeSpan.FromMilliseconds(100));

var x = TimeSpan.FromMilliseconds(100);     //before time
var y = TimeSpan.FromMilliseconds(100);     //after time

var g = t1.Timestamp().Join(t2.Timestamp(),
    c => Observable.Timer(y),       //left (t1) window: open for y after each t1 event
    i => Observable.Timer(x + y),   //right (t2) window: open for x + y after each t2 event
    (c, i) => new {GroupItem = c, RightItem = i}
)
    .Where(a =>
        (a.GroupItem.Timestamp > a.RightItem.Timestamp && a.GroupItem.Timestamp - a.RightItem.Timestamp <= x) //right-item came first
        || (a.GroupItem.Timestamp <= a.RightItem.Timestamp && a.RightItem.Timestamp - a.GroupItem.Timestamp <= y) //group-item came first, or exact timestamp match
    )
    .Select(a => new { GroupItem = a.GroupItem.Value, RightItem = a.RightItem.Value })
    .GroupBy(a => a.GroupItem, a => a.RightItem);

//print each matched pair as it arrives
g.SelectMany(grp => grp.Select(item => $"{grp.Key}: {item}"))
    .Subscribe(Console.WriteLine);

Console.ReadLine(); //keep the process alive; the intervals run on background threads

Explanation: Join is all about "windows". When you define a join, you have to think about the window of time that is open for each item from the left observable and for each item from the right observable. The window we need here is awkward, though: we would have to open a window for each left item X time before it occurs, then shut it Y time after it occurs.

Rather than do the impossible, we leave each left window open only for Y time after the left item occurs, and let each right-item window last X + Y. However, this also pairs up items that shouldn't be included (right items that occurred between X and X + Y before the left item), so we use a Where on the timestamps to filter those out.
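To check that the Where filter keeps exactly the pairs with t1 - x <= t2 <= t1 + y, its predicate can be pulled out and compared with a single closed-interval test. This is a sketch over plain DateTimeOffset values rather than Rx's Timestamped<T>; the function names and the reference date are hypothetical.

```csharp
using System;

// The Where predicate from the Join query, with leftTime = t1 (group item) timestamp
// and rightTime = t2 timestamp.
bool InWindow(DateTimeOffset leftTime, DateTimeOffset rightTime, TimeSpan before, TimeSpan after) =>
    (leftTime > rightTime && leftTime - rightTime <= before)      // right item came first
    || (leftTime <= rightTime && rightTime - leftTime <= after);  // left item came first, or exact match

// The closed-interval check it implements: leftTime - before <= rightTime <= leftTime + after.
bool InWindowClosed(DateTimeOffset leftTime, DateTimeOffset rightTime, TimeSpan before, TimeSpan after) =>
    rightTime >= leftTime - before && rightTime <= leftTime + after;

var origin = new DateTimeOffset(2024, 1, 1, 0, 0, 0, TimeSpan.Zero); // arbitrary reference time
var x = TimeSpan.FromMilliseconds(100);
var y = TimeSpan.FromMilliseconds(100);
var groupTime = origin + TimeSpan.FromMilliseconds(400);

// Candidate t2 timestamps every 50 ms from 200 ms to 600 ms; only 300..500 ms pass.
for (var ms = 200; ms <= 600; ms += 50)
{
    var candidate = origin + TimeSpan.FromMilliseconds(ms);
    Console.WriteLine($"{ms} ms: {InWindow(groupTime, candidate, x, y)} / {InWindowClosed(groupTime, candidate, x, y)}");
}
```

Splitting on which item came first only keeps each subtraction non-negative; the two branches together are the same closed interval.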

Finally, we project away the timestamps and the anonymous-type wrapper, and group the right items by their left item.

I don't think GroupJoin is the way to go here: you would end up taking the groups apart and reconstituting them much as I've done.