34

I'm trying to emit simple array values one after another with 500ms in between:

var a = Rx.Observable.from([1,2,3]); a.interval(500).subscribe(function(b) { console.log(b); }); 

However, this throws an exception:

Uncaught TypeError: a.interval is not a function. 
1

8 Answers 8

41

Three ways to do it, with RxJS version 6 :

1. Using concatMap

import { from, of, pipe } from 'rxjs'; import { concatMap, delay } from 'rxjs/operators'; const array = [1, 2, 3, 4, 5]; from(array) .pipe( concatMap(val => of(val).pipe(delay(1000))), ) .subscribe(console.log); 

2. Using zip and interval

import { from, pipe, interval } from 'rxjs'; import { delay, zip} from 'rxjs/operators'; const array = [1, 2, 3, 4, 5]; from(array) .pipe( zip(interval(1000), (a, b) => a), ) .subscribe(console.log); 

3. Using interval as source

import { interval, pipe } from 'rxjs'; import { map, take } from 'rxjs/operators'; const array = [1, 2, 3, 4, 5]; interval(1000) .pipe( take(array.length), map(i => array[i]) ) .subscribe(console.log); 
Sign up to request clarification or add additional context in comments.

2 Comments

Beware! zip operator is deprecated in favor of static zip! Source: rxjs-dev.firebaseapp.com/api/operators/zip So it becomes: import { zip } from 'rxjs'; zip(interval(1000), from(array)).map((a, b) => a).subscribe(...)
Thanks for all the 3 solutions. The first is the most simplest and straightforward.
18

As already pointed out by xgrommx, interval is not an instance member of an observable but rather a static member of Rx.Observable.

Rx.Observable.fromArray([1,2,3]).zip( Rx.Observable.interval(500), function(a, b) { return a; }) .subscribe( function(x) { document.write(x + '<br \>'); }, null, function() { document.write("complete"); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.min.js"></script>

Comments

10

This is how I would do it:

var fruits = ['apple', 'orange', 'banana', 'apple']; var observable = Rx.Observable.interval(1000).take(fruits.length).map(t => fruits[t]); observable.subscribe(t => { console.log(t); document.body.appendChild(document.createTextNode(t + ', ')); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.min.js"></script>

1 Comment

interesting but this would not satisfy the general case of wanting to output items coming from an event stream at fixed interval of time (which OP seems to want based on example)
1
var arrayList = [1,2,3,4,5]; var source = Rx.Observable .interval(500/* ms */) .timeInterval() .take(arrayList.length); source.subscribe(function(idx){ console.log(arrayList[idx]); //or document.write or whatever needed }); 

1 Comment

interesting but this would not satisfy the general case of wanting to output items coming from one event stream at fixed interval of time
1

Pretty late but a simpler solution would be :

const arr = ["Hi,", "how", "may", "I", "help", "you?"]; Rx.Observable.interval(500) .takeWhile(_ => _ < arr.length) .map(_ => arr[_]) .subscribe(_ => console.log(_)) 

Comments

1

I find Weichhold technique to be the best but that it would gain in clarity of intent by extracting the zipped value outside of the zip:

// assume some input stream of values: var inputs = Obs.of(1.2, 2.3, 3.4, 4.5, 5.6, 6.7, 7.8); // emit each value from stream at a given interval: var events = Obs.zip(inputs, Obs.interval(1000)) .map(val => val[0]) .forEach(console.log); 

Comments

1

If you want to release samples over time, you can do something like this

const observable = interval(100).pipe( scan((acc, value) => [value, ...acc], []), sampleTime(10000), map((acc) => acc[0]) ); 

Comments

0

I had a little different requirement, my array kept updating over time too. So basically I had to implement a queue which I can dequeue at a regular interval, but I didn't want to use an Interval.

If somebody has a need for something like this then probably this solution can help:

I have a function createQueue() that takes the array as an input and returns an Observable which we subscribe for listening to events from the Array at a regular interval. The function also modifies the 'push()' method of the passes array so that whenever any item is pushed in the array, the Observable would emit.

createQueue(queue: string[]) { return Observable.create((obs: Observer<void>) => { const arrayPush = queue.push; queue.push = (data: string) => { const returnVal = arrayPush.call(queue, data); obs.next(); return returnVal; } }).pipe(switchMap(() => { return from([...queue]) .pipe( concatMap(val => of(val) .pipe(delay(1000))) ); }), tap(_ => queue.shift())) } 

Lets say that the array is: taskQueue = [];

So, we need to pass it to the above function and subscribe to it.

 createQueue(taskQueue).subscribe((data) => { console.log('Data from queue => ', data); }); 

Now, every time we do taskQueue.push('<something here>'), the subscription will trigger after a delay of "1000ms".

Please note: we should not be assigning a new array to the taskQueue after createQueue() has been called, or else we will loose the modified push().

Here is a dummy example for the above implementation: Test Example

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.