1
votes

I'm trying to make a little pipeline with RX.net, but I'm having trouble figuring out how to do it without nesting the code for each process in the pipe, making the code very messy.

Here's basically what I want to do

inputString -> toUpperCase -> reverseString -> printOutput

Here's the code I got working, but if I want to add more stuff to the pipeline it's not going to be pretty.

        var inputObservable = Observable.Return("hello world");

        inputObservable.Subscribe(inputString =>
        {
            var toUpperCaseObservable = Observable.Return(inputString.ToUpper());
            toUpperCaseObservable.Subscribe(toUpperCaseInput =>
            {
                var reverseStringObservable = Observable.Return(new String(toUpperCaseInput.Reverse().ToArray()));
                reverseStringObservable.Subscribe(reverseStringInput =>
                {
                    var writeOutputObservable = Observable.Return(reverseStringInput);
                    writeOutputObservable.Subscribe(input =>
                    {
                        Console.WriteLine(input);
                        Console.ReadLine();
                    });      
                });                    
            });

        });
2
Can you not do all of the transformations in a single subscribe method? Are you intending to do additional work at each point in the pipeline? - Jason Boyd
Your -> symbols in your question are a good pointer to select, or map. You're saying you want to take each item and map it through a function to get a new value. So your chain of "->"s can be implemented with a chain of ".Select(..)"s. Or as Jason said, you may be able to collapse some or all of the transformations into a single Select: .Select(x => x.ToUpper().ReverseString()).Subscribe(x => Console.WriteLine(x)) - Niall Connaughton

2 Answers

2
votes

You really don't want to create an Rx "pipeline" in this way. Nested subscribes like this are not really an Rx pipeline.

Here's how you would do what you did in a basic Rx query/subscribe approach:

    var query =
        from x in Observable.Return("hello world")
        let y = x.ToUpper()
        select new String(y.Reverse().ToArray());

    query.Subscribe(z =>
    {
        Console.WriteLine(z);
        Console.ReadLine();
    });

Much simpler, but it may be that, for illustrative purposes, your operations may be expensive so then this might be a better approach:

    var query =
        from x in Observable.Return("hello world")
        from y in Observable.Start(() => x.ToUpper())
        from z in Observable.Start(() => new String(y.Reverse().ToArray()))
        select z;

    query.Subscribe(z =>
    {
        Console.WriteLine(z);
        Console.ReadLine();
    });

The general rule is that you should not subscribe until the end of your pipeline, at a point when you are ready to perform an action on the result of your pipeline. If you need to perform a computation then that should remain part of the query pipeline.

1
votes

This will also work

var inputObservable = Observable.Return("hello world")
    .Select(s=>s.ToUpper())
    .Select(upperCased=>new String(upperCased.Reverse().ToArray());

inputObservable.Subscribe(str =>
{
    Console.WriteLine(str);
    Console.ReadLine();
});