1
votes

I am working with a TypeScript, Angular, NGRX application. I have been writing my state observables without using selectors - the main reason is that I have found that they are less powerful than using RxJS operators directly. As an example, it is not possible to restrict the emission of events using selectors alone - instead a filtering operator must be used.

For the most part, I have had no issues replacing selectors with observables - observables can compose in all of the same ways that selectors can - with one exception: I cannot figure out how to compose observables which may be triggered from the same action. Usually, I have used combineLatest as my goto observable composer; however, in the case when two observables would update on the same action, there is a transient update where one of the observables has a value from the new state and the other has a value from the previous state.

Originally, I considered using the zip observable creator instead; however, while this solves the problem when two observables update together, it does not solve the problem when one observable is updated without the other - as is entirely possible with an NGRX architecture.

I then considered the auditTime(0) operator, which does solve the problem of removing the transient update, but has new problems 1) It causes observables to emit on a later event loop which breaks some assumptions inside of the application (solvable, but annoying) 2) It causes various observables to emit as soon as they can, whereas I would like all observables to emit together, on the same store pulse. Graphically, this means that rendering of different parts of the application are staggered, instead of being drawn together on the same frame (note that our application is very data-heavy, and it is often necessary to drop frames on store pulses)

Finally, I wrote a custom operator to compose observables which are derived from the same source

export type ObservableTuple<TupleT extends any[]> = {
  [K in keyof TupleT]: Observable<TupleT[K]>;
};

export function selectFrom<SourceT, TupleT extends any[]>(...inputs: ObservableTuple<TupleT>): OperatorFunction<SourceT, TupleT> {
  return (source$: Observable<SourceT>) => source$.pipe(
    withLatestFrom(combineLatest<TupleT>(inputs)),
    map(([, values]) => values),
  );
}

Here is a summary of the problem in TypeScript (using snippets of NGRX, RxJS, and Angular)

interface IState {
    foo: string;
    bar: string;
}

@Injectable()
class SomeService {
    constructor(store$: Store<IState>) {
    }

    readonly foo$ = this.store$.pipe(select(state => state.foo));
    readonly bar$ = this.store$.pipe(select(state => state.bar));

    readonly composed$ = this.store$.pipe(
        selectFrom(
            this.foo$,
            this.bar$,
        ),
        map(([foo, bar]) => `${foo} - ${bar}`),
    );
}

const UPDATE_FOO = {
    type: 'update foo',
    foo: 'some updated value for foo'
};
const UPDATE_BAR = {
    type: 'update bar',
    bar: 'some updated value for bar',
};
const UPDATE_BOTH = {
    type: 'update both',
    both: 'some updated value for both foo and bar',
};

This works perfectly correctly even when selectFrom calls are nested within one another e.g.

readonly composed2$ = this.store$.pipe(
   selectFrom(
      this.composed$,
      this.foo$
   )
)

So long as composed$ is defined before composed2$, everything works out; however, a case I did not consider is when using an operator like switchMap in between composed$ and composed2$. In this case, because compsed2$ is destroyed and recreated by switchMap, it is possible for composed2$ to fire before composed$, which causes everything to get out of sync

1

1 Answers

2
votes

For your specific problem of trying to compose 2 observables and only emit after both of them have finished emitting, you can try to take advantage of:

  • queue Scheduler - lets you defer recursive calls until the current call completes
  • debounce - delay an update until a signal arrives
  • observeOn - only listen to store updates on the queue Scheduler

Then you could do something like the following:

readonly queuedStore$ = this.store$.pipe(
    observeOn(queue), // use queue scheduler to listen to store updates
    share() // use the same store event to update all of our selectors
  );

// use queuedStore$ here
readonly foo$ = this.queuedStore$.pipe(select(state => state.foo));
readonly bar$ = this.queuedStore$.pipe(select(state => state.bar));

// when composing, debounce the combineLatest() with an observable
// that completes immediately, but completes on the queue scheduler
readonly composed$ = combineLatest(foo$, bar$).pipe(
  debounce(() => empty().pipe(observeOn(queue))));

What will happen?

Foo Update

  1. queuedStore$ schedules notification on queue
  2. notification starts immediately since nothing is currently running
  3. foo$ notifies
  4. combineLatest notifies
  5. debounce subscribes to durationSelector
  6. durationSelector schedules notification on queue
  7. notification is not sent, since queued action is currently running
  8. call stack unwinds to step 1
  9. queue scheduler runs durationSelector notification
  10. debounce triggers and sends out update to UI

Bar Update

Works same as Foo update

BarFoo Update

  1. queuedStore$ schedules notification on queue
  2. notification starts immediately since nothing is currently running
  3. foo$ notifies
  4. combineLatest notifies
  5. debounce subscribes to durationSelector
  6. durationSelector schedules notification on queue
  7. notification is not sent, since queued action is currently running
  8. call stack unwinds to step 3
  9. bar$ notifies
  10. combineLatest notifies
  11. debounce throws away previous value from foo notification
  12. debounce resubscribes to durationSelector
  13. durationSelector schedules notification on queue
  14. notification is not sent, since queued action is currently running
  15. call stack unwinds to step 1
  16. queue scheduler runs durationSelector notification
  17. debounce triggers and sends out update to UI

In theory this gets you your desired behavior: - Single updates apply immediately (before next tick) - Combined update applies immediately (before next tick) - Combined updates ignore intermediate result - Should still work if your composed observable uses switch.

Thing to watch out for

If you dispatch another event while handling one of these notifications on the queue scheduler, the notifications for that 2nd event will be deferred until after the current handler completes.