I have a list of hosts that I want to change during the lifetime of the app. I want the response from each host as soon as it is available. If the host list changes, I don't care about the previous responses from http.get. I want to accumulate all the hosts' responses in a single array and subscribe to it.
The issue I'm having right now is that when the host list changes, the inner observable returned from http.get is not unsubscribed from. I want that host request to be cancelled if that host takes a long time to return and the outermost hosts$ subject has a next value.
A new host list is created with hosts$.next([...])
const accumulator = (acc, val) => [...acc, val]
hosts$ = new BehaviorSubject(['host1', 'slow2'])
doer = <T>(): Observable<T> =>
this.hosts$.pipe( // take an array of hosts
switchMap(hosts => fromArray(hosts).pipe( // create an array of observable events for each
mergeMap(host => this.http.get(`http://${host}`).pipe( // hit all the urls at the same time
catchError(() => []) // on 404 or 500 just give back an empty array
)),
startWith([]), // subscribers will see an empty list until the first response
scan(accumulator), // every response will be accumulated
shareReplay({
bufferSize: 1, // allow new subscriptions to get the current value
refCount: true // but when the last subscription is removed, start over with []
})
)) // switch map unsubscribes from the previous host list if the host list changes
)
hosts$.subscribe() // here we have the array of responses seen so far as soon
// as they are available
// assuming slow2 is a host that takes forever, and we change the host list...
hosts$.next(['host3']) // here, the get request to slow2 is not cancelled
// the output of the observable in doer() is correct,
// but the get request should have been cancelled
I tried to edit the code above to use a switchMap instead of a mergeMap to allow me to cancel the previous http.get request, but now the only values that come through are the last hosts's get.
As stated in comments, we could add a new observable that represents the "current version of the host list" and use it in the http.get's pipe, and takeUntil the host list changes. Now, every time the host list updates, we send an event on that new observable. This makes me think of a good name for the pipe operator I'm looking for: obserbale.pipe(takeUntilNext(subject))