
Here we have an observable sequence in .NET using Rx.

var aSource = new Subject<int>();

var bSource = new Subject<int>();

var paired = Observable
    .Merge(aSource, bSource)
    .GroupBy(i => i)
    .SelectMany(g => g.Buffer(2).Take(1));

paired.Subscribe(g => Console.WriteLine("{0}:{1}", g.ElementAt(0), g.ElementAt(1)));

aSource.OnNext(4);
bSource.OnNext(1);
aSource.OnNext(2);
bSource.OnNext(5);
aSource.OnNext(3);
bSource.OnNext(3);
aSource.OnNext(5);
bSource.OnNext(2);
aSource.OnNext(1);
bSource.OnNext(4);

Output: 3:3 5:5 2:2 1:1 4:4

We get an event every time a pair of numbers with the same id arrives.

Perfect! Just what I want.

Groups of two, paired by value.

Next question...

How do I get a SelectMany/Buffer for sequences of values?

Suppose 1,2,3,4,5 arrive at both aSource and bSource via OnNext(); Console.WriteLine() should then fire once for 1-5. When 2,3,4,5,6 arrive, we should get another Console.WriteLine(). Any clues, anyone?

The Rx forum immediately suggested looking at .Window():

http://introtorx.com/Content/v1.0.10621.0/17_SequencesOfCoincidence.html

On the surface this looks perfect. In my case I need a window of value 4.

Where in the query sequence does it belong to get this effect?

var paired = Observable.Merge(aSource, bSource).GroupBy(i => i).SelectMany(g => g.Buffer(2).Take(1));

Desired output: 1,2,3,4,5 : 1,2,3,4,5 2,3,4,5,6 : 2,3,4,5,6

Regards,

Daniel

For the second part, do the numbers arrive in order on each source? Or a random order? - James World
It can be random. They are the result of varying-length "long" processes. - WebSight

2 Answers

1
votes

Assuming events arrive randomly at the sources, use my answer to "Reordering events with Reactive Extensions" to get the events in order.

Then use Observable.Buffer to create a sliding buffer:

// get this using the OrderedCollect/Sort in the referenced question
IObservable<int> orderedSource;

// then subscribe to this
orderedSource.Buffer(5, 1);
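
To make the sliding buffer concrete, here is a minimal sketch of what Buffer(5, 1) yields for an already ordered input. The class name SlidingBufferDemo and the helper Windows are made up for illustration, and the code assumes the System.Reactive NuGet package is referenced. The Where filter is my addition: it drops the shorter buffers that Buffer flushes when the source completes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq; // assumes the System.Reactive NuGet package is referenced

// Hypothetical helper for illustration only; not part of Rx.
public static class SlidingBufferDemo
{
    // Returns every full window of `size` consecutive values,
    // advancing by one value each time.
    public static IList<string> Windows(IEnumerable<int> ordered, int size)
    {
        var seen = new List<string>();
        ordered.ToObservable()
               .Buffer(size, 1)              // a new buffer starts at every element
               .Where(b => b.Count == size)  // skip the short buffers flushed on completion
               .Subscribe(b => seen.Add(string.Join(",", b)));
        return seen;
    }

    public static void Main()
    {
        foreach (var window in Windows(Enumerable.Range(1, 6), 5))
        {
            Console.WriteLine(window);
        }
    }
}
```

For the input 1 to 6 this should print 1,2,3,4,5 followed by 2,3,4,5,6, which matches the windows asked for in the question. With a live Subject instead of a finite range, the Where filter only matters once the source completes.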
0
votes

Here is an extension method that fires when it has received n inputs with the same id.

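using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class RxExtension
{
    // Collects items per key and emits one merged item once a key has bufferCount items.
    public static IObservable<TSource> MergeBuffer<TSource>(
        this IObservable<TSource> source,
        Func<TSource, int> keySelector,
        Func<IList<TSource>, TSource> mergeFunction,
        int bufferCount)
    {
        return Observable.Create<TSource>(o =>
        {
            var buffer = new Dictionary<int, IList<TSource>>();
            return source.Subscribe(i =>
            {
                var index = keySelector(i);
                if (!buffer.TryGetValue(index, out var items))
                {
                    items = new List<TSource>();
                    buffer.Add(index, items);
                }
                items.Add(i);

                // Compare the number of items held for this key, not the number of keys.
                if (items.Count == bufferCount)
                {
                    o.OnNext(mergeFunction(items));
                    buffer.Remove(index);
                }
            },
            // Forward errors and completion so subscribers see the end of the source.
            o.OnError,
            o.OnCompleted);
        });
    }
}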

Calling the extension.

mainInput = Observable.Merge(inputNodes.ToArray()).MergeBuffer<NodeData>(x => x.id, x => MergeData(x), 1);