Suppose that we need to transform a hot Observable in a way that we need to know all of its previously emitted items to be able to determine what to emit next. The solution which I find the most convenient is to pass an instance of a Func1 subclass, which has a global state (e.g. a map or list of previously emitted items) to flatMap. In each call, the Func1 instance would update its state and based on that, decide what to return.
However, I am worried about the "niceness" of this solution. As far as I know, RxJava does not go well with global and mutable state, which this solution seems to be in contrast with. On the other hand, I am sure that my Observable fulfills the Observable contract, so it seems to be at least a working solution, and event if it could be called concurrently, a synchronization would solve the problem.
Other possible solutions could be:
Creating an Operator. Mutable state in Operators is allowed, I guess. Anyways, I try to avoid custom operators, as they are more tricky.
Propagating the history of the Observable through scan (in a List or Map). I would either use the same object (List or Map) for every emitted item, which introduces a mutable object into the stream, or copy the entire object every time, which would waste a lot of performance.
Subscribe to the original Observable, modify some global state from the subscriber, and emit items on a Subject (the transformed Observable) using this global state. I thought about this because it seems to exit the scope of RxJava when it deals with the global state (and synchronization).
So the question is: Should I use the Func1 implementation with mutable state in flatMap for transforming items based on the history of previously emitted items (which works, btw), and if not, what alternatives should I use? In general, I am confused about the recommended way to handle a complex mutable state needed for the transformation of Observables.
I hope I have expressed my problem clearly. Otherwise, let me know and I will try to describe it with the help of some specific problems and code.
scan()
shares mutable state with every subscriber to the observable, often in not very useful ways, including across thread boundaries. – Bob Dalgleish