14
votes

I have an observable that is supposed to perform an action every X seconds, and the value of X changes dynamically. I have been having trouble wrapping my head around how to change this interval on the fly at run time. A big breakthrough in my thinking has been realizing that Observables cannot change once they are defined, so trying to redefine the Observable with a new interval value does not seem like the correct way to go about it.

I have been trying to work with the code located at https://www.learnrxjs.io/operators/transformation/switchmap.html

So far, I think that switchMap is at least on the right track. Would anyone be able to provide an example or point me to resources that might help me?

At the very least, the world definitely needs more RxJS examples!

4 Answers

15
votes

You can dynamically control the period using a Subject together with switchMap and interval. Whenever the subject emits a value, that value can be used to specify the interval's period:

const t = Date.now();

let subject = new Rx.Subject();
subject
  .switchMap(period => Rx.Observable.interval(period))
  .do(() => console.log('some action at time T+' + (Date.now() - t)))
  .take(8)
  .subscribe();

subject.next(50);
setTimeout(() => subject.next(100), 200);
setTimeout(() => subject.next(200), 400);
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
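To make the resulting timing easier to reason about, here is a small dependency-free sketch in plain JavaScript (no RxJS). `simulateTicks` is a hypothetical helper, not a library function. It assumes that every period change cancels the running timer and starts a new one from that moment, which is what switchMap does with interval, and it treats a tick that would land exactly on a change as cancelled. The change times in the example are made up for illustration.

```javascript
// Hypothetical helper, not part of RxJS: computes when ticks would fire
// if every period change cancels the running timer and starts a new one.
// `changes` is a list of { at, period } entries in milliseconds, sorted by `at`.
function simulateTicks(changes, until) {
  const ticks = [];
  for (let i = 0; i < changes.length; i++) {
    const { at, period } = changes[i];
    // The current timer lives until the next change (or the end of the run).
    const end = i + 1 < changes.length ? changes[i + 1].at : until;
    // Simplifying assumption: a tick landing exactly on `end` is dropped.
    for (let t = at + period; t < end; t += period) {
      ticks.push(t);
    }
  }
  return ticks;
}

// Period 50 ms from the start, switched to 100 ms at T+220.
const ticks = simulateTicks(
  [{ at: 0, period: 50 }, { at: 220, period: 100 }],
  600
);
console.log(ticks); // [50, 100, 150, 200, 320, 420, 520]
```

Note that the first tick after the switch arrives at T+320, not T+300: the new interval counts from the moment the subject emitted, not from the last tick of the old one.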
11
votes

Adding a working example to help the Googlers

let startButton = document.getElementById('start');
let stopButton  = document.getElementById('stop');
let updateButton = document.getElementById('update');
let i1 = document.getElementById('i1');

const start$ = Rx.Observable.fromEvent(startButton, 'click');
const stop$ = Rx.Observable.fromEvent(stopButton, 'click');
const update$ = Rx.Observable.fromEvent(updateButton, 'click');

// Read the period (in milliseconds) from the input each time it is needed.
const period = () => parseInt(i1.value, 10);


Rx.Observable.merge(
    start$, 
    update$, 
  )
  .switchMap(() => {
    return Rx.Observable.interval(period()).takeUntil(stop$);
  })
  .subscribe(res => {
    console.log('here in the subscription:' + res);
  })
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
<body>
  <button id="start">Start</button>
  <button id="stop">Stop</button>
  <button id="update">Update</button>
  <input id="i1" value="100">
</body>
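For anyone curious what the start/update/stop wiring boils down to, here is a rough sketch of the same cancel-and-restart idea using plain timers instead of RxJS. `createAdjustableTimer` and its `timers` parameter are hypothetical names introduced only for this illustration; the parameter exists so the timer functions can be swapped out in tests.

```javascript
// Hypothetical helper illustrating cancel-and-restart; not an RxJS API.
// `timers` defaults to the global object, which provides setInterval/clearInterval.
function createAdjustableTimer(action, timers = globalThis) {
  let handle = null;
  return {
    // Start, or restart with a new period (mirrors the start/update clicks).
    setPeriod(periodMs) {
      if (handle !== null) timers.clearInterval(handle);
      handle = timers.setInterval(action, periodMs);
    },
    // Stop ticking (mirrors takeUntil(stop$)).
    stop() {
      if (handle !== null) timers.clearInterval(handle);
      handle = null;
    },
  };
}
```

Calling `setPeriod(100)` and later `setPeriod(250)` on the returned object clears the first timer before starting the second, so only one timer is ever running. The RxJS version gets the same guarantee from switchMap unsubscribing the previous inner observable.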
1
votes

This is my solution using RxJS 6:

import { interval, from } from 'rxjs';
import { map, take, concatAll, catchError, finalize } from 'rxjs/operators';

from([1, 2, 4, 10])
  .pipe(
    // for each delay (in seconds), create an interval and take only its first emission
    map(x => interval(x * 1000).pipe(take(1))),
    // subscribe to the inner observables one after another
    concatAll(),
    // log and rethrow any error
    catchError((error) => {
      console.log('oops, something bad in pipe');
      throw error;
    }),
    // runs once the stream completes, errors, or is unsubscribed
    finalize(() => {
      console.log('stop trying');
    }),
  )
  // subscribe and perform the actual retry on each emission
  .subscribe(val => {
    console.log('perform retry');
  });

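In this example, I'm trying to reconnect 4 times, waiting 1, 2, 4, and 10 seconds before each successive attempt. Because concatAll runs the delays one after another, the waits add up.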
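As a quick check of the timing, the elapsed time at each retry is the running sum of the delays. A minimal sketch in plain JavaScript (`attemptTimes` is a hypothetical helper name, not part of RxJS):

```javascript
// Hypothetical helper: given the delays between attempts (in seconds),
// return the elapsed time at which each attempt fires when they run in sequence.
function attemptTimes(delays) {
  let elapsed = 0;
  return delays.map(delay => (elapsed += delay));
}

console.log(attemptTimes([1, 2, 4, 10])); // [1, 3, 7, 17]
```

So the four retries happen roughly 1, 3, 7, and 17 seconds after subscribing.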

1
votes

Here's an elegant solution based on RxJS 6:

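import { OnInit } from '@angular/core';
import { interval, BehaviorSubject } from 'rxjs';
import { switchMap, tap } from 'rxjs/operators';

export class SomeComponent implements OnInit {
  // Initial period in seconds; it is converted to milliseconds in switchMap
  private interval$: BehaviorSubject<number> = new BehaviorSubject(10);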

  ngOnInit() {
    this.interval$.pipe(
        //Assuming that the value provided by the input is in seconds
        switchMap(value => interval(value * 1000)),
        tap(() => this.doStuff())
    )
    .subscribe();
  }

  intervalChanged(value){
    console.log(value);
    this.interval$.next(value);
  }

  doStuff(){
    console.log("Hi!");
  }
}

Template:

<input (input)="intervalChanged($event.target.value)" type="number">
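One thing to watch for: `$event.target.value` is a string, and an empty input coerces to 0 when multiplied, which would make the interval fire as fast as it can. A minimal sketch of a guard, assuming the input is in seconds as above (`toPeriodMs` is a hypothetical helper, not part of Angular or RxJS):

```javascript
// Hypothetical helper: converts the raw <input> value (seconds, as a string)
// into a period in milliseconds, or returns null when the value is unusable.
function toPeriodMs(rawSeconds) {
  const seconds = Number.parseFloat(rawSeconds);
  if (!Number.isFinite(seconds) || seconds <= 0) {
    return null;
  }
  return seconds * 1000;
}

console.log(toPeriodMs('2'));   // 2000
console.log(toPeriodMs(''));    // null
console.log(toPeriodMs('abc')); // null
```

With a guard like this, `intervalChanged` could skip the call to `next` whenever the helper returns null, and the switchMap callback would no longer need to multiply by 1000.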