2
votes

I have an observable which streams a value for each ms. , this is done every 250 ms. ( meaning 250 values in 250 ms (give or take) ).

Mock sample code :

     IObservable<IEnumerable<int>> input = from _ in Observable.Interval(TimeSpan.FromMilliseconds(250))
                    select CreateSamples(250);

      input.Subscribe(values =>
        {
            for (int i = 0; i < values.Count(); i++)
            {
                Console.WriteLine("Value : {0}", i);
            }
        });

        Console.ReadKey(); 


    private static IEnumerable<int> CreateSamples(int count)
    {
        for (int i = 0; i < 250; i++)
        {
            yield return i;
        }
    }

What i need is to create some form of process observable which process the input observable in a rate of 8 values every 33 ms

Something along the line of this :

 IObservable<IEnumerable<int>> process = from _ in Observable.Interval(TimeSpan.FromMilliseconds(33)) 
                     select stream.Take(8);

I was wondering 2 things :

1) How can i write the first sample with the built in operators that reactive extensions provides ?

2) How can i create that process stream which takes values from the input stream which with the behavior iv'e described ?

I tried using Window as a suggestion from comments below .

 input.Window(TimeSpan.FromMilliseconds(33)).Take(8).Subscribe(winObservable => Debug.WriteLine(" !! "));

It seems as though i get 8 and only 8 observables of an unknown number of values

What i require is a recurrence of 8 values every 33 ms. from input observable.

What the code above did is 8 observables of IEnumrable and then stand idle.

EDIT : Thanks to James World . here's a sample .

  var input = Observable.Range(1, int.MaxValue);

  var timedInput = Observable.Interval(TimeSpan.FromMilliseconds(33))
        .Zip(input.Buffer(8), (_, buffer) => buffer);

  timedInput.SelectMany(x => x).Subscribe(Console.WriteLine);

But now it get's trickier i need for the Buffer value to calculated i need this to be done by the actual MS passed between Intervals
when you write a TimeSpan.FromMilliseconds(33) the Interval event of the timer would actually be raised around 45 ms give or take .

Is there any way to calculate the buffer , something like PSUDO

  input.TimeInterval().Buffer( s => s.Interval.Milliseconds / 4)
2
It's just a Window over 33ms followed by .Take(8) - zerkms
this gives me an IObservable<IObservable<IEnumrable<int>>> and not the IObservable<IEnumrable<int>> i expected . how would you extract the values for the observer ? - eran otzap
@zerkms i'll try it out tommrow , i'll let you know . did you see the edit to my question ? it does seem as though it happens 8 times , but i don't wan't 8 Observables . i wan't 8 values from input observable every 33 ms . what i got is 8 and only 8 obsrevable of god knows how many values and then stood idle.. - eran otzap
1. Window function chunks the input stream by 33ms 2. Take(8) takes 8 elements out of every chunk. You just need to combine these 2 operators properly. For example - .Window(TimeSpan.FromMilliseconds(33)).SelectMany(i => i.Take(8)) PS: I don't develop in C# and don't use Rx.NET so this may be not exactly correct, but just follow the idea. - zerkms

2 Answers

3
votes

You won't be able to do this with any kind of accuracy with a reasonable solution because .NET timer resolution is 15ms.

If the timer was fast enough, you would have to flatten and repackage the stream with a pacer, something like:

// flatten stream
var fs = input.SelectMany(x => x);

// buffer 8 values and release every 33 milliseconds
var xs = Observable.Interval(TimeSpan.FromMilliseconds(33))
                   .Zip(fs.Buffer(8), (_,buffer) => buffer);

Although as I said, this will give very jittery timing. If that kind of timing resolution is important to you, go native!

1
votes

I agree with James' analysis.

I'm wondering if this query gives you a better result:

IObservable<IList<int>> input =
    Observable
        .Generate(
            0,
            x => true,
            x => x < 250 ? x + 1 : 0,
            x => x,
            x => TimeSpan.FromMilliseconds(33.0 / 8.0))
        .Buffer(TimeSpan.FromMilliseconds(33.0));