1
votes

I have a Observable<Array<Observable<T>>> which I want to map to Observable<Array<T>>.

When a new array is emitted, the inner observables should unsubscribe/subscribe as follows:

  • If Observable exists in previous array and the new/current array, retain pre-existing subscription
  • If Observable did not exist in previous array but does exist in new/current array, create new subscription
  • If Observable existed in previous array but does not exist in new/current array, unsubscribe from pre-existing subscription

I hoped to achieve this using switchMap on the outer observable and then passing Array<Observable<T>> into combineLatest. However, switchMap will unsubscribe from its previous inner Observable before subscribing to the new inner Observable, which means inner subscriptions are not retained as desired.

Example (https://stackblitz.com/edit/typescript-b4wgr1). Given code:

import 'rxjs/Rx';
import { Observable } from 'rxjs';

const debugObservable = <T>(t$: Observable<T>, name: string) =>
    new Observable<T>(observer => {
        console.log(name, 'subscribe');
        const subscription = t$.subscribe(observer);
        return () => {
            console.log(name, 'unsubscribe');
            return subscription.unsubscribe();
        };
    });

const ofSingle = <T>(t: T) =>
    new Observable<T>(observer => {
        observer.next(t);
    });

const observableOfArrayOfObservablesOfNumber = new Observable<
    Array<Observable<number>>
>(observe => {
    const keep = debugObservable(ofSingle(1), 'keep');
    const remove = debugObservable(ofSingle(2), 'remove');
    const add = debugObservable(ofSingle(3), 'add');

    observe.next([keep, remove]);

    setTimeout(() => {
        observe.next([keep, add]);
    }, 2000);

    return () => {};
});

// The `switchMap` will unsubscribe to the previous inner observable *before* subscribing to the new
// inner observable.
const final$ = observableOfArrayOfObservablesOfNumber.switchMap(
    arrayOfObservablesOfNumber => {
        const observableOfArrayOfNumbers = Observable.combineLatest(
            arrayOfObservablesOfNumber,
        );
        return debugObservable(
            observableOfArrayOfNumbers,
            'observableOfArrayOfNumbers',
        );
    },
);

final$.subscribe(x => console.log('final', x));

This produces:

observableOfArrayOfNumbers subscribe
keep subscribe
remove subscribe
final [1, 2]
keep unsubscribe <--- bad!
remove unsubscribe
observableOfArrayOfNumbers unsubscribe
observableOfArrayOfNumbers subscribe
keep subscribe <--- bad!
add subscribe
final [1, 3]

However, this is what I desire:

observableOfArrayOfNumbers subscribe
keep subscribe
remove subscribe
final [1, 2]
remove unsubscribe
observableOfArrayOfNumbers unsubscribe
observableOfArrayOfNumbers subscribe
add subscribe
final [1, 3]
3
You could do this with custom operator - something that works like combineLatest, but swaps the observables and emitted values like you've described. Are you ok using custom operator or do you insist on combination of existing ones? - m1ch4ls
Custom is fine! - Oliver Joseph Ash

3 Answers

0
votes

I ended up achieving this by publishing + replaying the inner observables with publishReplay(1) and then ref counting.

Note that refCount is not sufficient because the count will drop to 0 when switchMap unsubscribes from the previous inner observable (before it subscribes to the new inner observable), so I had to use a special refCountWithDelay operator which only unsubscribes by ref counting after a delay (i.e. within the same tick of the event loop but not synchronously). More on that here:

https://stackblitz.com/edit/typescript-4xfwsh?file=index.ts

const createObservable = <T>(t: T, name: string) => {
  return refCountWithDelay(debugObservable(ofSingle(t), name).publishReplay(1), 0, 0);
}

const observableOfArrayOfObservablesOfNumber = new Observable<
    Array<Observable<number>>
>(observe => {
    const keep = createObservable(1, 'keep');
    const remove = createObservable(2, 'remove');
    const add = createObservable(3, 'add');

    observe.next([keep, remove]);

    setTimeout(() => {
        observe.next([keep, add]);
    }, 2000);

    return () => {};
});

Produces:

observableOfArrayOfNumbers subscribe
keep subscribe
remove subscribe
final [1, 2]
observableOfArrayOfNumbers unsubscribe
observableOfArrayOfNumbers subscribe
remove unsubscribe
add subscribe
final [1, 3]

Notice that keep is only subscribed to once.

0
votes
-1
votes

The closest to what you described is an xstream operator called pickCombine in Cycle.js Onionify.

There doesn't seem to be one single official RxJS operator that solves this, but it is possible to build your own operator that implements this behavior. You can use the xstream implementation of pickCombine as a reference.

Key parts are:

Notice that it is easier and more efficient to create a custom data structure (which uses a Map and relies on keys to disambiguate array items) than to do it directly on an array. You can hide the custom data structure from the external API.