1
votes

I have a function that does an http request based on a parameter. And I want to add some kind of "debounce" functionality. So if the function gets called multiple times in a set time window, I want to combine the parameters into one request instead of making multiple requests.

I want to achieve this with Observables and in Angular. This does not sound that complicated, however I'm not able to get it running, maybe I'm missing something.

For now let's just skip the combining in a single request as this can be done with an aggregated debounce or a Oberservable.buffer. I have trouble combining the single Observables.

Here's what I've tried so far.

I tried using a Subject, as this seemed to be the proper object for this case (https://stackblitz.com/edit/angular-hcn41v?file=src%2Fapp%2Fapp.component.ts).

constructor(private http: HttpClient) {
  this.makeRequest('1').subscribe(x => console.log(x))
  this.makeRequest('2').subscribe(console.log)
  setTimeout(() => {
    this.makeRequest('3').subscribe(console.log)
  }, 1000)
}

private makeRequest(id: string) {
  this.observable = this.observable.pipe(
    merge(Observable.of(id).pipe(delay(1)))
  )
  return this.aggregateDebounce(this.observable)
}

private getUrl(value) {
  console.log('getUrl Call', value);
  return 'https://jsonplaceholder.typicode.com/posts/1';
}

private aggregateDebounce(ob$) {
  const shared$ = ob$.publishReplay(1).refCount()
  return shared$.buffer(shared$.debounceTime(75))
}

I expect to have one 'getUrl Call' log for each function call and one result log. However I only get results if I add more than 1 calls to this.makeRequest() and the result is also weird. All previous values are always returned as well. I think I don't fully understand how Subject works in this case.

Another approach (taken from here RXJS: Aggregated debounce) was to create some sort of aggregate debounce (https://stackblitz.com/edit/angular-mx232d?file=src/app/app.component.ts)

constructor(private http: HttpClient) {
  this.makeRequest('1').subscribe(x => console.log(x))
  this.makeRequest('2').subscribe(console.log)
  setTimeout(() => {
    this.makeRequest('3').subscribe(console.log)
  }, 1000)
}

private makeRequest(id: string) {
  this.observable = this.observable.pipe(
    merge(Observable.of(id).pipe(delay(1)))
  )
  return this.aggregateDebounce(this.observable)
}

private getUrl(value) {
  console.log('getUrl Call', value);
  return 'https://jsonplaceholder.typicode.com/posts/1';
}

private aggregateDebounce(ob$) {
  const shared$ = ob$.publishReplay(1).refCount()
  return shared$.buffer(shared$.debounceTime(75))
}

In this scenario I have the problem I'm also getting all previous values as well.

In theory (at least to me) both variants sounded plausible, however it seems like I'm missing something. Any wink in the right direction is highly appreciated.

Edit:

As requested I added the final real-world goal.

Imagine a service that requests information from an API. Within 50-75ms you call the service with a certain id. I want to group those ids together to a single request instead of doing 3. And if 100ms later another call to the service is made, a new request will be done

2
Do you mean if you get the parameter that was executed before, you don't want to exectute it again, right? How long time you don't want to re-run again? - Will Huang
exactly. I added a example for further clarification. 50-75ms - Nicolas Gehlert
I have updated my answer to include a possible solution. - a better oliver
awesome thanks. will have a look if I can apply this to my example. - Nicolas Gehlert

2 Answers

1
votes
this.makeRequest(1).subscribe();

private makeRequest(number: number) {
  this.values.next(number);
  return this.idObservable.pipe(

You emit the value before you subscribe -> The value gets lost.

private values: Subject = new Subject();
private idObservable = this.values.pipe(

private makeRequest(number: number) {
  this.values.next(number);
  return this.idObservable.pipe(    

Every call creates a new observable based on the subject. Whenever you emit a value, all subscribers receive the value.

A possible solution could look something like this (I'm using the new rxjs syntax here):

subject: Subject<String> = null;
observable = null;
window = 100;

constructor() {
  this.subject = null;
  this.window = 100;

  this.makeRequest('1').subscribe(console.log)
  this.makeRequest('2').subscribe(console.log)
  setTimeout(() => {
    this.makeRequest('3').subscribe(console.log)
  }, 1000)
}

private makeRequest(id: string) {
  if (!this.subject) {
    this.subject = new ReplaySubject()
    this.observable = this.subject.pipe(
      takeUntil(timer(this.window).pipe(take(1))),
      reduce((url, id, index) => this.combine(url, id), baseUrl),
      flatMap(url => this.request(url)),
      tap(() => this.subject = null),
      share()
    )
  }      

  this.subject.next(id);
  return this.observable;
}  

Where combine creates the url and request makes the actual request.

1
votes

Rxjs is quite good at handling this kind of case. You'll need two different Subjects:

  1. One will be used to collect and combine all requests
  2. The second will be used for subscribing to results

When a request is made, the value will be pushed onto the first subject but the second will be returned, abstracting away the internal logic of combining requests.

private values: Subject = new Subject();
private results: Subject = new Subject();

private makeRequest(number: number) {
  this.values.next(number);
  return this.results;
}

The pipeline for merging the requests could be a buffer and debounceTime as indicated in the question or other logic, as required. When a response is recieved, it just needs to be pushed onto the results Subject:

constructor(private http: HttpClient) {
  this.values
    .pipe(
      buffer(this.values.pipe(debounceTime(1000))),
      switchMap(values => this.getUrl(values)),
      map(response => this.results.next(response)))
    .subscribe();
}

I've used a switchMap to simulate an asynchronous request before pushing the response onto the results.

Full example here: https://angular-8yyvku.stackblitz.io