I have an Observable<Array<Observable<T>>> which I want to map to Observable<Array<T>>.
When a new array is emitted, the inner observables should unsubscribe/subscribe as follows:
- If an Observable exists in both the previous array and the new/current array, retain the pre-existing subscription.
- If an Observable did not exist in the previous array but does exist in the new/current array, create a new subscription.
- If an Observable existed in the previous array but does not exist in the new/current array, unsubscribe from the pre-existing subscription.
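To make the three rules concrete, they can be read as a diff by reference identity between the previous and the new array. The following is only a minimal sketch under that assumption: `diffSources` and `SourceDiff` are hypothetical names (not RxJS APIs), and plain objects stand in for the inner observables.

```typescript
// Hypothetical helper, not part of RxJS: classifies sources by reference identity.
interface SourceDiff<S> {
  kept: S[];    // in both arrays: retain the existing subscription
  added: S[];   // only in the new array: create a new subscription
  removed: S[]; // only in the previous array: unsubscribe
}

function diffSources<S>(previous: S[], current: S[]): SourceDiff<S> {
  const previousSet = new Set(previous);
  const currentSet = new Set(current);
  return {
    kept: current.filter(s => previousSet.has(s)),
    added: current.filter(s => !previousSet.has(s)),
    removed: previous.filter(s => !currentSet.has(s)),
  };
}

// Plain objects stand in for the inner observables in this illustration.
const keepSource = { name: 'keep' };
const removeSource = { name: 'remove' };
const addSource = { name: 'add' };

const diff = diffSources([keepSource, removeSource], [keepSource, addSource]);
console.log(diff.kept.map(s => s.name));    // ['keep']
console.log(diff.added.map(s => s.name));   // ['add']
console.log(diff.removed.map(s => s.name)); // ['remove']
```

Comparing by reference means the same Observable instance has to be reused across emissions for its subscription to count as "kept".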
I hoped to achieve this using switchMap on the outer observable and then passing the Array<Observable<T>> into combineLatest. However, switchMap will unsubscribe from its previous inner Observable before subscribing to the new inner Observable, which means inner subscriptions are not retained as desired.
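For illustration, the retaining behaviour I am after could be sketched without switchMap by keeping one subscription per inner source in a Map and reconciling it on every outer emission. This is a rough, dependency-free sketch rather than an RxJS operator: `combineLatestRetaining`, `Source`, `Unsubscribable` and `Slot` are hypothetical names, the interfaces only assume a `subscribe(next)` that returns an object with `unsubscribe()` (which RxJS Observables satisfy structurally), and error and completion handling are left out.

```typescript
// Minimal structural types (assumption: RxJS Observable/Subscription fit these shapes).
interface Unsubscribable { unsubscribe(): void; }
interface Source<T> { subscribe(next: (value: T) => void): Unsubscribable; }
interface Slot<T> { subscription?: Unsubscribable; hasValue: boolean; value?: T; }

// Hypothetical helper: combines the latest values of the current inner sources
// while retaining subscriptions to sources present in consecutive arrays.
function combineLatestRetaining<T>(
  outer: Source<Array<Source<T>>>,
  next: (values: T[]) => void,
): Unsubscribable {
  const slots = new Map<Source<T>, Slot<T>>();
  let current: Array<Source<T>> = [];
  let reconciling = false;

  const emitIfReady = () => {
    if (reconciling) return; // hold back emissions until the diff is applied
    const values: T[] = [];
    for (const source of current) {
      const slot = slots.get(source);
      if (!slot || !slot.hasValue) return; // wait until every source has emitted
      values.push(slot.value as T);
    }
    next(values);
  };

  const outerSubscription = outer.subscribe(sources => {
    reconciling = true;
    // Removed sources: unsubscribe and forget them.
    slots.forEach((slot, source) => {
      if (sources.indexOf(source) === -1) {
        slot.subscription?.unsubscribe();
        slots.delete(source);
      }
    });
    // Added sources: subscribe; sources already in the map are left untouched.
    for (const source of sources) {
      if (slots.has(source)) continue;
      const slot: Slot<T> = { hasValue: false };
      slots.set(source, slot); // register first, in case the source emits synchronously
      slot.subscription = source.subscribe(value => {
        slot.value = value;
        slot.hasValue = true;
        emitIfReady();
      });
    }
    current = sources;
    reconciling = false;
    emitIfReady();
  });

  return {
    unsubscribe() {
      outerSubscription.unsubscribe();
      slots.forEach(slot => slot.subscription?.unsubscribe());
      slots.clear();
    },
  };
}

// Demo with tiny stand-in sources that log their subscribe/unsubscribe calls.
const log: string[] = [];
function single<T>(value: T, name: string): Source<T> {
  return {
    subscribe(next) {
      log.push(`${name} subscribe`);
      next(value);
      return { unsubscribe: () => { log.push(`${name} unsubscribe`); } };
    },
  };
}

let pushArray: (sources: Array<Source<number>>) => void = () => {};
const outer: Source<Array<Source<number>>> = {
  subscribe(next) {
    pushArray = next;
    return { unsubscribe: () => { pushArray = () => {}; } };
  },
};

const keep = single(1, 'keep');
const remove = single(2, 'remove');
const add = single(3, 'add');
const subscription = combineLatestRetaining(outer, values =>
  log.push(`final ${JSON.stringify(values)}`),
);
pushArray([keep, remove]);
pushArray([keep, add]);
console.log(log.join('\n'));
```

In this sketch the `keep` source is subscribed once and never torn down between the two arrays, while `remove` is unsubscribed before `add` is subscribed.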
Example (https://stackblitz.com/edit/typescript-b4wgr1). Given code:
import 'rxjs/Rx';
import { Observable } from 'rxjs';

// Wraps an observable so that subscribe/unsubscribe calls are logged.
const debugObservable = <T>(t$: Observable<T>, name: string) =>
  new Observable<T>(observer => {
    console.log(name, 'subscribe');
    const subscription = t$.subscribe(observer);
    return () => {
      console.log(name, 'unsubscribe');
      return subscription.unsubscribe();
    };
  });

// Emits a single value and never completes.
const ofSingle = <T>(t: T) =>
  new Observable<T>(observer => {
    observer.next(t);
  });

const observableOfArrayOfObservablesOfNumber = new Observable<
  Array<Observable<number>>
>(observe => {
  const keep = debugObservable(ofSingle(1), 'keep');
  const remove = debugObservable(ofSingle(2), 'remove');
  const add = debugObservable(ofSingle(3), 'add');

  observe.next([keep, remove]);

  setTimeout(() => {
    observe.next([keep, add]);
  }, 2000);

  return () => {};
});

// The `switchMap` will unsubscribe from the previous inner observable *before*
// subscribing to the new inner observable.
const final$ = observableOfArrayOfObservablesOfNumber.switchMap(
  arrayOfObservablesOfNumber => {
    const observableOfArrayOfNumbers = Observable.combineLatest(
      arrayOfObservablesOfNumber,
    );
    return debugObservable(
      observableOfArrayOfNumbers,
      'observableOfArrayOfNumbers',
    );
  },
);

final$.subscribe(x => console.log('final', x));
This produces:
observableOfArrayOfNumbers subscribe
keep subscribe
remove subscribe
final [1, 2]
keep unsubscribe <--- bad!
remove unsubscribe
observableOfArrayOfNumbers unsubscribe
observableOfArrayOfNumbers subscribe
keep subscribe <--- bad!
add subscribe
final [1, 3]
However, this is what I desire:
observableOfArrayOfNumbers subscribe
keep subscribe
remove subscribe
final [1, 2]
remove unsubscribe
observableOfArrayOfNumbers unsubscribe
observableOfArrayOfNumbers subscribe
add subscribe
final [1, 3]
combineLatest, but swaps the observables and emitted values like you've described. Are you OK with using a custom operator, or do you insist on a combination of existing ones? - m1ch4ls