0
votes

We have a situation where we want to start a background "polling" operation in a C# application that returns values periodically using reactive extensions. The process we'd like to implement is the following:

  1. A caller calls a method like Poll() that returns an IObservable
  2. The caller subscribes to said observable, and that starts a background thread/task that interacts with hardware to retrieve values on some interval
  3. When the caller is done it disposes of the subscription and that automatically stops the background thread/task

Attempt #1

To try to prove this out I wrote the following console app, but this isn't acting the way I was expecting:

public class OutputParameters
{
    public Guid Id { get; set; }
    public int Value { get; set; }
}

public class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine("Requesting the polling operation");
        var worker1 = Poll();

        Console.WriteLine("Subscribing to start the polling operation");

        var sub1 = worker1.Subscribe(
            value => { Console.WriteLine($"Thread {value.Id} emitted {value.Value}"); },
            ex => { Console.WriteLine($"Thread threw an exception: {ex.Message}"); },
            () => { Console.WriteLine("Thread has completed"); });


        Thread.Sleep(5000);

        sub1.Dispose();

        Console.ReadLine();
    }


    private static IObservable<OutputParameters> Poll()
    {
        return Observable.DeferAsync(Worker);
    }


    private static Task<IObservable<OutputParameters>> Worker(CancellationToken token)
    {
        var subject = new Subject<OutputParameters>();

        Task.Run(async () =>
        {
            var id = Guid.NewGuid();
            const int steps = 10;

            try
            {
                for (var i = 1; i <= steps || token.IsCancellationRequested; i++)
                {
                    Console.WriteLine($"[IN THREAD] Thread {id}: Step {i} of {steps}");
                    subject.OnNext(new OutputParameters { Id = id, Value = i });

                    // This will actually throw an exception if it's the active call when
                    //  the token is cancelled.
                    //
                    await Task.Delay(1000, token);
                }
            }
            catch (TaskCanceledException ex)
            {
                // Interestingly, if this is triggered because the caller unsibscribed then
                //  this is unneeded...the caller isn't listening for this error anymore
                //
                subject.OnError(ex);
            }

            if (token.IsCancellationRequested)
            {
                Console.WriteLine($"[IN THREAD] Thread {id} was cancelled");
            }
            else
            {
                Console.WriteLine($"[IN THREAD] Thread {id} exiting normally");
                subject.OnCompleted();
            }
        }, token);

        return Task.FromResult(subject.AsObservable());
    }
}

The code above actually seems to cancel the background task almost immediately, as this is the output:

Requesting the polling operation
Subscribing to start the polling operation
[IN THREAD] Thread a470e6f4-2e62-4a3c-abe6-670bce39b6de: Step 1 of 10
Thread threw an exception: A task was canceled.
[IN THREAD] Thread a470e6f4-2e62-4a3c-abe6-670bce39b6de was cancelled

Attempt #2

I then tried making a small change to the Worker method to make it async and await the Task.Run call like so:

    private static async Task<IObservable<OutputParameters>> Worker(CancellationToken token)
    {
        var subject = new Subject<OutputParameters>();

        await Task.Run(async () =>
        {
            ...what happens in here is unchanged...
        }, token);

        return subject.AsObservable();
    }

The result here though makes it seem like the background task has complete control because it does run for about 5 seconds before being cancelled, but there's no output from the subscribe callbacks. Here's the complete output:

Requesting the polling operation
Subscribing to start the polling operation
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 1 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 2 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 3 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 4 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 5 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 6 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a was cancelled

My question

So it's clear that I don't fully understand what's happening here, or that using DeferAsync is the right creation method for an observable in this case.

Is there a proper way to implement this type of approach?

1
Subject<OutputParameters> has to start the async work of polling as soon as someone subscribes to it. Look into how to keep track of subscribers in c#. You can either use events or use the observer pattern, I would just use events. Then once all subscribers have unsubscribe, send a cancellation token to stop the polling. Most of this code should be in Subject<OutputParameters> so it is encapsulated and not within the Program class. - CodingYoshi
Also, first try it with a windows form application and once you have it working, search for Task<T> in console apps and study that because it is a bit trickier with console. - CodingYoshi
Out of curiosity, was there a reason for the down vote? - Sam Storie
If you are asking me, I was wondering this myself. I think your question is well worded and pretty clear. - CodingYoshi
@SamStorie - Don't use a Subject like you have in your question. Either wrap it in an Observable.Defer or try to avoid it altogether. Having it sit as a variable in the method causes it to be captured in your observable creating a run-once observable - and any error or completion signal sent will kill your observable for all time. - Enigmativity

1 Answers

2
votes

This will do it, if an RX-only solution will suffice. Much cleaner if you asked me...

static IObservable<OutputParameters> Poll()
{
    const int steps = 10;
    return Observable.Defer<Guid>(() => Observable.Return(Guid.NewGuid()))
        .SelectMany(id => 
            Observable.Generate(1, i => i <= steps, i => i + 1, i => i, _ => TimeSpan.FromMilliseconds(1000))
                .ObserveOn(new EventLoopScheduler())
                .Do(i => Console.WriteLine($"[IN THREAD] Thread {id}: Step {i} of {steps}"))
                .Select(i => new OutputParameters { Id = id, Value = i })
        );
}

Explanation:

  • Generate is like a for-loop for Rx. The last argument controls when items are emitted. This is equivalent to your for loop + Task.Delay.
  • ObserveOn controls where/when the observable is observed. In this case, the EventLoopScheduler will spin up one new thread per subscriber, and all items from that observable will be observed on the new thread.

From Enigmativity:

static IObservable<OutputParameters> Poll()
{
    const int steps = 10;
    return Observable.Defer<OutputParameters>(() =>
    {
        var id = Guid.NewGuid();
        return Observable.Generate(1, i => i <= steps, i => i + 1, i => i,
                _ => TimeSpan.FromMilliseconds(1000), new EventLoopScheduler())
            .Do(i => Console.WriteLine($"[IN THREAD] Thread {id}: Step {i} of {steps}"))
            .Select(i => new OutputParameters { Id = id, Value = i });
    });
}