2
votes

Given an Observable that will start pushing events once Subscribe is called. How can the sequence be converted into an IEnumerable.

In 2010 Jon Skeet wrote about his first impressions of RX and wrote this, which would be equivalent to what I need.

public static List<T> ToList<T>(this IObservable<T> source) 
{ 
    List<T> ret = new List<T>(); 
    source.Subscribe(x => ret.Add(x)); 
    return ret; 
}

https://codeblog.jonskeet.uk/2010/01/16/first-encounters-with-reactive-extensions/

While this solution would work it seems like there should be a better way.

What we're really interested in is the ability to await the observable in a way that behaves similarly to Subscribe.

await observable.ToList()

However, this doesn't seem to work for me.

I've also looked into these two methods

observable.ToEnumerable
observable.Next

A little more information

I plan to use this to write some unit tests.

  1. I'm using a ReplaySubject and performing a sequence of actions with a test substitute.
  2. Calls to the substitute are recorded via OnNext.
  3. I can get the list of calls and Assert they are correct.
2
Jon Skeet's article was written in 2010, when Rx was just a research project and async/await were two years in the future. By 2011 Observable had its own ToList() extension method - Panagiotis Kanavos
@PanagiotisKanavos so you can change IObservable<T> to IObservable<IList<T>> IObservable<IList<TSource>> ToList<TSource>(this IObservable<TSource> source) MSDN And Daniel is asking about IObservable<T> to IEnumerable<T> conversion - lerthe61
@lerthe61 check Shlomo's answer that shows how to do just that. ToEnumerable() offers a blocking alternative. Jon Skeet's article was written back when Rx was a research project and doesn't apply anymore - Panagiotis Kanavos

2 Answers

3
votes

Interestingly Mr Skeet's implementation is pretty woeful. It doesn't block so will only pump values into the list if the source happens to be a replay subject (or similar). So for most Observable Sequences, this would just return an empty list. I am sure Jon knows that, but maybe got lost in translation.

The ToList() is what you want, so I assume that perhaps you are not completing? If you only want a known number of elements, then Take(x).ToList() will suffice instead.

2
votes

EDIT:

If you're unable to complete the stream as described below, then using await will wait interminably. I would test in some other manner. You can look at Nuget Package Microsoft.Reactive.Testing, which contains some powerful testing tools. Or just using Subscribe and asserting in there.


await awaits for the observable to finish or error out. You probably just need to add an OnCompleted call to your ReactiveSubject and you should be fine. The following code works for me:

var rs = new ReplaySubject<int>();

rs.OnNext(1);
rs.OnNext(2);
rs.OnNext(3);
rs.OnCompleted();
var l = await rs.ToList();

//l is equivalent to  new List<int>() {1, 2, 3};