2
votes

I have an RxJS subject which, when I emit to it (with next), makes an HTTP call and times out after 5 seconds. I've turned the backend off so that it ALWAYS times out. The timeout causes the subscriber's error function to be called. Perfect.

However, when I emit to the subject a second time, I see that the subject has 0 observers. The timeout error removes all current observers from the RxJS subject. I do not want this behaviour. I want all observers to remain subscribed.

How do I fix this?

The important line of code is ...

console.log(this.getDocumentsSubject.observers.length);

This returns 1 when called the first time.

But it problematically returns 0 when called the second time, after a timeout.

Full code below.

// RXJS SUBJECT AND ASSOCIATED OBSERVABLE

private getDocumentsSubject = new Subject<ElasticFilteredQuery>();
public getDocuments$ = this.getDocumentsSubject.asObservable().flatMap((elasticFilteredQuery: ElasticFilteredQuery) => {
  let query = elasticFilteredQuery.toString();

  // noinspection UnnecessaryLocalVariableJS
  let restStream = this.http.post(BASE_URL + '_search', query, this.options)
    .do(() => {
      console.log('firing post');
    })
    .timeout(Config.http.timeout, new Error('timeout'))
    .map((response: any) => {
      return {
        documents: response.json().hits.hits.map((hit: any) => {
          return this.createDocumentResult(hit);
        }),
        numDocuments: response.json().hits.total,
        elasticFilteredQuery
      };
    });

  return restStream;
}).publish().refCount();



// EMIT TO RXJS SUBJECT - this is being called at the correct times

public getDocuments(elasticFilteredQuery: ElasticFilteredQuery) {
  this.getDocumentsSubject.next(elasticFilteredQuery);
  console.log('watch number of observables', this.getDocumentsSubject.observers.length); // Outputs 1 initially but 0 after a timeout
}


// SUBSCRIPTION

this.esQueryService.getDocuments$.subscribe((response: any) => {
    console.log('XXXXX NEXT');
    ...
  }, (err: any) => {
    console.log('XXXXX error');
    ...
  }, () => {
    console.log('XXXXX completed');
  }
);

2 Answers

2
votes

This answer is based solely on the assumption that you want to use getDocuments$ as a perpetual stream that emits new data whenever a new query comes in. (If this is not the case, then the answer might not help you.)

However, this will not work as written, because once an error is emitted on a stream, that stream is essentially dead. (Also see this answer.)

This is a basic issue in your RxJS architecture: errors belong on one-time processes (like a REST call), whereas perpetual data streams (like getDocuments$) are expected to emit only reliable, valid data, with any errors already handled before a value is next'ed on them.

So my suggestion would be to use a .catch() on the REST call to handle the error gracefully and simply skip emitting documents for that call.
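As a rough sketch of that idea (not your actual service): the example below assumes the pipeable-operator style of RxJS 6 and later, where the .catch() method is called catchError and flatMap is called mergeMap. The names fakeSearch, querySubject, received and errors are hypothetical stand-ins for the HTTP call, your subject and your subscriber.

```typescript
import { EMPTY, Observable, Subject, of } from 'rxjs';
import { catchError, mergeMap } from 'rxjs/operators';

// Hypothetical stand-in for the HTTP call: it errors for the query 'bad'
// (simulating the timeout) and emits a single result otherwise.
function fakeSearch(query: string): Observable<string> {
  if (query === 'bad') {
    return new Observable<string>((subscriber) => {
      subscriber.error(new Error('timeout'));
    });
  }
  return of('result for ' + query);
}

const querySubject = new Subject<string>();
const received: string[] = [];
const errors: string[] = [];

const results$ = querySubject.pipe(
  mergeMap((query) =>
    fakeSearch(query).pipe(
      // Catch on the inner, one-time stream so the error never reaches
      // the outer, perpetual stream.
      catchError((err: Error) => {
        errors.push(err.message);
        return EMPTY; // emit nothing for this query
      })
    )
  )
);

results$.subscribe({
  next: (result) => received.push(result),
  error: () => received.push('outer stream died'),
});

querySubject.next('first');
querySubject.next('bad');    // fails, but the subscription survives
querySubject.next('second'); // still delivered
```

The important detail is where the catch sits: inside the flatMap callback, on the stream for a single request. Placed after the flatMap, it would only run once the outer stream has already been torn down.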


Slightly off-topic and maybe not relevant:

In any case, a hard timeout on a REST call is unusual. If you want to save server power, I'd suggest handling this on the server side. Another very common case is that you only want to accept responses until the next query is triggered, to prevent older, slower queries from showing after a newer one was rendered. If this is the case, then you could use a simple .takeUntil(this.getDocumentsSubject):

this.http.post(BASE_URL + '_search', query, this.options)
  .takeUntil(this.getDocumentsSubject)
  .do(...

As an alternative, you could use switchMap instead of flatMap.
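A minimal sketch of the switchMap variant, again assuming the pipeable operators of RxJS 6 and later. The two subjects slowResponse and fastResponse are hypothetical stand-ins for in-flight HTTP responses, so that the arrival order can be controlled by hand.

```typescript
import { Subject } from 'rxjs';
import { switchMap } from 'rxjs/operators';

const querySubject = new Subject<string>();

// Hypothetical stand-ins for two in-flight HTTP responses.
const slowResponse = new Subject<string>();
const fastResponse = new Subject<string>();

const rendered: string[] = [];

querySubject.pipe(
  // switchMap unsubscribes from the previous inner stream
  // as soon as a new query arrives.
  switchMap((query) => (query === 'slow' ? slowResponse : fastResponse))
).subscribe((result) => rendered.push(result));

querySubject.next('slow');
querySubject.next('fast');             // the 'slow' request is dropped here
fastResponse.next('fast result');      // rendered
slowResponse.next('slow result');      // arrives late and is ignored
```

Note that switchMap on its own does not keep the outer stream alive after an error; an inner error still has to be caught inside the callback.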

1
votes

What you describe is RxJS (and all Reactive Extensions) behavior by design. It's neither an issue nor a bug; this is how it's supposed to work. Any error or complete signal makes recursive unsubscribe() calls. You're definitely not the first one asking this question on SO.

Using the catch() operator has one important "catch": it lets you resubscribe to the same Observable, which you might not want, because that may trigger another HTTP request, which fails again, and so on in an infinite loop. Be aware that this operator doesn't ignore errors; it replaces the failed Observable with another one.
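To illustrate the resubscribe behaviour, here is a small sketch assuming RxJS 6 or later, where catch() is spelled catchError. The failing$ Observable and the attempts counter are hypothetical; the counter exists only to stop the loop that would otherwise never end.

```typescript
import { Observable, of } from 'rxjs';
import { catchError } from 'rxjs/operators';

let attempts = 0;

// Hypothetical request that fails on every subscription.
const failing$ = new Observable<string>((subscriber) => {
  attempts++;
  subscriber.error(new Error('timeout'));
});

const output: string[] = [];

failing$.pipe(
  // The second argument is the caught Observable itself. Returning it
  // resubscribes, which re-runs the failing request.
  catchError((_err, caught) => (attempts < 3 ? caught : of('gave up')))
).subscribe((value) => output.push(value));
```

Without the attempts guard, every failure would resubscribe and fail again, which is the infinite loop described above.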