3
votes

I'm working on a session service that checks whether the auth token has expired. If it has, the service makes a call to refresh the token. While that refresh request is in flight, all incoming requests should be queued and then emitted once it finishes. After that, incoming requests can pass straight through without queuing until the token expires again. I drew a marble diagram for this:

1. ---a---b---c--d-----e--
2. -t-------f------t------
3. ---a---b---------cd-e--

I named 1. the incoming$ Observable; 2. is valve$. If valve$ is true, requests can pass through; if it's false, they should be queued. When it turns true again, the queued requests are emitted.

What have I done so far? I think this should be done by adding an intermediate Observable called receiver$, which changes its value depending on valve$. When valve$ is true, it just returns a simple Subject; when it's false, it returns one that's capable of recording values.

receiver$ = valve$.pipe(
  map((value) => {
    if (value) {
      return new Subject();
    } else {
      return (new Subject()).pipe(
        shareReplay(),
      );
    }
  })
);

And then every new value obtained from incoming$ should be pushed into the current observable in receiver$:

incoming$.pipe(
  combineLatest(receiver$),
).subscribe(([incomingValue, receiver]) => {
  // combineLatest emits an array, so the pair has to be destructured
  receiver.next(incomingValue);
});

And here's the part I cannot wrap my head around. Whenever valve turns true, I'd need the last two values from receiver$. The second last would hold the queue, and the last would hold the active subject. By merging them I could achieve my goal. I don't know how to implement this and how subscriptions will be managed. Also, this looks overly complicated for such a seemingly simple use case.

What's the best way of implementing this behavior?

2
I used egghead.io about a year ago to do some advanced stuff with RxJS (most of which I've forgotten now, as I'm on other things!). Perhaps take a look at that, or try using some Google Kungfu to track down the author's blog. - JGFMK

2 Answers

1
votes

You can consider a solution along these lines.

First, you create a Subject through which you emit all the requests you want to make:

const requests$ = new Subject<Observable<any>>()

Then you create a Subject through which you communicate the state of the valve, i.e. whether you can execute the request immediately or have to buffer it:

const valve$ = new Subject<boolean>();

Now you can create a stream which passes the requests through only if the valve is open, i.e. if the last value emitted by valve$ is true:

const openStream$ = valve$.pipe(
  switchMap(valve => {
    if (valve) {
      return requests$;
    } else {
      return empty();
    }
  })
);

You can also create a stream which buffers all requests while the valve is closed:

const bufferedStream$ = requests$.pipe(
  bufferToggle(valve$.pipe(filter(valve => !valve)), () => valve$.pipe(filter(valve => valve))),
  mergeMap(bufferedCalls => bufferedCalls)
)

Now all you have to do is merge openStream$ and bufferedStream$ and subscribe to the resulting stream, like this:

merge(openStream$, bufferedStream$).pipe(
  mergeMap(request => request)
)
.subscribe(httpCallResult => {
  // do stuff
});

I have tested this solution with the following data, simulating real HTTP calls with Observables of strings:

const requests$ = new Subject<Observable<string>>();
setTimeout(() => {requests$.next(of('A'))}, 50);
setTimeout(() => {requests$.next(of('B'))}, 60);
setTimeout(() => {requests$.next(of('C'))}, 100);
setTimeout(() => {requests$.next(of('D'))}, 110);
setTimeout(() => {requests$.next(of('E'))}, 130);
setTimeout(() => {requests$.next(of('F'))}, 250);
setTimeout(() => {requests$.next(of('G'))}, 260);
setTimeout(() => {requests$.next(of('H'))}, 300);
setTimeout(() => {requests$.next(of('I'))}, 310);
setTimeout(() => {requests$.next(of('L'))}, 330);


const valve$ = new Subject<boolean>();
setTimeout(() => {valve$.next(true)}, 30);
setTimeout(() => {valve$.next(false)}, 80);
setTimeout(() => {valve$.next(true)}, 120);
setTimeout(() => {valve$.next(false)}, 200);
setTimeout(() => {valve$.next(true)}, 290);
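As a cross-check on the ordering this timeline should yield, the valve rule can be replayed without RxJS. The sketch below is only an illustration under stated assumptions: `TimedEvent`, `Emission`, `replay`, `req` and `valve` are hypothetical names, requests are modelled as plain strings rather than Observables, and anything arriving before the first valve value is assumed to be queued. It models the intended semantics and does not exercise the RxJS operators themselves.

```typescript
// Hypothetical helper types, not part of RxJS or of the code above.
type TimedEvent =
  | { at: number; kind: 'valve'; open: boolean }
  | { at: number; kind: 'request'; id: string };

interface Emission { at: number; id: string }

// Replays a timeline: requests pass while the valve is open and are queued
// while it is closed; a switch to open flushes the queue in arrival order.
function replay(events: TimedEvent[]): Emission[] {
  const sorted = [...events].sort((a, b) => a.at - b.at);
  const out: Emission[] = [];
  const queue: string[] = [];
  let open = false; // assumption: requests before the first valve value are queued

  for (const ev of sorted) {
    if (ev.kind === 'valve') {
      open = ev.open;
      if (open) {
        // splice(0) empties the queue and returns its former contents
        for (const id of queue.splice(0)) out.push({ at: ev.at, id });
      }
    } else if (open) {
      out.push({ at: ev.at, id: ev.id });
    } else {
      queue.push(ev.id);
    }
  }
  return out;
}

const req = (at: number, id: string): TimedEvent => ({ at, kind: 'request', id });
const valve = (at: number, open: boolean): TimedEvent => ({ at, kind: 'valve', open });

// Same timings as the setTimeout calls above.
const timeline: TimedEvent[] = [
  req(50, 'A'), req(60, 'B'), req(100, 'C'), req(110, 'D'), req(130, 'E'),
  req(250, 'F'), req(260, 'G'), req(300, 'H'), req(310, 'I'), req(330, 'L'),
  valve(30, true), valve(80, false), valve(120, true), valve(200, false), valve(290, true),
];

const result = replay(timeline);
console.log(result.map(e => `${e.id}@${e.at}`).join(' '));
// prints: A@50 B@60 C@120 D@120 E@130 F@290 G@290 H@300 I@310 L@330
```

In this model C and D are held until the valve reopens at 120, and F and G until it reopens at 290; every other request is emitted at its own arrival time.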

1
votes

You can do this with just concatMap, which switches between two different streams based on the value from valve$. Note that this requires both valve$ and incoming$ to be shared with share().

valve$
  .pipe(
    concatMap(v => v
      ? incoming$.pipe(takeUntil(valve$))
      : incoming$
        .pipe(
          takeUntil(valve$),
          bufferCount(Number.POSITIVE_INFINITY),
          mergeAll(),
        )
    ),
  )
  .subscribe(console.log)

Live demo: https://stackblitz.com/edit/rxjs6-demo-d3bsxb?file=index.ts