1
votes

I'm new with RxJs and want to use this great tool on my project. I have one trip flow that I want to manage with observables, you can check on the image above. This flow starts with a user requesting a trip so system sends a post request to the server, the server creates a socket room with request ID to emit all events about this request and return with the request body and a 201 status code. The web application needs to start listen to the socket room and do somethings with events. I made one implementation with RxJs that works fine on the first time, check:

this.deliveryService.requestObservable.pipe(
  tap((data) => {
    const externalId = data ? data['externalId'] : null;
    if (externalId) {
      this.pendingRequests.push(data as TripRequest);
    }
  }),
  mergeMap((data) => {
    const externalId = data ? data['externalId'] : null;
    if (externalId) {
      return this.deliveryService.listenAcceptedTripRequest(data.externalId);
    } else {
      return Observable.of(null);
    }
  }),
  tap((data) => {
    this.pendingRequests = this.pendingRequests.filter((item) => item.externalId !== data.id);
    this.notificationService.showAcceptedRequestNotification();
  }),
  mergeMap(data => { 
    return this.deliveryService.listenTripStatusNotification(data.id);
  }),
  tap((data) => {
    this.notificationService.showTripStatusNotification(data);
    this.pendingRequests = this.pendingRequests.filter((item) => item.externalId !== data.id);
  })
).subscribe();

My problem starts with the second user trip request where the first observable emits two events, in the next, will emit three events, and the above observables will emit more and more events. I think that's the problem is because I don't complete any observable. The first observable in my project always will emit some event, but the others observables need to exists only to a specific ID. The observable listenAcceptedTripRequest emits only one event per ID, the listenTripStatusNotification Observable was many events per ID and some final events like trip finished and trip canceled. So, whats the best way to implement this kind of flow? Observable is really a good choice, or maybe I need to use some other kind of lib ?

Trip flow

1
You can start by adding some console logs to your stream to see which observable is firing multiple times. After you have found that source that is emitting multiple times you can add a take(1) to ensure that it completes after one value is passed backNico

1 Answers

0
votes

I solved my problem by controlling the socket events with some filters. When socket emits an event they emit no matter the content so when I was listening on two or more distinct trips rooms I have the error. With one verification before emitting the event to observable, my problem was solved. I also need to control the complete status of observable.

    public listenTripStatusNotification(tripId: string): Observable<any> {
    return Observable.create(observer => {
        this.webSocketService.listen(this.TRIP_STATUS_NOTIFICATION_MESSAGE, tripId, (data) => {
            if(data.tripId === tripId){
                observer.next(data);
                if(this.isFinalStatus(data.status)) {
                    observer.complete();
                }
            }
        })
      });
    }