
Suppose that we need to transform a hot Observable in such a way that we must know all of its previously emitted items to determine what to emit next. The solution I find most convenient is to pass to flatMap an instance of a Func1 implementation that holds state shared across calls (e.g. a map or list of previously emitted items). On each call, the Func1 instance updates its state and, based on that, decides what to return.
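As a sketch of the proposed approach, here is a plain-Java model with no RxJava involved. A List of zero or one element stands in for the inner Observable that flatMap would merge, and the `NewMaximum` rule (emit an item only if it exceeds everything seen before) is a hypothetical example of a history-dependent decision.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model, not RxJava: a List of zero or one element stands in for the
// inner Observable that flatMap would merge into the output.
class HistoryFlatMapDemo {

    // Hypothetical rule: emit an item only if it is larger than every earlier item.
    // It keeps the full history to mirror the question; tracking the max would suffice.
    static final class NewMaximum implements Function<Integer, List<Integer>> {
        private final List<Integer> seen = new ArrayList<>(); // state kept across calls

        @Override
        public List<Integer> apply(Integer v) {
            boolean isNewMax = true;
            for (int s : seen) {
                if (s >= v) {
                    isNewMax = false;
                    break;
                }
            }
            seen.add(v);
            return isNewMax ? List.of(v) : List.of();
        }
    }

    // Stand-in for flatMap: apply f to each item and concatenate the results in order.
    static <T, R> List<R> flatMap(List<T> items, Function<T, List<R>> f) {
        List<R> out = new ArrayList<>();
        for (T item : items) {
            out.addAll(f.apply(item));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(3, 1, 4, 1, 5, 9, 2, 6);
        System.out.println(flatMap(source, new NewMaximum())); // [3, 4, 5, 9]
    }
}
```

The function instance is the only place the history lives, which is exactly what makes it convenient and also what raises the concerns below.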

However, I am worried about how "nice" this solution is. As far as I know, RxJava does not go well with global, mutable state, and this solution seems to contradict that. On the other hand, I am sure that my Observable fulfils the Observable contract, so it seems to be at least a working solution, and even if the function could be called concurrently, synchronization would solve the problem.

Other possible solutions could be:

  1. Creating an Operator. Mutable state in Operators is allowed, I guess. Anyway, I try to avoid custom operators, as they are trickier to write.

  2. Propagating the history of the Observable through scan (in a List or Map). I would either use the same object (List or Map) for every emitted item, which introduces a mutable object into the stream, or copy the entire object every time, which would cost a lot of performance.

  3. Subscribing to the original Observable, modifying some global state from the subscriber, and emitting items on a Subject (the transformed Observable) based on this global state. I thought about this because it seems to move the handling of the global state (and synchronization) outside the scope of RxJava.
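The trade-off in option 2 can be seen in a small plain-Java model of scan. The hypothetical `scan` helper below is not RxJava; it emits the seed and then every intermediate state, as RxJava's scan(seed, accumulator) does. Reusing one list makes every emission alias the same object, so a consumer that keeps emissions around (or reads them later on another thread) sees later mutations; copying avoids that at O(n) cost per item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java model, not RxJava: "emitted" collects what a downstream consumer would receive.
class ScanHistoryDemo {

    // Model of scan(seed, accumulator): emits the seed, then every intermediate state.
    static <T, R> List<R> scan(List<T> items, R seed, BiFunction<R, T, R> acc) {
        List<R> emitted = new ArrayList<>();
        R state = seed;
        emitted.add(state);
        for (T item : items) {
            state = acc.apply(state, item);
            emitted.add(state);
        }
        return emitted;
    }

    // Variant A: one list, mutated and re-emitted; every emission is the same object.
    static List<List<Integer>> reusedList(List<Integer> items) {
        List<Integer> seed = new ArrayList<>();
        return scan(items, seed, (history, v) -> {
            history.add(v);
            return history;
        });
    }

    // Variant B: copy on every step; each emission is an independent snapshot,
    // at O(n) cost per item.
    static List<List<Integer>> copiedList(List<Integer> items) {
        List<Integer> seed = new ArrayList<>();
        return scan(items, seed, (history, v) -> {
            List<Integer> next = new ArrayList<>(history);
            next.add(v);
            return next;
        });
    }

    public static void main(String[] args) {
        List<Integer> items = List.of(1, 2, 3);
        // A consumer that holds on to the emissions sees only the final contents.
        System.out.println(reusedList(items)); // [[1, 2, 3], [1, 2, 3], [1, 2, 3], [1, 2, 3]]
        System.out.println(copiedList(items)); // [[], [1], [1, 2], [1, 2, 3]]
    }
}
```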

So the question is: should I use the Func1 implementation with mutable state in flatMap for transforming items based on the history of previously emitted items (which works, btw), and if not, what alternatives should I use? In general, I am confused about the recommended way to handle complex mutable state needed for transforming Observables.

I hope I have expressed my problem clearly. Otherwise, let me know and I will try to describe it with the help of some specific problems and code.

Use #2. It is not mutable state if it is only accessible from inside an operator. With scan you do not need to worry about the mutability of the List, because no other thread can access it at the same time. E.g. scan(ArrayList::new, (aList, elementFromUpstream) -> { ... }) – Hans Wurst
scan() shares mutable state with every subscriber to the Observable, often in not very useful ways, including across thread boundaries. – Bob Dalgleish
Reusing the same instance in case #2 is potentially mutable state: you have to consider stages running on different threads in the same subscription chain, even if there is only one end Subscriber to the Observable. scan has variants that take an initial-state supplier, so the state is per-Subscriber, but the value still has to be effectively immutable. – akarnokd
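The difference between one shared seed and a per-subscriber seed can be modelled in plain Java. RxJava 2.x and later expose the supplier variant as scanWith, but the sketch below does not use RxJava: the hypothetical `subscribeAndGetLast` stands in for one subscription and returns the final accumulator value, and `append` deliberately mutates the list in place.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Plain-Java model, not RxJava: subscribeAndGetLast plays one subscription to a
// scan and returns the final accumulator value.
class ScanSeedDemo {

    static <T, R> R subscribeAndGetLast(List<T> items, Supplier<R> seed, BiFunction<R, T, R> acc) {
        R state = seed.get(); // the seed is obtained at subscribe time
        for (T item : items) {
            state = acc.apply(state, item);
        }
        return state;
    }

    // Accumulator that mutates the list in place (the pattern the comments warn about).
    static List<Integer> append(List<Integer> history, Integer v) {
        history.add(v);
        return history;
    }

    public static void main(String[] args) {
        List<Integer> items = List.of(1, 2);

        // One seed object shared by every subscription, like passing a single list as the seed.
        List<Integer> fixedSeed = new ArrayList<>();
        subscribeAndGetLast(items, () -> fixedSeed, ScanSeedDemo::append);
        System.out.println(subscribeAndGetLast(items, () -> fixedSeed, ScanSeedDemo::append)); // [1, 2, 1, 2]

        // A supplier that creates a new list per subscription keeps the state per subscriber.
        Supplier<List<Integer>> freshSeed = ArrayList::new;
        subscribeAndGetLast(items, freshSeed, ScanSeedDemo::append);
        System.out.println(subscribeAndGetLast(items, freshSeed, ScanSeedDemo::append)); // [1, 2]
    }
}
```

The supplier fixes sharing between subscribers only; as the last comment notes, values handed downstream still need to be effectively immutable if later stages run on other threads.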

2 Answers


Flows with functions containing mutable state are generally not recommended, as the mutable state could potentially be shared across multiple Subscribers to a particular Observable chain. In practice, though, most developers assemble Observables when needed and rarely reuse the same Observable. For example, a button click handler will create an Observable that, through composition, forks off two other Observables to fetch data from two different places asynchronously, and then subscribes to this locally created Observable instance. A new button click repeats the process with a fresh, independent Observable.
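The sharing problem can be reproduced without RxJava. In this simplified model, the hypothetical `subscribe(n, mapper)` replays 1..n through a mapper, as a cold Observable does on every subscription, while the mapper instance is created once, at assembly time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simplified, non-RxJava model: a cold "source" that replays 1..n through a
// mapping function every time someone subscribes.
class SharedFunctionDemo {

    static List<Integer> subscribe(int n, Function<Integer, Integer> mapper) {
        List<Integer> received = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            received.add(mapper.apply(i));
        }
        return received;
    }

    // Stateful mapper, analogous to a Func1 with an int sum field.
    static final class RunningSum implements Function<Integer, Integer> {
        private int sum;

        @Override
        public Integer apply(Integer v) {
            sum += v;
            return sum;
        }
    }

    public static void main(String[] args) {
        RunningSum mapper = new RunningSum(); // created once, at "assembly" time
        System.out.println(subscribe(3, mapper)); // [1, 3, 6]
        System.out.println(subscribe(3, mapper)); // [7, 9, 12]: the second subscriber sees the first one's sum
    }
}
```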

Here lies the solution to your stateful-function problem: make the existence of the stateful parts depend on the individual Subscribers that subscribe, using defer():

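```java
import rx.Observable;
import rx.functions.Func1;

public class DeferExample {
    public static void main(String[] args) {
        // defer() calls the factory once per subscribe(), so each subscriber
        // gets its own freshly assembled chain and its own Func1 instance.
        Observable<Integer> o = Observable.defer(() -> {
            return Observable.range(1, 10)
                    .map(new Func1<Integer, Integer>() {
                        int sum; // per-instance, hence per-subscriber, state

                        @Override
                        public Integer call(Integer v) {
                            sum += v;
                            return sum;
                        }
                    });
        });

        // Both subscriptions print the running sums 1, 3, 6, ..., 55;
        // the sum does not carry over from the first subscription to the second.
        o.subscribe(System.out::println);
        o.subscribe(System.out::println);
    }
}
```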

Since a new instance of the anonymous Func1 class is created for each subscribe call, its sum field is local to each individual consumer. Note also that sum is returned and auto-boxed into an immutable Integer, which can then be freely read later on some other thread (think observeOn), because from that point on it is completely detached from the sum field.
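What defer() buys here can be mirrored in a plain-Java model. This is an analogy, not RxJava code: a Supplier stands in for the factory passed to defer(), and the hypothetical `subscribe` calls it once per subscription, so each subscriber gets its own Func1-like instance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Plain-Java model, not RxJava: the Supplier plays the role of the factory given to
// defer(), and subscribe() calls it once per subscription.
class DeferModelDemo {

    static List<Integer> subscribe(int n, Supplier<Function<Integer, Integer>> factory) {
        Function<Integer, Integer> mapper = factory.get(); // fresh mapper for this subscriber
        List<Integer> received = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            received.add(mapper.apply(i));
        }
        return received;
    }

    // Mirrors the anonymous Func1 in the answer: the sum field belongs to one instance.
    static Function<Integer, Integer> newRunningSum() {
        return new Function<Integer, Integer>() {
            private int sum;

            @Override
            public Integer apply(Integer v) {
                sum += v;
                return sum; // boxed copy of the current value, detached from the field
            }
        };
    }

    public static void main(String[] args) {
        Supplier<Function<Integer, Integer>> factory = DeferModelDemo::newRunningSum;
        System.out.println(subscribe(3, factory)); // [1, 3, 6]
        System.out.println(subscribe(3, factory)); // [1, 3, 6]
    }
}
```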


Mutable state, and shared mutable state, are often required for useful work. The issue is how well we isolate the mutability from outside parties.

  1. Creating an operator hides the mutability within the operator instance. The downside is that the state is private to the observable chain.
  2. scan(), reduce() and fold() (if it existed) would be good candidates, but they have very limited implementations, export their state in non-obvious ways and are also limited to the observable chain they are attached to.
  3. Subject or Relay objects provide useful cut-out points.
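Option 1 can be modelled loosely on RxJava 1.x's Operator interface (used with lift()), whose call method receives the downstream Subscriber and returns a new upstream Subscriber. In this plain-Java sketch, Consumer stands in for Subscriber, and `Operator`, `RunningSumOperator` and `subscribe` are hypothetical names. Because call runs once per subscription, the sum field stays private to one chain even when the operator instance is reused.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model, loosely based on RxJava 1.x's lift()/Operator: the operator is
// called once per subscription and returns a new upstream consumer holding its state.
class OperatorModelDemo {

    interface Operator<R, T> {
        Consumer<T> call(Consumer<R> downstream);
    }

    // Hypothetical operator: emits the running sum of everything seen so far.
    static final class RunningSumOperator implements Operator<Integer, Integer> {
        @Override
        public Consumer<Integer> call(Consumer<Integer> downstream) {
            return new Consumer<Integer>() {
                private int sum; // private to this one subscription

                @Override
                public void accept(Integer v) {
                    sum += v;
                    downstream.accept(sum);
                }
            };
        }
    }

    // One subscription: the source pushes items into the operator's consumer.
    static <T, R> List<R> subscribe(List<T> source, Operator<R, T> op) {
        List<R> received = new ArrayList<>();
        Consumer<T> upstream = op.call(received::add);
        source.forEach(upstream);
        return received;
    }

    public static void main(String[] args) {
        RunningSumOperator op = new RunningSumOperator(); // one instance, reused
        System.out.println(subscribe(List.of(1, 2, 3), op)); // [1, 3, 6]
        System.out.println(subscribe(List.of(1, 2, 3), op)); // [1, 3, 6]
    }
}
```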

Going back to basics, using a privately accessible data structure in thread-safe ways is not a bad thing. If you are only concerned about the one observer chain, then either option 1 or option 3 will do the job readily.
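Option 3 combined with a privately held, thread-safe structure can be sketched in plain Java. This is not RxJava's Subject API: a hand-rolled listener list plays the Subject's role, `PrivateStateRelay` and its counting rule are hypothetical, and synchronized methods are assumed to be an acceptable way to get thread safety.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Plain-Java model (not RxJava's Subject API): a consumer of the original stream
// updates state that only it can reach, then republishes derived items to
// listeners, which play the role of the Subject's subscribers.
class PrivateStateRelay {

    private final Map<String, Integer> counts = new HashMap<>(); // private history
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    // Called for every upstream item; synchronized so the map stays consistent and
    // derived items go out one at a time even if onNext is called from several threads.
    synchronized void onNext(String item) {
        int seen = counts.merge(item, 1, Integer::sum);
        String derived = item + "#" + seen; // hypothetical history-dependent transformation
        for (Consumer<String> listener : listeners) {
            listener.accept(derived);
        }
    }

    synchronized int count(String item) {
        return counts.getOrDefault(item, 0);
    }

    public static void main(String[] args) throws InterruptedException {
        PrivateStateRelay relay = new PrivateStateRelay();
        List<String> received = new ArrayList<>();
        relay.addListener(received::add); // only ever called under the relay's lock
        Runnable producer = () -> {
            for (int i = 0; i < 1000; i++) {
                relay.onNext("a");
            }
        };
        Thread t1 = new Thread(producer);
        Thread t2 = new Thread(producer);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(relay.count("a")); // 2000
        System.out.println(received.get(received.size() - 1)); // a#2000
    }
}
```

Invoking listeners while holding the lock keeps the republished items serialized, which the Observable contract requires of onNext calls, at the cost of blocking other producers while listeners run.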