
I have two source streams that I want to listen to. The requirements are:

  1. If one emits, also give me the last value from the other.
  2. If both emit at the same time, don't call the subscriber twice.

The first case is combineLatest, but the second is zip. I need a way to mix combineLatest and zip into one operator.

const { Observable, BehaviorSubject} = Rx;

const movies = {
  ids: [],
  entities: {}
}

const actors = {
  ids: [],
  entities: {}
} 

const storeOne = new BehaviorSubject(movies);
const storeTwo = new BehaviorSubject(actors);

const movies$ = storeOne.map(state => state.entities).distinctUntilChanged();
const actors$ = storeTwo.map(state => state.entities).distinctUntilChanged();

const both$ = Observable.zip(
   movies$,
   actors$,
   (movies, actors) => {
     return {movies, actors};
   }
)

both$.subscribe(console.log);

storeOne.next({
  ...storeOne.getValue(),
  entities: {
    1: {id: 1}
  },
  ids: [1]
});

storeTwo.next({
  ...storeTwo.getValue(),
  entities: {
    1: {id: 1}
  },
  ids: [1]
});

The above code works fine when both emit one after the other, but I also need to support the case where only one of them emits (the combineLatest behavior).

How can I do that?

1
Observable.combineLatest(movies$, actors$, (movies, actors) => ({ movies, actors })).auditTime(0) – cartant
Can you explain, please?
Don't really have the time to write a decent answer at the moment. If no one else does, I might do so later. Or you could check out the docs. – cartant
I like your answers, you are very helpful, I will wait, thanks!

1 Answer


Yes, as advised by @cartant, you can use Observable.combineLatest(movies$, actors$, (movies, actors) => ({ movies, actors })).auditTime(0)

To elaborate on the above:

  1. auditTime(n) waits for n milliseconds after a value arrives and then emits only the most recent value received in that window.
  2. auditTime(0) is similar to setTimeout(0): it adds no real delay, but it does wait for the current event/execution loop to complete before emitting.
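
The two points above can be modeled in plain JavaScript. This is only a rough sketch of the idea, not how RxJS implements auditTime: the helper name auditLatest and its schedule parameter are made up for illustration, and the timer is assumed to behave like setTimeout.

```javascript
// Hypothetical helper, NOT part of RxJS: a simplified model of auditTime(ms).
// `schedule(fn, ms)` defaults to setTimeout and is injectable for testing.
function auditLatest(ms, onEmit, schedule = setTimeout) {
  let latest;               // most recent value seen so far
  let windowOpen = false;   // true while a timer is pending

  return function push(value) {
    latest = value;         // always remember the newest value
    if (windowOpen) return; // a timer is already running, nothing else to do
    windowOpen = true;
    schedule(() => {
      windowOpen = false;
      onEmit(latest);       // emit only the latest value from this window
    }, ms);
  };
}

// Usage: three synchronous pushes lead to a single emission.
const push = auditLatest(0, value => console.log('emitted:', value));
push('B1');
push('B2');
push('B3');
// logs "emitted: B3" once the current synchronous code has finished
```

With a delay of 0 the timer callback cannot run until the synchronous code that pushed the values has returned, which is why everything pushed in the same burst collapses into one emission.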

Example

Suppose the values B and 2 are emitted together, B by the first stream and 2 by the second, with A and 1 as their previous values. With combineLatest alone you would get either B1, B2 or A2, B2, depending on which source happens to emit first. Either way, B2 is the latest value in that execution loop. Because auditTime(0) waits for the current execution loop to finish before emitting the latest value, the observable emits only B2.
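
The B and 2 scenario can be traced with a small plain-JavaScript model. This is a sketch under assumptions, not RxJS code: combineAndCoalesce, setLeft and setRight are hypothetical names, the first source is assumed to emit before the second, and setTimeout stands in for whatever scheduler auditTime uses.

```javascript
// Hypothetical model, NOT RxJS: combine the latest values of two sources and
// flush once per synchronous burst. `schedule` is injectable for testing.
function combineAndCoalesce(left, right, onEmit, schedule = setTimeout) {
  const combined = [];        // what plain combineLatest would have emitted
  let flushScheduled = false;

  function update() {
    combined.push(`${left}${right}`);
    if (flushScheduled) return;
    flushScheduled = true;
    schedule(() => {
      flushScheduled = false;
      const burst = combined.splice(0);        // take and clear the burst
      onEmit(burst[burst.length - 1], burst);  // latest value, plus the full burst
    }, 0);
  }

  return {
    setLeft(value) { left = value; update(); },
    setRight(value) { right = value; update(); },
  };
}

// Previous values are A and 1; B and 2 then arrive in the same execution loop.
const stores = combineAndCoalesce('A', '1', (latest, burst) => console.log(latest, burst));
stores.setLeft('B');
stores.setRight('2');
// logs: B2 [ 'B1', 'B2' ]
```

The subscriber is called once with B2, while the intermediate B1 that combineLatest alone would have delivered is dropped.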