
I need to make multiple API calls sequentially, using each response in the next call. It looks somewhat like this:

from(Object.keys(someObj))
  .pipe(
    concatMap(key => this.getUsers(key))
  )
  .subscribe(res => ...)

The above code works, but I need a delay so that some components can process the data between the API calls. So I did:

from(Object.keys(someObj))
  .pipe(
    concatMap(key => of(this.getUsers(key)).pipe(delay(1000)))
  )
  .subscribe(res => ...)

This returns a nested observable. How can I subscribe to the inner observable without nesting subscriptions?

I tried to do:

.pipe(
  concatMap(key => of(this.getUsers(key)).pipe(delay(1000))),
  mergeMap(res => res)
)

but this results in an incorrect order of responses.

switchMap cancels the previous HTTP call, so it doesn't work for me. Commented Jan 29, 2023 at 5:14

2 Answers


I would be suspicious of using delays to give components enough time "to process the data between the API calls". You usually do not know how long the processing takes, so you may end up allowing too much or too little time.
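If a fixed pause is wanted anyway, the nested observable from the question can be avoided by dropping the of(...) wrapper and applying delay to the inner call itself. The following is only a sketch under assumptions: getUsers is a stub standing in for this.getUsers, and a VirtualTimeScheduler is used purely so the example runs without waiting.

```typescript
import { Observable, VirtualTimeScheduler, from, of } from 'rxjs';
import { concatMap, delay } from 'rxjs/operators';

// Virtual time lets the sketch finish instantly; in real code, omit the
// scheduler argument so that delay() uses real timers.
const scheduler = new VirtualTimeScheduler();
const log: string[] = [];

// Hypothetical stand-in for this.getUsers(key); it responds immediately.
function getUsers(key: string): Observable<string> {
  log.push(`${scheduler.now()}ms call ${key}`);
  return of(`users of ${key}`);
}

from(['a', 'b'])
  .pipe(
    // No of(...) wrapper: concatMap subscribes to the call itself, and
    // delay() holds each response back by one second.
    concatMap(key => getUsers(key).pipe(delay(1000, scheduler)))
  )
  .subscribe(res => log.push(`${scheduler.now()}ms ${res}`));

scheduler.flush(); // run all scheduled virtual timers

// log: '0ms call a', '1000ms users of a', '1000ms call b', '2000ms users of b'
```

Note that this shifts each response by one second, so the next call starts one second after the previous response arrived. It does not know whether the components have actually finished.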

You should see whether you can use a more idiomatic RxJS style. What I mean is the following.

Let's assume you have a function processDataBetweenAPICalls(data) that synchronously processes the data received from one API call before the next API is called. In this case you could try something like this:

from(Object.keys(someObj))
  .pipe(
    concatMap(key => this.getUsers(key)),
    tap(res => processDataBetweenAPICalls(res))
  )
  .subscribe(res => ...)

If instead the processing between two API calls is asynchronous, I would try to turn that processing into an Observable. In other words, I would create a function processDataBetweenAPICallsObs(data): Observable<any>. In this case the code could look like:

from(Object.keys(someObj))
  .pipe(
    concatMap(key => this.getUsers(key)),
    concatMap(res => processDataBetweenAPICallsObs(res))
  )
  .subscribe(res => ...)
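One caveat, sketched under assumptions: with two sibling concatMap operators, the first one can start the next getUsers call as soon as the previous call completes, while the asynchronous processing may still be running. If the next call has to wait for the processing, nesting the processing inside the first concatMap is one option. Here getUsers is a stub for this.getUsers, and finish is a hypothetical Subject used to signal by hand when a processing step ends.

```typescript
import { Observable, Subject, from, of } from 'rxjs';
import { concatMap, map, take } from 'rxjs/operators';

const log: string[] = [];

// Hypothetical stand-in for this.getUsers(key); it responds immediately.
function getUsers(key: string): Observable<string> {
  log.push(`call ${key}`);
  return of(`users of ${key}`);
}

// Hypothetical async processing step: it completes only when `finish` emits.
const finish = new Subject<void>();
function processDataBetweenAPICallsObs(res: string): Observable<string> {
  log.push(`processing ${res}`);
  return finish.pipe(take(1), map(() => `processed ${res}`));
}

from(['a', 'b'])
  .pipe(
    concatMap(key =>
      getUsers(key).pipe(
        // Nested: the outer concatMap waits until this inner chain completes
        // before it requests the next key.
        concatMap(res => processDataBetweenAPICallsObs(res))
      )
    )
  )
  .subscribe(res => log.push(res));

// Only 'a' has been requested so far; 'b' is waiting for the processing.
const beforeFinish = [...log];

finish.next(); // processing of 'a' ends, so 'b' is requested now
finish.next(); // processing of 'b' ends
```

In this sketch, "call b" is logged only after "processed users of a", which is the ordering the question asks for.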

Without knowing your use case in more detail, the above suggestions may not be on target.

EDIT AFTER COMMENT STATING THAT PROCESSING IS SYNCHRONOUS

If the processing is synchronous, then as I understand it your goal should be:

  1. get a value from the stream of values generated by from(Object.keys(someObj)) (or anything similar)
  2. call an API (getUsers in this case) for each value of the upstream
  3. do whatever synchronous processing you have to do with the response returned from the API call
  4. only after the above processing is completed, move to the next value of the source stream and repeat steps 1, 2 and 3 until the source stream completes or somewhere an error occurs

If my understanding is correct, then a simple concatMap should do the work like in this example.
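A minimal sketch of the four steps above, assuming a hypothetical getUsers stub in place of this.getUsers and using the observer form of subscribe (next, error, complete) mentioned in the comments. The synchronous processing in next runs when a response is emitted, before the inner observable completes, so concatMap does not start the next call until it has finished.

```typescript
import { Observable, from, of } from 'rxjs';
import { concatMap } from 'rxjs/operators';

const log: string[] = [];

// Hypothetical stand-in for this.getUsers(key); a real HTTP call would
// respond asynchronously, but the ordering is the same.
function getUsers(key: string): Observable<string> {
  log.push(`call ${key}`);
  return of(`users of ${key}`);
}

const someObj = { a: 1, b: 2, c: 3 };

from(Object.keys(someObj))
  .pipe(concatMap(key => getUsers(key)))
  .subscribe({
    // Synchronous processing of each response (step 3).
    next: res => log.push(`processed ${res}`),
    // Error handling; an error ends the whole sequence.
    error: err => log.push(`error ${err}`),
    // Runs once, after the last key has been handled.
    complete: () => log.push('complete'),
  });

// log: call a, processed users of a, call b, processed users of b,
//      call c, processed users of c, complete
```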


4 Comments

I agree. I shouldn't be using delays since I don't really know how long it could take my components to process data. But I am not really sure how to implement this if my subscription looks like this (using the full format .subscribe((data) => // some processing, (error) => // error handling, () => //Complete. More processing)
What does the "some processing" do? Is it synchronous processing or is it some sort of asynchronous processing?
It is synchronous (at least for now)
You may want to look at the edit at the end of my response

You could use expand and process the result of each iteration, so that the next call is not made until the previous result has been processed.

const { of, expand, switchMap, delay, map, take, skip } = rxjs;

const keys = [1, 2, 3, 4, 5, 6];

of(keys).pipe(
  // Create an accumulator with the result from the last run and the remaining keys
  map(keys => ({ keys, result: undefined })),
  expand((acc) => getUsers(acc.keys[0]).pipe(
    map(response => processResponse(response, acc.keys))
  )),
  skip(1), // We are not interested in the first emission with no result yet
  map(acc => acc.result), // We only want the result not the whole accumulator
  take(keys.length) // Complete the observable chain after processing all the keys
).subscribe(val => {
  console.log(val);
});

function processResponse(response, keys) {
  // This is where your expensive processing takes place
  const result = `Result for ${response}`;
  const [, ...remainingKeys] = keys;
  return ({ result, keys: remainingKeys });
}

function getUsers(key) {
  return of(`Response of ${key}`).pipe(
    delay(Math.random() * 800) // Simulate a call that takes some time
  );
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js" integrity="sha512-v0/YVjBcbjLN6scjmmJN+h86koeB7JhY4/2YeyA5l+rTdtKLv0VbDBNJ32rxJpsaW1QGMd1Z16lsLOSGI38Rbg==" crossorigin="anonymous" referrerpolicy="no-referrer"></script>
