2
votes

We have a sequence of items which we are processing with Reactive. Part of processing is a configurable pipeline which transforms items (T -> U, eg int -> char in a trivial case). For example, our naive implementation looks like

// simple base class, implements IObserver and IObservable which is
// equivalent to ISubject<T,U>
public abstract class ObservableTask<T, U> : IObserver<T>, IObservable<U>
{
    // NOTE: stateful, blech
    private readonly Subject<U> observable = new Subject<U>();

    public void OnCompleted() { }
    public void OnError(Exception error) { }
    public void OnNext(T value) { observable.OnNext(Process(value)); }
    public IDisposable Subscribe(IObserver<U> observer) 
    {
        return observable.Subscribe(observer);
    }
    public abstract U Process(T value);
}

// trivial implementation of a transform task, transforms an
// input of type int into an output of type char
public class TransformTask : ObservableTask<int, char>
{
    public override char Process(int value)
    {
        Console.WriteLine("Transform '{0}'", value);
        return (char)(value + 64);
    }
}

// trivial report, does not transform but performs IO-bound
// task and passes value to any other subsequent subscribers
public class ReportTask : ObservableTask<char, char>
{
    public override char Process(char value)
    {
        Console.WriteLine("Report '{0}'", value);
        return value;
    }
}

// simple harness that produces desired output/behaviour
public static class ObservableTasks
{
    public static void ChainThings()
    {
        Console.WriteLine("begin observable tasks");

        // NOTE: would use config/reflection to assemble pipe;
        // here we use concrete instances for demonstration only
        TransformTask a = new TransformTask();
        ReportTask b = new ReportTask();

        int[] numbers = new int[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, };
        var s = numbers.ToObservable().Publish();
        a.Subscribe(b);
        s.Subscribe(a);
        s.Connect();
        Console.WriteLine("begin observable tasks");
    }
}

There are a number of benefits to the model above; namely it is intuitive for us to develop units of work and to create a simple framework for assembling any kind of pipeline together.

However, as noted above, our internal Subject<T,U> is a faux-pas. I haven't had much luck using factory methods on Observable.* to emulate preceding behaviour (ie create a single Observable for subscribers to subscribe to and invoke whenever an element arrives).

The only other reference to transforming sequences is later, and refers to Linq usage. In theory we could adapt to something like this

public class TransformTask
{
    public char Select(int value)
    {
        Console.WriteLine("Transform '{0}'", value);
        return (char)(value + 64);
    }
}

public class ReportTask
{
    public char Select(char value)
    {
        Console.WriteLine("Report '{0}'", value);
        return value;
    }
}

public static class SelectTasks
{
    public static void ChainThings()
    {
        Console.WriteLine("begin select tasks");
        TransformTask a = new TransformTask();
        ReportTask b = new ReportTask();
        int[] numbers = new int[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, };

        // in theory we could build this up dynamically 
        // with expression trees
        var s = numbers.
            ToObservable().
            Select(a.Select).
            Select(b.Select).
            Publish();

        // empty subscription?
        s.Subscribe(value => { });
        s.Connect();
        Console.WriteLine("end select tasks");
    }

}

Again, some benefits and costs. Simpler to implement by comparison, but infrastructure would be more difficult (dynamic expression construction for observable).

First, anyone tackle a similar problem and is able to share some insight?

Second, (and I am quite new to Reactive and functional programming) is transform and "act" (ie Bind TransformTask, and Observer ReportTask) absolutely distinct and Select should be used for one and Subscribe(observer) the other?

1

1 Answers

3
votes

I'm not sure you need Subject's at all...

Rather than limit yourself to mapping, you might want to transform the entire observable, so you could use an interface like this...

public interface IStep<T, TResult>
{
    public IObservable<TResult> Transform(IObservable<T> source);
}

Once that exists, you could define a couple of extension methods (just for convenience) to help use that step, like so...

public static class ObservableExtensions
{
    public static IObservable<TResult> Let(this IObservable<T> source, Func<IObservable<T>, IObservable<TResult>> let)
    {
        return let(source);
    }

    public static IObservable<TResult> Let(this IObservable<T> source, IStep<T, TResult> step)
    {
      return source.Let(step.Transform);
    }
}

Then you can define your steps like so...

public class TransformStep : IStep<int, char>
{
    public IObservable<char> Transform(IObservable<int> source)
    {
        return source.Map(IntToChar); 
    }

    public char IntToChar(int value)
    {
        return (char)(value + 64);
    }
}

public class ReportStep : IStep<char, char>
{
    private readonly Logger logger;

    public ReporterStep(Logger logger)
    {
      this.logger = logger;
    }

    public IObservable<char> Transform(IObservable<char> source)
    {
        return source.Do(Report);
    }

    public void Report(char value)
    {
        logger.Log("Report '{0}'", value);
    }
}

And use them in a more or less uniform manner...

Observable.Return<int>(10)
  .Let(new TransformStep())
  .Let(new ReportStep(logger))
  .Subscribe();

This way, all of the logic associated with each step can be internal to the step, and you're left simply to serialize/deserialize, and chain them back together.