5
votes

I'm filtering the values of one Observable using input from another Observable; the filter input comes from the user.

The filtering is done with the RxJS operator combineLatest. As a result, a subscriber receives no values until both source Observables have emitted at least once. I'd like the resulting stream to emit the unfiltered list as soon as it is created, before any user input takes place.

I think I should be using the startWith operator so the stream has an emission on creation, but I can't work out how to seed it from an Observable. I'm using an Observable because the data comes from Firebase and is handled with FirebaseListObservables.

Below is a pieced-together version of what I'm doing currently.

let tagInput = document.getElementById('tags');
let tagExclusionStream = Observable
  .fromEvent(tagInput, 'input')
  .map((e: any) => createsArrayFromInput(e.target.value));

let allTags: Observable<any[]> = getAllTags();

let filteredTags = allTags
  .combineLatest(tagExclusionStream, (tags, tagExclusions) => {
    return tags.filter((tag: any) => tagExclusions.indexOf(tag.$key) === -1);
  });

// I want this to print out without needing the tagExclusionStream to emit first 
filteredTags.subscribe(tags => console.log("Tags:", tags))

Please let me know if my approach is completely off or if there's a better way, as I'm new to RxJS.


2 Answers

4
votes

I think this will do the trick:

let filteredTags = allTags
  .combineLatest(tagExclusionStream.startWith([]), (tags, tagExclusions) => {
    // The empty array seeds the exclusions, so the first emission is unfiltered
    return tags.filter((tag: any) => tagExclusions.indexOf(tag.$key) === -1);
  });

Alternatively, if you use tagExclusionStream in several places, you can apply startWith where the stream is defined:

let tagExclusionStream = Observable
  .fromEvent(tagInput, 'input')
  .map((e: any) => createsArrayFromInput(e.target.value))
  .startWith([]); // seed with an empty exclusion list, matching the array type emitted by map
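
To see why an empty seed leaves the list unfiltered, the filter callback can be pulled out into a plain function and called directly. This is only a minimal sketch: it assumes tags are objects with a string $key, as in the question, and the Tag interface, the excludeTags name and the sample data are made up for illustration.

```typescript
// Hypothetical shape of a tag; only the $key property comes from the question.
interface Tag {
  $key: string;
}

// Same predicate as the combineLatest callback, extracted into a function.
function excludeTags(tags: Tag[], exclusions: string[]): Tag[] {
  return tags.filter((tag) => exclusions.indexOf(tag.$key) === -1);
}

// Made-up sample data.
const sampleTags: Tag[] = [{ $key: 'rxjs' }, { $key: 'firebase' }, { $key: 'angular' }];

// An empty exclusion list (the seed value) keeps every tag.
const unfiltered = excludeTags(sampleTags, []);

// Once the user has typed something, the matching keys drop out.
const filtered = excludeTags(sampleTags, ['firebase']);

console.log(unfiltered.map((t) => t.$key));
console.log(filtered.map((t) => t.$key));
```

Because indexOf never finds anything in an empty array, the seeded first emission is simply the full tag list.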
9
votes

startWith just uses concat internally, so to start with the values of another Observable you can call concat directly:

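import { concat, of } from 'rxjs';

const startValue$ = of('start with this');

// Emits everything from startValue$ first, then everything from source$
concat(startValue$, source$);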

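Or write your own operator:

import { Observable, ObservableInput, ObservedValueOf, OperatorFunction, concat } from 'rxjs';

// Prepends the emissions of `start` to the source stream
function startFrom<T, O extends ObservableInput<any>>(start: O): OperatorFunction<T, T | ObservedValueOf<O>> {
  return (source: Observable<T>) => concat(start, source);
}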

source$.pipe(
  startFrom(startValue$)
)

Make sure startValue$ completes

The new Observable switches to source$ only once startValue$ completes. So if the Observable you start with is long-lived, e.g. a ReplaySubject from which you only want the most recent value, make sure the Observable you pass in completes, e.g. with take(1).

import { ReplaySubject, concat, of } from 'rxjs';
import { take } from 'rxjs/operators';

const source$ = of('source-1', 'source-2');
const replay = new ReplaySubject<string>(1); // buffers only the latest value
replay.next('subject-1');
replay.next('subject-2');

const startWith$ = replay.pipe(take(1)); // <-- take one item and complete
const o$ = concat(startWith$, source$);
o$.subscribe(console.log); // logs: subject-2, source-1, source-2
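
Applied to the question, the same idea could look roughly like the sketch below. It assumes RxJS 6.5 or later with pipeable operators. The savedExclusions$ BehaviorSubject is a made-up stand-in for whatever Observable provides the seed, and allTags$ and userExclusions$ are simplified stand-ins for the Firebase list and the input event stream.

```typescript
import { BehaviorSubject, Subject, combineLatest, concat, of } from 'rxjs';
import { map, take } from 'rxjs/operators';

interface Tag {
  $key: string;
}

// Stand-in for the Firebase list from the question (assumption).
const tags: Tag[] = [{ $key: 'a' }, { $key: 'b' }, { $key: 'c' }];
const allTags$ = of(tags);

// Stand-in for the stream built from the input event (assumption).
const userExclusions$ = new Subject<string[]>();

// Hypothetical long-lived seed Observable; it never completes on its own.
const savedExclusions$ = new BehaviorSubject<string[]>([]);

// take(1) makes the seed complete so concat can move on to the user input.
const exclusions$ = concat(savedExclusions$.pipe(take(1)), userExclusions$);

const emissions: string[][] = [];

combineLatest([allTags$, exclusions$])
  .pipe(
    map(([allTags, exclusions]) =>
      allTags
        .filter((tag) => exclusions.indexOf(tag.$key) === -1)
        .map((tag) => tag.$key)
    )
  )
  .subscribe((keys) => emissions.push(keys));

// Simulate the user excluding tag 'b'.
userExclusions$.next(['b']);

console.log(emissions);
```

The first entry in emissions is the unfiltered list, produced as soon as the stream is subscribed to; the second reflects the user's exclusion. Without take(1), the BehaviorSubject would never complete, concat would never subscribe to userExclusions$, and the user's input would be ignored.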