The Observable creation process flows as follows:
An Observable is defined by the author (here manually with new, for purposes of the explanation):
const myObservable = new Observable(function subscribe(subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.complete();
return function tearDownLogic() {
console.log('runs when Observable for whatever reason is done (complete, error, or unsubscribed)')
}
});
The subscribe callback passed to the Observable above is saved locally by the Observable constructor:
constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
if (subscribe) {
this._subscribe = subscribe;
}
}
So, we have that entire subscribe function, defined by us or any other pre-made Observable, saved down for later execution.
An Observer can be passed to the subscribe callback in one of several forms. Either, as one to three functions directly (next, error, complete), or as an object with one or more of the same three methods. For purposes of this explanation, we will implement the last and more verbose option:
const observer = {
next(v) {
console.log(v);
}
error(err) {
console.log(err);
}
complete() {
console.log('Observable has now completed and can no longer emit values to observer');
}
}
Now, the fun part starts. We pass the observer into the Observable.subscribe(..) method:
myObserver.subscribe(observer);
The subscribe method looks like this:
subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (error: any) => void,
complete?: () => void): Subscription {
const { operator } = this;
const sink = toSubscriber(observerOrNext, error, complete);
if (operator) {
sink.add(operator.call(sink, this.source));
} else {
sink.add(
this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
this._subscribe(sink) :
this._trySubscribe(sink)
);
}
if (config.useDeprecatedSynchronousErrorHandling) {
if (sink.syncErrorThrowable) {
sink.syncErrorThrowable = false;
if (sink.syncErrorThrown) {
throw sink.syncErrorValue;
}
}
}
return sink;
}
Briefly described, the subscribe method:
- Receives the
observer in one of its previously discussed forms
toSubscriber converts the observer to a Subscriber object, regardless of its passed in form (the Subscriber instance is saved in the sink variable)
- Note: The
operator variable is undefined, unless you subscribe to an operator. Thus, just ignore the if statements around operator
Subscriber extends (is prototype-linked to) the Subscription object, which has two important methods on its prototype: unsubscribe(), add()
add(..) is used to add "tear down logic" (functions) to the Observable, that will run when an Observable completes or is unsubscribed. It will take any function that is passed to it, wrap it in a Subscription object, and place the function into the Subscription's _unsubscribe variable. This Subscription is saved on the Subscriber we created above, in a variable called _subscriptions. As noted, we do all that so that when the Subscriber is unsubscribed or completes, all the add()'ed tear down logic executes
- As a side note,
Observable.subscribe() returns the Subscriber instance. Thus, you can call mySubscriber.add( // some tear down logic) on it at any point to add functions that will execute when the Observable completes or is unsubscribed
- An important part now enfolds:
this._trySubscribe(sink) runs (inside add(), as a parameter). _trySubscribe(..) is the function that actually runs the subscribe callback earlier saved by the Observable constructor. Importantly, it passes in sink (our new Subscriber instance) as the callback to the Observable callback. In other words, when subscriber.next(1) inside the Observable executes, we are actually executing next(1) in the sink (Subscriber) instance (next() is on Subscriber's prototype).
So, that takes me to the end, for now. There are more details inside toSubscribe and around the unsubscribe process, among other things, but those are outside the scope of this Q&A.
In short summary, to answer the question in the title, the Observer is indeed passed into the Observable, simply after being converted to a unifying Subscriber object.
Hopefully, that will help someone else in the future.
rejectandresolvefunctions passed to the callback in thePromiseconstructor (the executor function) are not in fact coming fromthen(onResolve, onReject). They come from the internal Promise implementation and when called they somehow execute the handlers inthen, if attached. developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/… - Magnusnext,completeanderrrorall exist and that any errors thrown from observer callbacks are caught and are routed appropriately. The subscriber is essentially an implementation detail: github.com/ReactiveX/rxjs/issues/2314 - cartantsubscribemethod calls the function passed to theObservableconstructor. That function is stored internally as_subscribeand it's called with the wrapped observer that's passed to thesubscribemethod here - note thetoSubscribercall (just above it) that wraps the observer. - cartant