0
votes

I'm still new to RXJS and trying to get my head around it. The task is as follows:

A user can add assets to a collection, and he might click faster than the backend can handle the requests, so that an error might occur because the lockVersion of the collection conflicts with the one sent by the client (not updated yet). A hacky solution would be to reload the collection when the 409 error occurs, but I thought a more elegant solution would a be a request queue.

Also, we will will have to handle justified 409 errors, when two users are meddling with the same collection, so a queue will come in handy then (reload the collection, re-add the item to the queue).

Here's my naive approach, which actually works:

export class RequestQueue<T> {
  private items: T[] = [];
  private queue$: Subject<boolean> = new Subject<boolean>();
  private dispatcher: Subject<T> = new Subject<T>();

  constructor(
    private readonly requestHandler: (v: T) => Observable<unknown>,
    private readonly errorHandler: (e: HttpErrorResponse) => Observable<boolean>
  ) {
    this.initQueue();
  }

  public add(item: T) {
    if (this.items.push(item) == 1) {
      this.dispatcher.next(this.items.shift());
    }
    return this.queue$;
  }

  public destroy () {
    this.dispatcher.complete();
  }

  private initQueue() {
    this.dispatcher.pipe(
      flatMap(item => {
        return this.requestHandler(item)
      }),
      tap(r => {
        if (this.items.length > 0) {
          this.dispatcher.next(this.items.shift());
        } else {
          this.queue$.next(true);
        }
      }),
      catchError((error: HttpErrorResponse) => {
        let error$ = this.errorHandler(error);
        this.initQueue(); //<--- please note this line
        return error$;
      })
    ).subscribe();
  }
}

As you can see, I am holding an array which can be filled up with items as much as the user feels like clicking. When the first item is added, the dispatcher emits it as its first event (value). After every successful request (collection updated), it shifts the next item from the array and emits it. This happens until the array is empty. The queue$ observer then emits 'true' to notify the caller of the queue, that it is done.

This actually works, and the 409 error never happens again because of a fast-clicking user. The trouble starts, when another error (e.g. 404) occurs. The error handler pops up a message, the user clicks ok, and the pipe of the dispatcher is dead. The next click to add an item to the collection won't cause anything, unless I initialize the queue (re-pipe to the dispatcher) again.

This doesn't feel like an elegant solution at all. I'm not quite sure why this happens, but I would really like to understand it. Any explanation or suggestion for a better solution is highly appreciated.

2

2 Answers

1
votes

A different approach would be to have your add function (I name it request) return an Observable that actually emits what the http request emits and can be used to cancel the request.

private dispatcher: Subject<Observable<unknown>> = new Subject<Observable<unknown>>();

public request(item: T) {
  return new Observable(observer => {
    const cancel = new Subject();
    const request$ = this.requestHandler(item).pipe(
      catchError(this.errorHandler),
      takeUntil(cancel), // cancel the request on cancel events
      tap(observer) // attach the observer to the request
    ));
    this.dispatcher.next(request$) // add the request to the queue
    return () => { // send a cancel event on unsubscribe
      cancel.next();
      cancel.complete();
    }
  });
}

private initQueue() {
  this.dispatcher.pipe(concatAll()).subscribe();
}

Use it like

cancel = new Subject();

send(item: T) {
  request(item).pipe(
    takeUntil(this.cancel)
  ).subscribe()
}

cancelAll() {
  this.cancel.next()
}

https://stackblitz.com/edit/rxjs-3xvo8g?file=index.ts

1
votes

Try this: catch the error on your requestHandler instead of in your dispatch pipe. If your dispatch pipe receives an error, it will be done. If the requestHandler catches an error and doesn't throw a new one, then that request will die but the dispatcher will live on.


As a quick aside:

You'd probably benefit from the use of concatMap. concatMap does not subscribe to the next observable until the previous completes, which is exactly the behavior you're describing.

constructor(/* ... */) {
  this.dispatcher.pipe(
    concatMap(item => this.requestHandler(item))
  ).subscribe();
}

Now you don't need a queue at all since concatMap will only process one request at a time.


Back to errorHandling:

A stream is done once it emits an error or a complete signal. So re-creating your request upon failure really is the way to go. You can do this manually with catchError, or you can do this using retry and/or retryWhen. Ideally, you'd use both. Here I'll retry the request 5 times and then if it still fails, I'll give up and drop the request and continue on with the rest.

constructor(/* ... */) {
  this.dispatcher.pipe(
    concatMap(item => this.requestHandler(item).pipe(
      retry(5),
      catchError(err => {
        // Here is the place to alert the user to the error or handle it however
        // you like.

        // empty() will complete immediately, dropping this request and
        // telling concatMap that it can just start the next request instead.

        //  - You can throw an error here to cancel all pending requests
        //  - You can convert the item into a placeholder item so that the user
        // can re-try adding an asset on their own leisure. (If you build that
        // capability into the UI)

        return EMPTY;
      })
    ))
  ).subscribe();
}

Now requestHandler(item) will be called up to five times. If it fails a sixth time, the request is dropped and concatMap will move onto the next one. You can of course change this to whatever you'd like. retryWhen() lets you decide under which conditions you'd like to retry.

After catchError, the source observable is dead, but any observable returned carries on. You can manipulate this behavior however you'd like.