29
votes

I'm trying to emit simple array values one after another with 500ms in between:

var a = Rx.Observable.from([1,2,3]);
a.interval(500).subscribe(function(b) { console.log(b); });

However, this throws an exception:

Uncaught TypeError: a.interval is not a function.
9
more succinct answers here for RxJS5 .. stackoverflow.com/questions/41225446/… - danday74

9 Answers

35
votes

Three ways to do it, with RxJS version 6 :

1. Using concatMap

import { from, of, pipe } from 'rxjs';
import { concatMap, delay } from 'rxjs/operators';

const array = [1, 2, 3, 4, 5];

from(array)
 .pipe(
   concatMap(val => of(val).pipe(delay(1000))),
 )
 .subscribe(console.log);

2. Using zip and interval

import { from, pipe, interval } from 'rxjs';
import { delay, zip} from 'rxjs/operators';

const array = [1, 2, 3, 4, 5];

from(array)
 .pipe(
   zip(interval(1000), (a, b) => a),
 )
 .subscribe(console.log);

3. Using interval as source

import { interval, pipe } from 'rxjs';
import { map, take } from 'rxjs/operators';

const array = [1, 2, 3, 4, 5];

interval(1000)
.pipe(
  take(array.length),
  map(i => array[i])
)
.subscribe(console.log);
17
votes

As already pointed out by xgrommx, interval is not an instance member of an observable but rather a static member of Rx.Observable.

Rx.Observable.fromArray([1,2,3]).zip(
  Rx.Observable.interval(500), function(a, b) { return a; })
.subscribe(
  function(x) { document.write(x + '<br \>'); },  
  null,  
  function() { document.write("complete"); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.min.js"></script>
10
votes

This is how I would do it:

var fruits = ['apple', 'orange', 'banana', 'apple'];
var observable = Rx.Observable.interval(1000).take(fruits.length).map(t => fruits[t]);
observable.subscribe(t => {
  console.log(t);
  document.body.appendChild(document.createTextNode(t + ', '));
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.min.js"></script>
1
votes
var arrayList = [1,2,3,4,5];

var source = Rx.Observable
      .interval(500/* ms */)
      .timeInterval()
      .take(arrayList.length);

source.subscribe(function(idx){
    console.log(arrayList[idx]);
   //or document.write or whatever needed
});
1
votes

Pretty late but a simpler solution would be :

const arr = ["Hi,", "how", "may", "I", "help", "you?"];

Rx.Observable.interval(500)
             .takeWhile(_ => _ < arr.length)
             .map(_ => arr[_])
             .subscribe(_ => console.log(_))    
1
votes

I find Weichhold technique to be the best but that it would gain in clarity of intent by extracting the zipped value outside of the zip:

// assume some input stream of values:
var inputs = Obs.of(1.2, 2.3, 3.4, 4.5, 5.6, 6.7, 7.8);
// emit each value from stream at a given interval:
var events = Obs.zip(inputs, Obs.interval(1000))
    .map(val => val[0])
    .forEach(console.log);
1
votes

If you want to release samples over time, you can do something like this

const observable = interval(100).pipe(
  scan((acc, value) => [value, ...acc], []),
  sampleTime(10000),
  map((acc) => acc[0])
);
0
votes

I had a little different requirement, my array kept updating over time too. So basically I had to implement a queue which I can dequeue at a regular interval, but I didn't want to use an Interval.

If somebody has a need for something like this then probably this solution can help:

I have a function createQueue() that takes the array as an input and returns an Observable which we subscribe for listening to events from the Array at a regular interval. The function also modifies the 'push()' method of the passes array so that whenever any item is pushed in the array, the Observable would emit.

createQueue(queue: string[]) {
  return Observable.create((obs: Observer<void>) => {
    const arrayPush = queue.push;
    queue.push = (data: string) => {
      const returnVal = arrayPush.call(queue, data);
      obs.next();
      return returnVal;
    }
  }).pipe(switchMap(() => {
    return from([...queue])
      .pipe(
        concatMap(val => of(val)
        .pipe(delay(1000)))
      );
  }), tap(_ => queue.shift()))
}

Lets say that the array is: taskQueue = [];

So, we need to pass it to the above function and subscribe to it.

  createQueue(taskQueue).subscribe((data) => {
    console.log('Data from queue => ', data);
  });

Now, every time we do taskQueue.push('<something here>'), the subscription will trigger after a delay of "1000ms".

Please note: we should not be assigning a new array to the taskQueue after createQueue() has been called, or else we will loose the modified push().

Here is a dummy example for the above implementation: Test Example

-3
votes

Rx.Observable instance doesn't have interval method http://xgrommx.github.io/rx-book/content/core_objects/observable/observable_instance_methods/index.html. You can use like this.

Rx.Observable.interval(500)
             .map(function(v) { return [1,2,3];})
             .subscribe(console.log.bind(console));