1
votes

Background

Using Typescript and RxJS, I'm trying to extend a behavior subject so that it takes an observable as a construction parameter and then subscribes to it so that each value from the observable gets set as the value of the behavior subject.

Question

How do I make it so that the behavior subject is only subscribed to the inner observable when the behavior subject itself has at least one subscriber?

Code

import { Observable } from 'rxjs/Observable';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';

export class SubjectWithInnerObservable<T> extends BehaviorSubject<T> {

    constructor(
        initialState: any,
        someObservable: Observable<T>
    ) {
        super(initialState);

        // I only want this subscription to exist when there are one or more subscribers to the behavior subject
        someObservable.subscribe(this);
    }
}
1
Why do you want this? Also Subjects themselves are observables and produce values so what's wrong with just having the behavior subject emit the value?Mike Tung
I removed a bunch of context to remove the noise and focus on the part I need help on. The observable that gets injected is actually coming from a service that connects to websockets when there is at least one subscriber. I don't want that connection live if it doesn't have to be. In addition, there is actually going to be come complex mapping/translation of the value that comes off the observable before it gets assigned to the behavior subject, but I didn't want to show that.Andrew
It's a behavior subject because the value that gets stored in it is actually an accumulation of what comes through the observable and I want new subscribers to get the accumulated value.Andrew
I would encapsulate BehaviorSubject not extend it.Richard Matsen
is only subscribed ... when ... has at least one subscriber sounds like refCount.Richard Matsen

1 Answers

1
votes

You can use a .publishBehavior() operator on the websocket, which internally has a BehaviorSubject.

The 'publish' bit means it won't emit until connected, like a tap on a hose.

Add to that the .refCount() operator and you get auto-connect on subscription, i.e a tap that's controlled by subscription count.

Docs: RefCount

Demo

// mock websocket
const ws = new Rx.Subject()

const autoConnected = ws
  .do(x => console.log('ws emits', x)) // just to show ws stream
  .publishBehavior(null)  // make connectable, i.e only emit when subscribed
  .refCount()             // auto-connect on subscribe
  .filter(x => x)         // filter out that pesky initial value

ws.next(1) // before subscription - never emits

const subscription = autoConnected.subscribe(x => console.log('1st subscription', x))
ws.next(2)
ws.next(3)

subscription.unsubscribe()
ws.next(4)  // after unsubscribe - never emits

autoConnected.subscribe(x => console.log('2nd subscription', x))
ws.next(5) // after re-subscription - emits last plus next
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>