1
votes

In Rxjs, there is the pipe takeUntil but there isn't a pipe wait Until, that makes the current observable waiting for a seconde Observable to emit.

My Final Goal is to make many Observable still waiting until my Observable init$ emits just one value, to continue their execution. So that my Observable init$ has to been executed once and before that my other observable have to wait until inits emits any value different from null.

In this simple exemple, I want to add a pipe to pipedSource doing wait Until init$ , So the source has to wait until init$ emits to emit its value.

import { interval, timer, Subject, combineLatest } from 'rxjs';
import { takeUntil, skipWhile, skipUntil, concatMap, map, take } from 'rxjs/operators';
import { BehaviorSubject } from 'rxjs';

const init$ = new BehaviorSubject(null);

const source = new BehaviorSubject(null);
const pipedSource = source
.pipe(
    skipWhile((res)=> res === null)
    //wait until init$ emits a non null value
)

//first subscription to source
pipedSource.subscribe(val => console.log(val));

source.next({profile:"me"});

//init emits once
setTimeout(()=>{
  init$.next(1);
},2000);

// a second subscription to source
setTimeout(()=>{
  pipedSource.subscribe(val => console.log(val));
},3000);

wanted result:

//after 2s of waiting
//first subscription returns "profile"
//after 3s
//second subscription returns "profile" 
2
So you want to skip until? - Andrew Allen
The first obs, is subscribed to. When the right thing happens in subscrption emit another event. Anyone who subscribed to that event will only be notified after first thing is finished. - JWP

2 Answers

2
votes

You want to run a second observable when a first observable emits a non-null value. To do this, use concatMap or switchMap after skipWhile.

ngOnInit() {
  const init$ = new BehaviorSubject(null);

  const source = new BehaviorSubject({profile:"me"});
  const pipedSource = init$
  .pipe(
      skipWhile((res)=> res === null),
      concatMap(() => source)
  );

  pipedSource.subscribe(val => console.log('first', val));

  //init emits once
  setTimeout(()=>{
    init$.next(1);
  },2000);

  // a second subscription to source
  setTimeout(()=>{
    pipedSource.subscribe(val => console.log('second', val));
  }, 3000);
}

Here I am subscribing to the init$ observable first, waiting for it to emit a non-null value, and then switching to the source observable.

DEMO: https://stackblitz.com/edit/angular-p7kftd

0
votes

If I understand right you question, I see 2 potential cases.

The first one is that your source Observable starts emitting its values independently from wait$. When wait$ emits, only then you start using the values emitted by source. This behavior can be implemented using the combineLatest function like this

//emits value every 500ms
const source$ = interval(500);

combineLatest(source$, wait$)
    .pipe(
        map(([s, v]) => s), // to filter out the value emitted by wait$
        take(5), // just to limit to 5 the values emitted
    )
    .subscribe(val => console.log(val));

setTimeout(() => {
    wait$.next(1);
}, 2000);

In this case what you see printed on the console is a sequence starting from 2 since wait$ emits after 2 seconds.

The second case is when you want source to start emitting its values only after wait$ has emitted. In this case you can use switchMap operator such as here

const wait_2$ = new Subject();

//emits value every 500ms
const source_2$ = interval(500);

wait_2$
    .pipe(
        switchMap(() => source_2$),
        take(5), // just to limit to 5 the values emitted
    )
    .subscribe(val => console.log(val));

setTimeout(() => {
    wait_2$.next(1);
}, 4000);

In this case, after 4 seconds, a sequence starting with 0 gets printed on the console.