I'm having trouble with the way ReactiveCommand deals with ObserveOn and SubscribeOn.
I have an API that returns an observable sequence of strings, which looks like this:
    public IObservable<string> GetDocumentObservable(int numParagraphs, int latency)
    {
        return Observable.Create<string>(obs =>
        {
            for (int i = 0; i < numParagraphs; i++)
            {
                Console.WriteLine("Service On thread {0}", Thread.CurrentThread.ManagedThreadId);
                Thread.Sleep(1000);
                obs.OnNext("Some String");
            }
            obs.OnCompleted();
            return Disposable.Empty;
        });
    }
I'm using ReactiveCommand.CreateAsyncObservable to invoke this, using SubscribeOn(RxApp.TaskpoolScheduler) (to ensure the Thread.Sleep doesn't happen on the UI thread), and ObserveOn(RxApp.MainThreadScheduler) to draw strings on my UI thread.
Unfortunately, this all executes synchronously (on the same thread), and I'm not sure why. This is what the VM code looks like:
    DownloadDocument = ReactiveCommand
        .CreateAsyncObservable(_ =>
        {
            Console.WriteLine("ViewModel Invoking On thread {0}", Thread.CurrentThread.ManagedThreadId);
            return _documentService.GetDocumentObservable(NumParagraphs, 0);
        });
    DownloadDocument
        .SubscribeOn(RxApp.TaskpoolScheduler)
        .ObserveOn(RxApp.MainThreadScheduler)
        .Subscribe(p =>
        {
            Console.WriteLine("ViewModel OnNext thread {0}", Thread.CurrentThread.ManagedThreadId);
            Document.Add(p);
        },
        x => { },
        () => { Console.WriteLine("ViewModel OnComplete thread {0}", Thread.CurrentThread.ManagedThreadId); });
Everything executes on the same thread and blocks the UI thread. If I invoke it "the old-fashioned way", as below, everything works as expected:
    Something = ReactiveCommand.Create();
    Something.Subscribe(x =>
    {
        _documentService.GetDocumentObservable(NumParagraphs, 0)
            .SubscribeOn(RxApp.TaskpoolScheduler)
            .ObserveOn(RxApp.MainThreadScheduler)
            .Subscribe(p =>
            {
                Console.WriteLine("ViewModel OnNext thread {0}", Thread.CurrentThread.ManagedThreadId);
                Document.Add(p);
            },
            ex => { },
            () => { Console.WriteLine("ViewModel OnComplete thread {0}", Thread.CurrentThread.ManagedThreadId); });
    });
No blocked threads here.
Is there something I'm missing when it comes to using ReactiveCommands with observable APIs?