0
votes

This is follow up question to : Infinite IObservable from Task function and toggle observable

Above question asks if it is possible to create a repeating IObservable<TResult> from IObservable<bool> toggle and Task<TResult> query, so that query is called repeatedly if last toggle was true and not called at all if last toggle is false. That seems to be pretty easily achieved using Defer and Switch methods.

But that has problem, because the query is not parametrized. Concretely, there are two types of parameter to the query function (making the signature Func<int, TParam, IQueryable<TResult>>). First parameter is incremented every time the method is called. Second parameter is latest value from another IObservable<TParam> params.

Again, I want to be able to test this setup automatically.

This is method stump:

public static IObservable<TResult> Function<TParam, TResult>(IObservable<bool> toggle, IObservable<TParam> param, Func<int, TParam, IObservable<TResult>> query)
{
    param.Subscribe(a => { }); // dummy to make debug output

    return toggle
        .Select(b => b
            ? Observable
                .Defer(() => query(0, default(TParam))) // dummy parameters for debugging
                .Repeat()
            : Observable
                .Never<TResult>())
        .Switch();
}

And test that should pass :

[Test]
public void Test_Function()
{
    var scheduler = new TestScheduler();

    var toggle = scheduler.CreateHotObservable(
        OnNext(10, false),
        OnNext(11, true),
        OnNext(18, false),
        OnNext(30, true),
        OnNext(45, false),
        OnNext(100, false)
        ).Do(x => Console.WriteLine(scheduler.Clock + " toggle " + x));

    var prms = scheduler.CreateHotObservable(
        OnNext(10, "a"),
        OnNext(29, "b"),
        OnNext(39, "c")
        ).Do(x => Console.WriteLine(scheduler.Clock + " param " + x));

    var resultObs =
        Function(toggle, prms, (p1, p2) => scheduler.CreateColdObservable(OnNext(2, p1 + " " + p2), OnCompleted<string>(2)))
            .Do(x => Console.WriteLine(scheduler.Clock + " " + x));

    var results = scheduler.Start(() => resultObs, 0, 0, 100);

    results.Messages.AssertEqual(
        //10 toggle False
        //10 param a
        //11 toggle True
        OnNext(13, "0 a"),
        OnNext(15, "1 a"),
        OnNext(17, "2 a"),
        //18 toggle False // should not continue after toggle is off
        //29 param b
        //30 toggle True
        OnNext(32, "0 b"),
        OnNext(34, "1 b"),
        OnNext(36, "2 b"),
        OnNext(38, "3 b"),
        //39 param c
        OnNext(40, "4 b"), // fine if on parameter change, the currently running query finishes
        OnNext(42, "5 c"),
        OnNext(44, "6 c")
        //45 toggle False
        //100 toggle False
        );
}
1
You really need to provide some of the basic code, including my code from the previous answer. You have enough rep to understand that you should be trying to make it as easy as possible for us to answer. - Enigmativity
@Enigmativity I got rep answering question. Not asking them. Also, hard to get rid of DRY habits. - Euphoric
@Enigmativity Tried to fix the question. - Euphoric
Could you try harder? Please put in the full answer in your previous question (modified to fit exactly what you're doing), plus at least the full variable and method signatures of everything else. I'd love to answer this question - I'm sure it wouldn't be hard - but it is too hard to write all of the code that surrounds your description. Please write the surrounding code for me and I can then do the answer. - Enigmativity
@Enigmativity Added method stump with signature and automated test with expected behavior. - Euphoric

1 Answers

0
votes

So I thought about it more and managed to find a working solution.

public class Function2
{
    private readonly IObservable<bool> _toggle;
    private readonly IObservable<string> _param;
    private readonly Func<int, string, IObservable<string>> _query;

    private int _index;
    private string _latestParam;

    public Function2(IObservable<bool> toggle, IObservable<string> param, Func<int, string, IObservable<string>> query)
    {
        _toggle = toggle;
        _param = param;
        _query = query;
    }

    public IObservable<string> Execute()
    {
        // TODO : Dispose the subscriptions
        _toggle.Subscribe(OnToggle);
        _param.Subscribe(OnParam);

        return _toggle
            .Select(b => b
                ? Observable
                    .Defer(InnerQuery)
                    .Repeat()
                : Observable
                    .Never<string>())
            .Switch();
    }

    private void OnToggle(bool tgl)
    {
        // if toggle is on, reset the index
        if(tgl)
        {
            _index = 0;
        }
    }

    private void OnParam(string param)
    {
        _latestParam = param;
    }

    private IObservable<string> InnerQuery()
    {
        var ret = _query(_index, _latestParam);

        _index++;

        return ret;
    }
}

But I don't like it because:

  • Updating the index is done in the query method. This seem like bad case of call having side effects.
  • toggle needs to be subscribed twice
  • I'm not sure if and how to handle disposing of the subscriptions. This will run from start to end of the application, so it might not be an issue, but still.

I like it because it make it easy to fine-tune and modify the behavior of update of parameters based on incoming parameter change and updating index.