
I have a list of hosts that can change during the lifetime of the app. I want the response from each host as soon as it is available. If the host list changes, I no longer care about the responses to the previous http.get calls. I want to accumulate all the hosts' responses in a single array and subscribe to it.

The issue I'm having right now is that when the host list changes, the inner observable returned from http.get is not unsubscribed from. If a host takes a long time to respond and the outermost hosts$ subject emits a new value, I want the pending request to that host to be cancelled.

A new host list is published with hosts$.next([...]).


  const accumulator = (acc, val) => [...acc, val]

  hosts$ = new BehaviorSubject(['host1', 'slow2'])

  doer = <T>(): Observable<T> =>
    this.hosts$.pipe( // take an array of hosts
      switchMap(hosts => from(hosts).pipe( // emit each host in the array as its own event
        mergeMap(host => this.http.get(`http://${host}`).pipe( // hit all the urls at the same time
          catchError(() => []) // on 404 or 500 just give back an empty array
        )),
        startWith([]), // subscribers will see an empty list until the first response
        scan(accumulator), // every response will be accumulated
        shareReplay({
          bufferSize: 1, // allow new subscriptions to get the current value
          refCount: true // but when the last subscription is removed, start over with []
        })
      )) // switch map unsubscribes from the previous host list if the host list changes
    )

  doer().subscribe() // here we have the array of responses seen so far as soon
                     // as they are available

  // assuming slow2 is a host that takes forever, and we change the host list...

  hosts$.next(['host3']) // here, the get request to slow2 is not cancelled

  // the output of the observable in doer() is correct,
  //   but the get request should have been cancelled

I tried replacing the mergeMap in the code above with a switchMap so that the previous http.get request could be cancelled, but then the only values that come through are those from the last host's get.
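To see why only the last host's response survives, here is a minimal sketch that swaps http.get for hypothetical Subject-based stand-ins (the `responses` map and the host names `a` and `b` are assumptions for illustration only). switchMap keeps only the most recent inner subscription, so when the hosts are emitted one by one, each host unsubscribes the one before it:

```typescript
import { Subject, from } from 'rxjs';
import { mergeMap, switchMap } from 'rxjs/operators';

// Hypothetical stand-ins for http.get: one Subject per host
const responses: Record<string, Subject<string>> = {
  a: new Subject<string>(),
  b: new Subject<string>(),
};

const viaSwitchMap: string[] = [];
const viaMergeMap: string[] = [];

// Inner switchMap: subscribing to host b unsubscribes from host a
from(['a', 'b'])
  .pipe(switchMap(host => responses[host]))
  .subscribe(value => viaSwitchMap.push(value));

// Inner mergeMap: both hosts stay subscribed
from(['a', 'b'])
  .pipe(mergeMap(host => responses[host]))
  .subscribe(value => viaMergeMap.push(value));

responses.a.next('from a');
responses.b.next('from b');

console.log(viaSwitchMap); // only the last host's response
console.log(viaMergeMap);  // responses from both hosts
```

This is why the per-host level needs mergeMap, while the cancellation has to happen one level up, where the whole host list changes.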

As stated in the comments, we could add a new observable that represents the "current version of the host list", use it in the http.get's pipe, and takeUntil the host list changes. Every time the host list updates, we send an event on that new observable. This makes me think of a good name for the pipe operator I'm looking for: observable.pipe(takeUntilNext(subject))
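A minimal sketch of that takeUntil idea follows. It is not a definitive implementation: `fakeGet`, `pending`, `cancelled`, and `hostsChanged$` are hypothetical names, and `fakeGet` stands in for http.get so that cancellations can be observed. Instead of a second subject, the notifier here is assumed to be derived from hosts$ itself with skip(1), which ignores the BehaviorSubject's replayed current value and fires only on the next host list. The outer operator is deliberately mergeMap, so any cancellation seen comes from takeUntil alone.

```typescript
import { BehaviorSubject, Observable, Subject, from } from 'rxjs';
import { mergeMap, scan, skip, startWith, takeUntil } from 'rxjs/operators';

// Hypothetical stand-in for http.get: one Subject per host, plus a log of cancellations
const pending: Record<string, Subject<string>> = {
  host1: new Subject<string>(),
  slow2: new Subject<string>(),
  host3: new Subject<string>(),
};
const cancelled: string[] = [];
const fakeGet = (host: string): Observable<string> =>
  new Observable<string>(subscriber => {
    let done = false;
    const sub = pending[host].subscribe({
      next: value => subscriber.next(value),
      complete: () => { done = true; subscriber.complete(); },
    });
    // Teardown: record a cancellation only if the request had not finished
    return () => { sub.unsubscribe(); if (!done) cancelled.push(host); };
  });

const hosts$ = new BehaviorSubject<string[]>(['host1', 'slow2']);
// Fires on every host list after the one that is current at subscription time
const hostsChanged$ = hosts$.pipe(skip(1));

const seen: string[][] = [];
hosts$.pipe(
  mergeMap(hosts => from(hosts).pipe(
    mergeMap(host => fakeGet(host).pipe(takeUntil(hostsChanged$))),
    scan((acc, val) => [...acc, val], [] as string[]),
    startWith([] as string[]),
  )),
).subscribe(value => seen.push(value));

pending.host1.next('r1');
pending.host1.complete();
hosts$.next(['host3']);     // takeUntil ends the pending slow2 request
pending.slow2.next('late'); // nobody is listening any more
pending.host3.next('r3');
pending.host3.complete();

console.log(seen);
console.log(cancelled);
```

In this sketch the request to slow2 is torn down as soon as hosts$ emits again, while the completed host1 request is not counted as cancelled.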

I am assuming there will be a way to assign new hosts. You can create another Subject and use takeUntil to unsubscribe. You can emit the next item on that Subject just before assigning the new hosts value. - Jasdeep Singh
One thing: how are you planning to update hosts, given hosts$ = new BehaviorSubject(['host1'])? - Jasdeep Singh
Why are you converting your host array to a stream at all? Why not just use forkJoin to get them all at once? - bryan60
There's no need for the operator; switchMap already does what you want. You just need to describe your use case a little more clearly. - bryan60
Can you post what the accumulator is doing? - bryan60

1 Answer


Seems like this could just be cleaned up like so:

Use this utility function:

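import { merge, Observable } from 'rxjs'
import { scan } from 'rxjs/operators'

/*
  takes an array of observables, merges them, and accumulates emissions in an array as they arrive
*/
const mergeJoin = <T>(obs$: Array<Observable<T>>): Observable<Array<T>> => {
 return merge(...obs$).pipe(
   scan((acc, val) => [...acc, val], [] as T[]) // typed seed so the accumulator is inferred as T[]
 )
}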

and do:

  private getHost(host) {
    return this.http.get(`http://${host}`).pipe(
      catchError(() => [])  // is the response an array? is that what you want for your error case?
                            // (returning [] here emits nothing and just completes; of([]) would emit an empty array)
    )
  }

  doer = <T>(): Observable<T> =>
    this.hosts$.pipe(
      switchMap(hosts => mergeJoin(hosts.map(host => this.getHost(host)))),
      startWith([]),
      shareReplay({bufferSize: 1, refCount: true})
    )
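To check that switchMap around mergeJoin really cancels a pending request, here is a small sketch under stated assumptions: `getHost` is replaced by a hypothetical Subject-based stand-in (`pending`, `cancelled`, and the host names are illustrative, not part of the original code), and shareReplay is left out to keep the focus on cancellation.

```typescript
import { BehaviorSubject, Observable, Subject, merge } from 'rxjs';
import { scan, startWith, switchMap } from 'rxjs/operators';

// Same utility as above: merge the observables and accumulate emissions in an array
const mergeJoin = <T>(obs$: Array<Observable<T>>): Observable<T[]> =>
  merge(...obs$).pipe(scan((acc, val) => [...acc, val], [] as T[]));

// Hypothetical stand-in for this.getHost(host): one Subject per host, plus a cancellation log
const pending: Record<string, Subject<string>> = {
  host1: new Subject<string>(),
  slow2: new Subject<string>(),
  host3: new Subject<string>(),
};
const cancelled: string[] = [];
const getHost = (host: string): Observable<string> =>
  new Observable<string>(subscriber => {
    let done = false;
    const sub = pending[host].subscribe({
      next: value => subscriber.next(value),
      complete: () => { done = true; subscriber.complete(); },
    });
    // Teardown: record a cancellation only if the request had not finished
    return () => { sub.unsubscribe(); if (!done) cancelled.push(host); };
  });

const hosts$ = new BehaviorSubject<string[]>(['host1', 'slow2']);
const emitted: string[][] = [];
hosts$.pipe(
  switchMap(hosts => mergeJoin(hosts.map(host => getHost(host)))),
  startWith([] as string[]),
).subscribe(value => emitted.push(value));

pending.host1.next('r1');
pending.host1.complete();
hosts$.next(['host3']); // switchMap unsubscribes from the old list, tearing down slow2
pending.host3.next('r3');
pending.host3.complete();

console.log(emitted);
console.log(cancelled);
```

In this sketch the empty array from startWith is emitted only once, at subscription; after the host list changes, the previous array remains the latest value until the first new response arrives. If subscribers should see an empty list on every change, startWith could instead go inside the switchMap callback.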

Note that you have doer written as a function, which is a strange decision for an observable with shareReplay attached to it: every call creates a new stream, so each caller that subscribes retriggers the host fetch, which makes shareReplay much less useful. I assume you have your reasons, but if this isn't your goal, just make it a static observable (a plain property) instead of a function.
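The difference can be sketched as follows, assuming RxJS 6.4 or later for the shareReplay config object. `fetchAll$`, `doerFn`, and `doer$` are hypothetical names, and `fetchAll$` merely stands in for the host-fetching pipeline while counting how often it is actually started:

```typescript
import { Observable } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

let fetchCount = 0;
// Hypothetical stand-in for the host-fetching pipeline: counts each time it is started
const fetchAll$ = new Observable<string[]>(subscriber => {
  fetchCount++;
  subscriber.next(['response']);
  // never completes, like a pipeline fed by a BehaviorSubject
});

// Function form: every call builds a new pipeline with its own shareReplay
const doerFn = (): Observable<string[]> =>
  fetchAll$.pipe(shareReplay({ bufferSize: 1, refCount: true }));

// Property form: one pipeline shared by all subscribers
const doer$: Observable<string[]> =
  fetchAll$.pipe(shareReplay({ bufferSize: 1, refCount: true }));

const fnSubscriptions = [doerFn().subscribe(), doerFn().subscribe()];
const fetchesViaFunction = fetchCount;

fetchCount = 0;
const propertySubscriptions = [doer$.subscribe(), doer$.subscribe()];
const fetchesViaProperty = fetchCount;

console.log({ fetchesViaFunction, fetchesViaProperty });

fnSubscriptions.forEach(sub => sub.unsubscribe());
propertySubscriptions.forEach(sub => sub.unsubscribe());
```

With the function form, each call gets its own replay buffer and starts its own fetch; with the property form, the second subscriber reuses the running fetch and receives the replayed value.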