I am working on an application built with TypeScript, Angular, and NGRX. I have been writing my state observables without selectors, mainly because I have found selectors to be less powerful than using RxJS operators directly. For example, selectors alone cannot restrict when values are emitted; a filtering operator must be used instead.
For the most part, I have had no issues replacing selectors with observables, since observables compose in all of the same ways that selectors can. There is one exception: I cannot figure out how to compose observables that may be triggered by the same action. I usually use combineLatest as my go-to composer; however, when two observables update on the same action, there is a transient emission in which one observable has a value from the new state and the other still has a value from the previous state.
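To make the transient update concrete, here is a minimal sketch. It assumes a BehaviorSubject (`state$`) as a stand-in for the NGRX store, and `map` plus `distinctUntilChanged` as a rough stand-in for `select`; the names `state$` and `seen` are illustrative only.

```typescript
import { BehaviorSubject, combineLatest } from 'rxjs';
import { distinctUntilChanged, map } from 'rxjs/operators';

interface IState {
    foo: string;
    bar: string;
}

// Assumption: a BehaviorSubject stands in for the NGRX store.
const state$ = new BehaviorSubject<IState>({ foo: 'f0', bar: 'b0' });

// Assumption: map + distinctUntilChanged approximates select().
const foo$ = state$.pipe(map(state => state.foo), distinctUntilChanged());
const bar$ = state$.pipe(map(state => state.bar), distinctUntilChanged());

const seen: string[] = [];
combineLatest([foo$, bar$]).subscribe(([foo, bar]) => seen.push(`${foo} - ${bar}`));

// A single pulse that changes both slices, like an "update both" action.
state$.next({ foo: 'f1', bar: 'b1' });

console.log(seen); // ['f0 - b0', 'f1 - b0', 'f1 - b1']
```

The middle entry, `'f1 - b0'`, is the transient value: foo$ has already seen the new state while bar$ still holds the old one.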
Originally, I considered using the zip observable creator instead; however, while this solves the problem when two observables update together, it does not solve the problem when one observable is updated without the other - as is entirely possible with an NGRX architecture.
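The zip behaviour can be sketched the same way. Again this assumes a BehaviorSubject (`state$`) in place of the store and `map` plus `distinctUntilChanged` in place of `select`; all names are illustrative.

```typescript
import { BehaviorSubject, zip } from 'rxjs';
import { distinctUntilChanged, map } from 'rxjs/operators';

interface IState {
    foo: string;
    bar: string;
}

// Assumption: a BehaviorSubject stands in for the NGRX store.
const state$ = new BehaviorSubject<IState>({ foo: 'f0', bar: 'b0' });
const foo$ = state$.pipe(map(state => state.foo), distinctUntilChanged());
const bar$ = state$.pipe(map(state => state.bar), distinctUntilChanged());

const seen: string[] = [];
zip(foo$, bar$).subscribe(([foo, bar]) => seen.push(`${foo} - ${bar}`));

state$.next({ foo: 'f1', bar: 'b1' }); // both change: zip pairs them correctly
state$.next({ foo: 'f2', bar: 'b1' }); // only foo changes: zip buffers 'f2' and emits nothing
state$.next({ foo: 'f3', bar: 'b1' }); // only foo changes again: 'f3' is buffered too
state$.next({ foo: 'f3', bar: 'b2' }); // bar changes: zip pairs it with the stale 'f2'

console.log(seen); // ['f0 - b0', 'f1 - b1', 'f2 - b2']
```

The last value, `'f2 - b2'`, pairs values from two different pulses, and the state in this run never held that combination.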
I then considered the auditTime(0) operator, which does remove the transient update but introduces new problems:
1) It causes observables to emit on a later turn of the event loop, which breaks some assumptions inside the application (solvable, but annoying).
2) It causes the various observables to emit as soon as they can, whereas I would like all observables to emit together, on the same store pulse. Graphically, this means that rendering of different parts of the application is staggered instead of being drawn together on the same frame (note that our application is very data-heavy, and it is often necessary to drop frames on store pulses).
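A minimal sketch of the first problem, under the same assumptions as before (a BehaviorSubject `state$` in place of the store, `map` plus `distinctUntilChanged` in place of `select`, illustrative names): with auditTime(0), nothing has been delivered by the time the dispatching code continues.

```typescript
import { BehaviorSubject, combineLatest } from 'rxjs';
import { auditTime, distinctUntilChanged, map } from 'rxjs/operators';

interface IState {
    foo: string;
    bar: string;
}

// Assumption: a BehaviorSubject stands in for the NGRX store.
const state$ = new BehaviorSubject<IState>({ foo: 'f0', bar: 'b0' });
const foo$ = state$.pipe(map(state => state.foo), distinctUntilChanged());
const bar$ = state$.pipe(map(state => state.bar), distinctUntilChanged());

const seen: string[] = [];
combineLatest([foo$, bar$])
    .pipe(auditTime(0))
    .subscribe(([foo, bar]) => seen.push(`${foo} - ${bar}`));

state$.next({ foo: 'f1', bar: 'b1' });

// Snapshot taken synchronously after the pulse: auditTime has not emitted yet.
const seenSynchronously = [...seen];
console.log(seenSynchronously); // []

// Only on a later timer tick does the latest combined value arrive.
setTimeout(() => console.log(seen), 10);
```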
Finally, I wrote a custom operator to compose observables that are derived from the same source:
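import { combineLatest, Observable, OperatorFunction } from 'rxjs';
import { map, withLatestFrom } from 'rxjs/operators';

// Maps a tuple of value types to a tuple of observables of those types.
export type ObservableTuple<TupleT extends any[]> = {
    [K in keyof TupleT]: Observable<TupleT[K]>;
};

// On each emission of the source, emits the latest values of all inputs as a tuple.
export function selectFrom<SourceT, TupleT extends any[]>(...inputs: ObservableTuple<TupleT>): OperatorFunction<SourceT, TupleT> {
    return (source$: Observable<SourceT>) => source$.pipe(
        withLatestFrom(combineLatest<TupleT>(inputs)),
        map(([, values]) => values),
    );
}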
Here is a summary of the problem in TypeScript (using snippets of NGRX, RxJS, and Angular):
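import { Injectable } from '@angular/core';
import { select, Store } from '@ngrx/store';
import { map } from 'rxjs/operators';
// selectFrom is the custom operator defined above.

interface IState {
    foo: string;
    bar: string;
}

@Injectable()
class SomeService {
    // `private readonly` makes store$ a class property, so `this.store$` below resolves.
    constructor(private readonly store$: Store<IState>) {}

    readonly foo$ = this.store$.pipe(select(state => state.foo));
    readonly bar$ = this.store$.pipe(select(state => state.bar));

    // Emits on each store pulse with the latest foo and bar.
    readonly composed$ = this.store$.pipe(
        selectFrom(
            this.foo$,
            this.bar$,
        ),
        map(([foo, bar]) => `${foo} - ${bar}`),
    );
}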

const UPDATE_FOO = {
    type: 'update foo',
    foo: 'some updated value for foo',
};

const UPDATE_BAR = {
    type: 'update bar',
    bar: 'some updated value for bar',
};

const UPDATE_BOTH = {
    type: 'update both',
    both: 'some updated value for both foo and bar',
};
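
For comparison with combineLatest, here is a minimal sketch of how this composition behaves on each pulse. It inlines the body of selectFrom so that it stays self-contained, and it assumes a BehaviorSubject (`store$`) in place of the NGRX store and `map` plus `distinctUntilChanged` in place of `select`; the state values are made up for illustration.

```typescript
import { BehaviorSubject, combineLatest } from 'rxjs';
import { distinctUntilChanged, map, withLatestFrom } from 'rxjs/operators';

interface IState {
    foo: string;
    bar: string;
}

// Assumption: a BehaviorSubject stands in for the NGRX store.
const store$ = new BehaviorSubject<IState>({ foo: 'f0', bar: 'b0' });
const foo$ = store$.pipe(map(state => state.foo), distinctUntilChanged());
const bar$ = store$.pipe(map(state => state.bar), distinctUntilChanged());

// Same pipeline as selectFrom(foo$, bar$), written inline.
const composed$ = store$.pipe(
    withLatestFrom(combineLatest([foo$, bar$])),
    map(([, values]) => values),
    map(([foo, bar]) => `${foo} - ${bar}`),
);

const seen: string[] = [];
composed$.subscribe(value => seen.push(value));

store$.next({ foo: 'f1', bar: 'b0' }); // like UPDATE_FOO
store$.next({ foo: 'f1', bar: 'b1' }); // like UPDATE_BAR
store$.next({ foo: 'f2', bar: 'b2' }); // like UPDATE_BOTH

console.log(seen); // ['f0 - b0', 'f1 - b0', 'f1 - b1', 'f2 - b2']
```

There is no transient `'f2 - b1'` entry. As far as I can tell, this relies on withLatestFrom subscribing to its inputs before it subscribes to the source, so foo$ and bar$ have already been updated by the time the source emission for the same pulse arrives.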
This works correctly even when selectFrom calls are nested within one another, e.g.
readonly composed2$ = this.store$.pipe(
    selectFrom(
        this.composed$,
        this.foo$,
    ),
);
So long as composed$ is defined before composed2$, everything works out. However, a case I did not consider is an operator like switchMap sitting between composed$ and composed2$. In this case, because composed2$ is destroyed and recreated by switchMap, it is possible for composed2$ to fire before composed$, which causes everything to get out of sync.