3
votes

I have a method that do some work asynchronously with use of observable. I would like to know what is the best way to make this method async, so that I will be able to await on it and do some work after observable completes.

My first try was to use await on observable.

public async Task DoWorkAsync()
{
    var observable = Observable.Create<int>(o =>
    {
        Task.Run(() =>
        {
            Thread.Sleep(1000);
            Console.WriteLine("OnNext");
            o.OnNext(1);
            o.OnError(new Exception("exception in observable logic"));
            //o.OnCompleted();
        });    
        return Disposable.Empty;
    });

    //observable = observable.Publish().RefCount();

    observable.Subscribe(i => Console.WriteLine(i));
    Console.WriteLine("Awaiting...");
    await observable;
    Console.WriteLine("After Awaiting...");
}

Depending on the scenario I had different issues with that approach (+/- means that this part of code is uncommented/commented):

  1. +OnNext +OnCompleted -OnError -RefCount: OnNext was invoked 2 times (observable was subscribed 2 times). This is what I would like to avoid.

  2. +OnNext +OnCompleted -OnError +RefCount: When I use RefCount() method the code works.

  3. -OnNext +OnCompleted -OnError +RefCount: "Sequence contains no element" exception is thrown when my observable doesn't raise OnNext.

  4. +OnNext -OnCompleted +OnError -RefCount: OnNext was invoked 2 times. Exception raised.

  5. +OnNext -OnCompleted +OnError +RefCount: Hangs after displaying 1 (probably because it wants to return to thread that is awaited). We can make it work (and raise exception) by using SubscribeOn(ThreadPoolScheduler.Instance)

Anyway in case when observable is empty (no OnNext rised) we get exception even if OnError is not called and we don't have any exception inside observable logic. Thats why awaiting on observable is not good solution.

That is why I tried another solution using TaskCompletionSource

public async Task DoWorkAsync()
{
    var observable = Observable.Create<int>(o =>
    {
        Task.Run(() =>
        {
            Thread.Sleep(1000);
            Console.WriteLine("OnNext");
            o.OnNext(1);
            o.OnError(new Exception("exception in observable logic"));
            //o.OnCompleted();
        });
        return Disposable.Empty;
    });

    var tcs = new TaskCompletionSource<bool>();

    observable.Subscribe(i => Console.WriteLine(i),
    e =>
    {
        //tcs.TrySetException(e);
        tcs.SetResult(false);
    },
    () => tcs.TrySetResult(true));

    Console.WriteLine("Awaiting...");
    await tcs.Task;
    Console.WriteLine("After Awaiting...");
}

It works ok in all scenarios and in case of OnError is invoked we could either use tcs.SetResult(false) and don't have information about exception details in outside method or we could use tcs.TrySetException(e) and be able to catch the exception in the outside method.

Can you suggest me if there is some better/cleaner solution or my second solution is the way to go?

EDIT

So I would like to know if there is a better solution than my second solution that will:

  • not require to use .Publish().RefCount()
  • not require additional subscription (what happens in await observable under the hood - OnNext is invoked 2 times)
  • Of course I could wrap my solution in some async extension method for subscribing that returns Task
3
If ever you find yourself doing return Disposable.Empty inside Observable.Create then you are doing something wrong.Enigmativity
@Enigmativity This is just an example and I think that this is not relevant to my problem. But anyway, what should I return in case I don't need to dispose anything?astanula
At the very least create a Subject<T> in the Observable.Create<T> and then do { var subject = new Subject<T>(); var subscription = subject.Subscribe(o); /* your code */ return subscription;.Enigmativity
Why should I create an instance of Subject<T> if I don't need this class inside Observable.Create method? If I don't need to dispose anything then I should return Disposable.Empty. I don't understand your point.astanula
When you directly access the observer passed thru to the .Observable.Create you are causing the observable to run synchronously and thus can cause many of the observable operators to not behave as expected. To avoid this the rule should be that returning Disposable.Empty is bad.Enigmativity

3 Answers

4
votes

EDIT:

If you remove the subscription you can do the following:

await observable.Do(i => Console.WriteLine(i)).LastOrDefaultAsync();

As for your arbitrary requirements... Not having multiple subscriptions for a cold observable makes sense; so you publish it. Refusing to use .Publish().Refcount() doesn't make sense. I don't understand why you're rejecting a solution that solves your problem.


There's a lot there, but I'm assuming this is your main question:

Anyway in case when observable is empty (no OnNext rised) we get exception even if OnError is not called and we don't have any exception inside observable logic. Thats why awaiting on observable is not good solution.

await observable is the same as await observable.LastAsync(). So if there is no element, you get an exception. Imagine changing that statement to int result = await observable; What should the value of result be if there's no elements?

If you change await observable; to await observable.LastOrDefaultAsync(); everything should run smoothly.

And yes, you should use .Publish().Refcount()

0
votes

I'd clearly prefer the 2nd solution, because it only subscribes once.

But out of curiosity: what's the purpose of writing a method like this? If it's to allow for configurable side effects, this would be equivalent:

public async Task DoWorkAsync()
{
    Action<int> onNext = Console.WriteLine;

    await Task.Delay(1000);
    onNext(1);
    throw new Exception("exception in DoWork logic"); // ... or don't
}
0
votes

You could use ToTask extension method:

await observable.ToTask();