You can create a Subject that all the other observables use to report their current state. You can then reduce the values emitted on this Subject to the number of currently active Observables.
See this jsbin example.
// Every tracked stream pushes `{ <name>: <isActive> }` onto this subject.
const meta$ = new Rx.Subject();

// Create some streams that we can observe.
// stream1$: ticks every 300 ms and never completes on its own.
const stream1$ = Rx.Observable.interval(300);
// stream2$: starts after 1000 ms, then ticks every 500 ms; completes after 10 values.
const stream2$ = Rx.Observable.timer(1000)
  .switchMap(() => Rx.Observable.interval(500).startWith(0))
  .take(10);
// stream3$: starts after 1500 ms, then ticks every 500 ms; completes after 2 values.
const stream3$ = Rx.Observable.timer(1500)
  .switchMap(() => Rx.Observable.interval(500).startWith(0))
  .take(2);

// Parent subscription holding every source subscription, so that all of them
// can be torn down together once the sample below has been collected.
const trackers = new Rx.Subscription();

/**
 * Subscribes to `source$` and reports its activity on `meta$`:
 * `true` on every emitted value, `false` once it errors or completes.
 *
 * @param {string} name - Key under which the stream's state is reported.
 * @param {Rx.Observable} source$ - Stream to observe.
 */
const track = (name, source$) => {
  const report = isActive => meta$.next({ [name]: isActive });
  trackers.add(
    source$.subscribe(
      () => report(true),
      () => report(false),
      () => report(false)
    )
  );
};

track('stream1', stream1$);
track('stream2', stream2$);
track('stream3', stream3$);

meta$
  // Merge each report into one object holding the latest flag per stream.
  .scan((state, update) => Object.assign({}, state, update))
  // Count the streams whose latest flag is `true`.
  .map(state => Object.keys(state).filter(key => state[key]).length)
  .take(50) // stream1$ never completes, so stop after 50 state updates
  .timestamp()
  .subscribe({
    next: x => console.log(x),
    // Without this the sources would keep ticking into `meta$` forever,
    // even though nothing is listening any more.
    complete: () => trackers.unsubscribe()
  });