3

I'm applying an async method to each value emitted by an observable. I would like this async method to be applied to the next emitted value only once it has completed for the previous value.

Here is a short example:

    import { of } from "rxjs";

    const timeout = (ms) => {
      return new Promise(resolve => setTimeout(resolve, ms));
    };

    of(1, 2, 3).subscribe(async (val) => {
      console.log(`starting to use ${val}`);
      await timeout(1000);
      console.log(`done using ${val}`);
    });

In my real code, timeout updates my state and fetches data from a server.
Output:

    // starting to use 1
    // starting to use 2
    // starting to use 3
    (wait 1 sec)
    // done using 1
    // done using 2
    // done using 3

What I would like to get is:

    // starting to use 1
    (wait 1 sec)
    // done using 1
    // starting to use 2
    (wait 1 sec)
    // done using 2
    // starting to use 3
    (wait 1 sec)
    // done using 3

3 Answers

1

You can simply use concatMap:

    import { concatMap } from "rxjs/operators";

    of(1, 2, 3).pipe(
      concatMap(val => timeout(1000))
    ).subscribe();

concatMap will receive the 3 emitted values and will "work" on them one at a time, waiting for the result of each promise before moving on to the next one.

Here's a working StackBlitz demo.

0

Adding an await before the timeout call, i.e. await timeout(1000), at least makes the code inside the async () => {} run in the right order. However, the function is still started the moment a new value is emitted by the observable, which is why you see the three "starting" logs at the beginning.
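To illustrate why that happens, here is a small sketch without RxJS. emitSync is a hypothetical stand-in for a synchronous source such as of(1, 2, 3): it calls the callback once per value and does nothing with the promise an async callback returns, which is also how subscribe treats its next handler. runDemo is likewise just a name chosen for this example.

```javascript
// Hypothetical stand-in for a synchronous source such as of(1, 2, 3):
// it calls the callback once per value and ignores whatever it returns.
const emitSync = (values, callback) => {
  for (const value of values) {
    callback(value); // the promise returned by an async callback is dropped
  }
};

const timeout = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Runs the demo and resolves with the order in which the events happened.
const runDemo = async (ms = 1000) => {
  const events = [];
  emitSync([1, 2, 3], async (val) => {
    events.push(`starting to use ${val}`);
    await timeout(ms);
    events.push(`done using ${val}`);
  });
  await timeout(ms * 2); // give every callback time to finish
  return events;
};

runDemo().then((events) => console.log(events.join("\n")));
```

All three callbacks start before any of them reaches the line after the await, so the await only orders the code within a single callback.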

If you want your observable to emit one value per second, you may consider using interval or some debounce mechanism instead of of().

1 Comment

1. Sorry I had forgotten the await - I put it back in my Edit // 2. I don't have control over the Observable.
0

You can use from to convert a promise to an Observable. That gives you full control over your observables, and you get to use all the nice things RxJS provides (the other methods and operators such as pipe, switchMap, etc.).

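    import { of, from } from 'rxjs';
    import { switchMap } from 'rxjs/operators';

    of(1, 2, 3)
      .pipe(
        switchMap((val) => {
          console.log(`starting to use ${val}`);
          return from(timeout(1000));
        })
      )
      .subscribe(finalValue => {
        // val is out of scope here; finalValue is what the promise resolved with
        console.log(`done using ${finalValue}`);
      });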

From the docs (no pun intended):

Creates an Observable from an Array, an array-like object, a Promise, an iterable object, or an Observable-like object.

3 Comments

The way you did it, finalValue would be the result of the timeout function, not the original val.
@DavidB. I would assume OP is using that timeout just as a mock async method to simulate a promise, not necessarily the real one.
It's not necessary to use from within switchMap; you can simply return the promise. (I believe the xxxMap operators internally use from)
