2
votes

I am a newbie trying out RxJS and NestJS. The use case I am currently working on is for educational purposes. I want to read a JSON file using the "fs" module and throw an observable error if the file is empty or cannot be read. I create an observable by reading the file asynchronously, push the result into a subject, and then subscribe to that subject in the controller. Here is my code in the service:

@Injectable()
export class NewProviderService {
    private serviceSubject: BehaviorSubject<HttpResponseModel[]>;
    // this is the variable that should be exposed. make the subject as private
    // this allows the service to be the sole proprietor to modify the stream and
    // not the controller or components
    serviceSubject$:  Observable<HttpResponseModel[]>;

    private serviceErrorSubject: BehaviorSubject<any>;
    serviceErrorSubject$: Observable<any>;
    filePath: string;
    httpResponseObjectArray: HttpResponseModel[];
    constructor() {
        this.serviceSubject = new BehaviorSubject<HttpResponseModel[]>([]);
        this.serviceSubject$ = this.serviceSubject.asObservable();

        this.serviceErrorSubject = new BehaviorSubject<any>(null);
        this.serviceErrorSubject$ = this.serviceErrorSubject.asObservable();

        this.filePath = path.resolve(__dirname, './../../shared/assets/httpTest.json');
    }
     readFileFromJson() {
          return new Promise((resolve, reject) => {
            fs.exists(this.filePath.toString(), exists => {
                if (exists) {
                    fs.readFile(this.filePath.toString(), 'utf-8' , (err, data) => {
                        if (err) {
                            logger.info('error in reading file', err);
                            return reject('Error in reading the file' + err.message);
                        }

                        logger.info('file read without parsing fg', data.length);
                        if ((data.length !== 0) && !isNullOrUndefined(data) && data !== null) {
                            // this.httpResponseObjectArray = JSON.parse(data).HttpTestResponse;
                            // logger.info('array obj is:', this.httpResponseObjectArray);
                            logger.info('file read after parsing new', JSON.parse(data));
                            return resolve(JSON.parse(data).HttpTestResponse);
                        } else {
                            return reject(new FileExceptionHandler('no data in file'));
                        }
                    });
                } else {
                    return reject(new FileExceptionHandler('file cannot be read at the moment'));
                }
            });
          });
    }


        getData() {
                 from(this.readFileFromJson()).pipe(map(data => {
                        logger.info('data in obs', data);
                        this.httpResponseObjectArray = data as HttpResponseModel[];
                        return this.httpResponseObjectArray;
                 }), catchError(error => {
                     return Observable.throw(error);
                 }))
                 .subscribe(actualData => {
                    this.serviceSubject.next(actualData);
                }, err => {
                    logger.info('err in sub', typeof err, err);
                    this.serviceErrorSubject.next(err);
                });
            }

Now this is the controller class

@Get('/getJsonData')
public async getJsonData(@Req() req, @Res() res) {

  await this.newService.getData();
  this.newService.serviceSubject$.subscribe(data => {
    logger.info('data subscribed', data, _.isEmpty(data));
    if (!isNullOrUndefined(data) && !_.isEmpty(data)) {
      logger.info('coming in');
      res.status(HttpStatus.OK).send(data);
      res.end();
    }
  });
}

The problem I face is that the first request returns the file contents and the subscription is called once, which works fine. On subsequent requests I get:

Error [ERR_HTTP_HEADERS_SENT]: Cannot set headers after they are sent to the client
    at ServerResponse.setHeader (_http_outgoing.js:470:11)
    at ServerResponse.header (C:\personal\Node\test-nest.js\prj-sample\node_modules\express\lib\response.js:767:10)
    at Ser

and the endpoint /getJsonData results in an error. Could someone help me out? I believe the subscription is not being closed properly after the first call, but I am not sure how to end it or how to resolve this.

3
Is there a reason why you don't return your Observable directly from getData() instead of subscribing to serviceSubject$? Also, you don't need to work with @Req and @Res in your controller. Remove those parameters and return the Observable directly. Nest.js will handle the subscription. - Kim Kern
The actual implementation will need a subject capable of returning the latest updated data, and BehaviorSubject seems to satisfy that. Also, the suggestions seem to be not to make the subject itself available to the subscribers (the controllers), hence the separate variable. - vijayakumarpsg587
I also wanted to learn how the response is handled for the data emitted by the observable. So does NestJS automatically close the response once the observable emits? I am not sure why the above test case fails. I will try returning the observable directly, but I wanted to know the reason for the failure. - vijayakumarpsg587
Auto handling: "Nest will automatically subscribe to the source underneath and take the last emitted value (once the stream is completed)." I'll have a look at your code in more detail later. - Kim Kern
@KimKern Thank you so much. I also wanted to know whether NestJS handles this for any observable or whether it is isolated to the AxiosResponse type. The observable I am producing is not an AxiosResponse but a plain old observable. So does NestJS subscribe to the observable by default and produce a response, while I manually try to do the same and that results in an error? - vijayakumarpsg587

3 Answers

4
votes

The problem is that you're subscribing to your serviceSubject in your controller. Every time a new value is emitted, it will try to send the response. This works the first time, but the second time it will tell you it can't send the same response again; the request has already been handled.
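
To make the failure mode concrete, here is a small dependency-free simulation. `MiniBehaviorSubject` and `FakeResponse` are hypothetical stand-ins written for this sketch (they are not the RxJS or Express classes); they only mimic "replay the latest value, then push every later value" and "a response can be sent once".

```typescript
// Hypothetical stand-in for RxJS's BehaviorSubject: replays the current value
// to new subscribers and pushes every later value to all of them.
class MiniBehaviorSubject<T> {
  private listeners: Array<(value: T) => void> = [];
  constructor(private current: T) {}

  subscribe(listener: (value: T) => void): () => void {
    this.listeners.push(listener);
    listener(this.current); // replay the latest value immediately
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener);
    };
  }

  next(value: T): void {
    this.current = value;
    // iterate over a copy so listeners may unsubscribe while being notified
    [...this.listeners].forEach(l => l(value));
  }
}

// Hypothetical stand-in for an HTTP response that can only be sent once.
class FakeResponse {
  sent = false;
  send(_body: unknown): void {
    if (this.sent) {
      throw new Error('Cannot set headers after they are sent to the client');
    }
    this.sent = true;
  }
}

const subject = new MiniBehaviorSubject<string[]>([]);

// Mimics the controller in the question: subscribe and never unsubscribe.
function handleRequest(res: FakeResponse): void {
  subject.subscribe(data => {
    if (data.length > 0) {
      res.send(data);
    }
  });
}

const firstResponse = new FakeResponse();
handleRequest(firstResponse);
subject.next(['file contents']); // first request: response sent once

let secondRequestError = '';
try {
  handleRequest(new FakeResponse()); // the replayed value answers the second request
  subject.next(['file contents']);   // but the stale first subscriber fires again
} catch (e) {
  secondRequestError = (e as Error).message;
}
```

The subscriber created by the first request is still registered when the second request triggers a new emission, so it calls `send` on a response that was already sent.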

You can use the pipeable first() operator to complete the Observable after the first value:

@Get('/getJsonData')
public async getJsonData() {
  await this.newService.getData();
  return this.newService.serviceSubject$.pipe(first())
}
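
As a rough illustration of what completing after the first value buys you, here is a dependency-free sketch. `MiniSubject` and `takeFirst` are hypothetical names for this example only; `takeFirst` imitates `first()` with a predicate (the RxJS operator accepts one as well) by delivering a single matching value and then removing its listener. Keep in mind that a BehaviorSubject replays its current value on subscribe, so without a predicate the "first" value may be the initial empty array.

```typescript
type Listener<T> = (value: T) => void;

// Hypothetical, simplified stand-in for a BehaviorSubject (not the RxJS class).
class MiniSubject<T> {
  private listeners: Array<Listener<T>> = [];
  constructor(private current: T) {}

  subscribe(listener: Listener<T>): () => void {
    this.listeners.push(listener);
    listener(this.current); // replay the latest value to the new subscriber
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener);
    };
  }

  next(value: T): void {
    this.current = value;
    this.listeners.slice().forEach(l => l(value));
  }
}

// Hypothetical helper imitating first(predicate): deliver one matching value,
// then unsubscribe so later emissions never reach this consumer.
function takeFirst<T>(
  subject: MiniSubject<T>,
  predicate: (value: T) => boolean,
  onValue: Listener<T>,
): void {
  let done = false;
  let unsubscribe: (() => void) | undefined;
  unsubscribe = subject.subscribe(value => {
    if (done || !predicate(value)) {
      return;
    }
    done = true;
    if (unsubscribe) {
      unsubscribe(); // value arrived after subscribe() returned
    }
    onValue(value);
  });
  if (done) {
    unsubscribe(); // value was replayed synchronously inside subscribe()
  }
}

const subject = new MiniSubject<string[]>([]);
const responses: string[][] = [];

// One "request": wait for the first non-empty value and respond exactly once.
takeFirst(subject, data => data.length > 0, data => responses.push(data));

subject.next(['first read']);  // delivered, then the listener is removed
subject.next(['second read']); // no longer reaches the finished request
```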

You want your Observable to be shared (hot), so that every subscriber always gets the same, latest value. That's exactly what a BehaviorSubject does. So you should not convert your Subject to an Observable when you expose it publicly because you will lose this desired behavior. Instead, you can just cast your Subject to Observable, so that internally it is still a subject but it will not expose the next() method to emit new values publicly:

private serviceSubject: BehaviorSubject<HttpResponseModel[]>;
get serviceSubject$(): Observable<HttpResponseModel[]> {
   return this.serviceSubject;
}

0
votes

I think converting the cold observable (the one I created) into a hot/warm observable helps: all subscribers plug into a single source, which emits, completes, and keeps the last emitted value for any later subscribers. So I made the cold observable warm using the publishLast() and refCount() operators, which gave me a single subscription to the source and an observable that completes. Here are the changes I made to get it working.

This is the service class change I made

 getData() {
        return from(this.readFileFromJson()).pipe(map(data => {
                logger.info('data in obs', data);
                this.httpResponseObjectArray = data as HttpResponseModel[];
                return this.httpResponseObjectArray;
         }), publishLast(), refCount()
          , catchError(error => {
             return Observable.throw(error);
         }));
        //  .subscribe(actualData => {
        //     this.serviceSubject.next(actualData);
        // }, err => {
        //     logger.info('err in sub', typeof err, err);
        //     this.serviceErrorSubject.next(err);
        // });
    }

And this is the change I made in the controller

public getJsonData(@Req() req, @Res() res) {
    // subscribe() returns a Subscription, not a Promise, so there is nothing to await
    this.newService.getData().subscribe(data => {
      logger.info('dddd', data);
      res.send(data);
    });
}

Any answers that allow the observable to first feed a subject, with the controller then subscribing to that subject, are also welcome.

I found a great post on hot vs cold observables, including how to make subscribers share a single source and how to convert a cold observable to a hot/warm one: https://blog.thoughtram.io/angular/2016/06/16/cold-vs-hot-observables.html

0
votes

I would recommend returning the Promise directly to the controller. You don't need an Observable here. For any other subscribers, you additionally emit the resolved value of the Promise to your serviceSubject.

async getData() {
  try {
    const data = await this.readFileFromJson();
    this.serviceSubject.next(data as HttpResponseModel[]);
    return data;
  } catch (error) {
    // handle error
  }
}

In your controller you can just return the Promise:

@Get('/getJsonData')
public async getJsonData() {
  return this.newService.getData();
}
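
The file-reading step that getData() awaits could also be written with promises end to end. This is only a sketch, assuming Node's `fs.promises` API is available; `parseHttpTestResponse` and `readHttpTestResponse` are hypothetical names, and the `HttpTestResponse` key is taken from the question's code.

```typescript
import { promises as fs } from 'fs';

// Hypothetical helper: validate and parse the raw file contents.
function parseHttpTestResponse(raw: string): unknown[] {
  if (raw.trim().length === 0) {
    throw new Error('no data in file');
  }
  const parsed = JSON.parse(raw);
  if (
    parsed === null ||
    typeof parsed !== 'object' ||
    !Array.isArray(parsed.HttpTestResponse)
  ) {
    throw new Error('HttpTestResponse is missing or not an array');
  }
  return parsed.HttpTestResponse;
}

// Hypothetical replacement for readFileFromJson(). readFile rejects on its own
// when the file is missing or unreadable, so no separate existence check is used.
async function readHttpTestResponse(filePath: string): Promise<unknown[]> {
  const raw = await fs.readFile(filePath, 'utf-8');
  return parseHttpTestResponse(raw);
}
```

Because the function rejects on every failure path, the controller can return the promise and let the framework's normal error handling deal with rejections.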