
I have the following methods which should be called like this:

  1. registerDomain should be called and should return an operationId
  2. After 10 seconds, getOperationDetail should be called passing in the operationId
  3. getOperationDetail should be called every 10 seconds until SUCCESSFUL is returned.
  4. Once getOperationDetail finishes, createRecordSets should be called.
  5. Finally, getChangeStatus should be called until it returns INSYNC
  6. If any of the api calls throw an exception, how can I handle the error on the client side?

The code below calls registerDomain and getOperationDetail, but after getOperationDetail completes, it does not move on to createRecordSets.

    registerDomain(domain) {
      return this._adminService.registerDomain(domain)
        .concatMap(operation => this.getOperationDetail(operation.OperationId))
        .concatMap(() => this._adminService.createRecordSets(domain));
    }

    getOperationDetail(operationId) {
      return Observable.interval(10000)
        .mergeMap(() => this._adminService.getOperationDetail(operationId))
        .takeWhile((info) => info.Status.Value !== 'SUCCESSFUL');
    }

    createRecordSets(caseWebsiteUrl) {
      return this._adminService.createRecordSets(caseWebsiteUrl.Url)
        .concatMap(registerId => this.getChangeStatus(registerId));
    }

    getChangeStatus(registerId) {
      return Observable.interval(5000)
        .mergeMap(() => this._adminService.getChange(registerId))
        .takeWhile((info) => info.ChangeInfo.Status.Value !== 'INSYNC');
    }

I updated getOperationDetail to use the first operator:

    getOperationDetail(operationId) {
      return Observable.interval(3000)
        .mergeMap(() => this._adminService.getOperationDetail(operationId))
        .first((info) => info.Status.Value === 'SUCCESSFUL');
    }

Now it does in fact call createRecordSets, however, after createRecordSets, it continues to call getOperationDetail about 13 times and eventually calls getChangeStatus. The way I was looking at it, I thought it would:

  1. Call getOperationDetail until it receives a SUCCESS.
  2. Call createRecordSets one time.
  3. Call getChangeStatus until it receives an INSYNC
  4. Done.

Why the additional calls?

I changed the registerDomain to look like this:

    registerDomain(domain) {
      return this._adminService.registerDomain(domain)
        .concatMap(operation => this.getOperationDetail(operation.OperationId))
        .concatMap((op) => this.createRecordSets(op));
    }

Before, I had .concatMap((op) => this.createRecordSets(op)) chained directly onto this.getOperationDetail(...). Once I moved it out to the outer chain, it started working as expected. I am unsure why, though. Can someone explain?

1 Answer


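takeWhile emits values only while its predicate returns true. When it meets the first value that fails the predicate (here, the response whose status is 'SUCCESSFUL'), it completes the observable without propagating that value. This means the next chained operator never receives the successful value and does not invoke its callback for it.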

Suppose that in your example the first two calls to this._adminService.getOperationDetail(...) return a non-successful status and the third call succeeds. The observable returned by getOperationDetail() would then emit only two info values, each with a non-successful status, and never the successful one. Just as important, the next chained concatMap operator would invoke its callback once for each of those non-successful values, meaning that createRecordSets() would be called twice. I suppose you want to avoid that.

I would suggest using the first operator instead:

    getOperationDetail(operationId) {
      return Observable.interval(10000)
        .concatMap(() => this._adminService.getOperationDetail(operationId))
        // emit only the first response whose status is SUCCESSFUL, then complete
        .first(info => info.Status.Value === 'SUCCESSFUL');
    }

This way getOperationDetail() would produce only a single "successful" value as soon as this._adminService.getOperationDetail(operationId) succeeds. The first operator emits the first value of the source observable that matches the specified condition and then completes.
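
To make the difference concrete, here is a minimal sketch that models both operators over a plain array instead of a real observable. The helper names takeWhileModel and firstModel and the 'IN_PROGRESS' status are assumptions made for illustration; they are not part of RxJS or of your service.

```javascript
// Plain-JavaScript model of the two operators applied to a finite list of
// polled statuses. These helpers are hypothetical stand-ins, not RxJS APIs.

// Passes values through while the predicate holds; stops at the first value
// that fails it and does NOT pass that value on.
function takeWhileModel(values, predicate) {
  const out = [];
  for (const value of values) {
    if (!predicate(value)) break;
    out.push(value);
  }
  return out;
}

// Passes on only the first value that matches the predicate, then stops.
// (This model returns an empty list when nothing matches.)
function firstModel(values, predicate) {
  for (const value of values) {
    if (predicate(value)) return [value];
  }
  return [];
}

// Assumed sequence of poll results: two unfinished responses, then success.
const polled = ['IN_PROGRESS', 'IN_PROGRESS', 'SUCCESSFUL'];

const viaTakeWhile = takeWhileModel(polled, s => s !== 'SUCCESSFUL');
const viaFirst = firstModel(polled, s => s === 'SUCCESSFUL');

console.log(viaTakeWhile); // [ 'IN_PROGRESS', 'IN_PROGRESS' ]
console.log(viaFirst);     // [ 'SUCCESSFUL' ]
```

Whatever is chained next runs once per emitted value, so in this model it would run twice after takeWhile and once after first.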

As for error handling, the catch and retry operators might be useful.
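
As a rough illustration of the idea behind those two operators, here is a synchronous plain-JavaScript analogue rather than actual RxJS code. The helpers withRetry and withCatch and the flakyCall function are hypothetical names invented for this sketch. retry repeats the failing work a limited number of times, and catch converts a remaining failure into something the client can handle.

```javascript
// Hypothetical helper modelling retry(count): run `attempt` again after a
// failure, up to `count` extra times, then rethrow the last error.
function withRetry(attempt, count) {
  let lastError;
  for (let i = 0; i <= count; i++) {
    try {
      return attempt();
    } catch (err) {
      lastError = err;
    }
  }
  throw lastError;
}

// Hypothetical helper modelling catch: turn a failure into a fallback value.
function withCatch(run, onError) {
  try {
    return run();
  } catch (err) {
    return onError(err);
  }
}

// Assumed flaky call: fails twice, then succeeds.
let calls = 0;
function flakyCall() {
  calls += 1;
  if (calls < 3) throw new Error('network error on call ' + calls);
  return 'SUCCESSFUL';
}

// Two retries are enough here, so the error handler is never invoked.
const recovered = withCatch(
  () => withRetry(flakyCall, 2),
  err => 'FAILED: ' + err.message
);

// Assumed call that always fails, to show the fallback path.
const fallback = withCatch(
  () => withRetry(() => { throw new Error('service down'); }, 1),
  err => 'FAILED: ' + err.message
);

console.log(recovered); // SUCCESSFUL
console.log(fallback);  // FAILED: service down
```

In the real chain the equivalent would be to place retry and catch after the service call whose failure you want to handle; where exactly depends on which steps should be repeated.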

Update:

The unexpected behavior you have faced (getOperationDetail() keeps being called after first() completes) seems to be a bug in rxjs. As described in this issue,

every take-ish operator (one that completes earlier than its source Observable), will keep subscribing to source Observable when combined with operator that prolongs subscription (here switchMap).

Both first and takeWhile are examples of such take-ish operators, and operators that "prolong" the subscription include switchMap, concatMap and mergeMap. In the following example, numbers will keep being logged for as long as the inner observable of concatMap is emitting values:

    var takeish$ = Rx.Observable.interval(200)
      // will keep logging until inner observable of `concatMap` is completed
      .do(x => console.log(x))
      .takeWhile(x => x < 2);

    var source = takeish$
      .concatMap(x => Rx.Observable.interval(200).take(10))
      .subscribe();

It looks like this can be worked around by turning the observable that contains such a take-ish operator into a higher-order observable, in a similar way to what you've done:

    var takeish$ = Rx.Observable.interval(200)
      // will log only 0, 1, 2
      .do(x => console.log(x))
      .takeWhile(x => x < 2);

    var source = Rx.Observable.of(null)
      .switchMap(() => takeish$)
      .concatMap(x => Rx.Observable.interval(200).take(1))
      .subscribe();

Update 2:

It seems that the bug described above still exists as of rxjs version 5.4.2. It affects, for example, whether the source observable of the first operator is unsubscribed when first meets the specified condition. When the first operator is immediately followed by concatMap, its source observable is not unsubscribed and keeps emitting values until the inner observable of concatMap completes. In your case this means that this._adminService.getOperationDetail() keeps being called until the observable returned by createRecordSets() completes.

Here's your example simplified to illustrate the behavior:

    function registerDomain() {
      return Rx.Observable.of("operation")
        .concatMap(() =>
          getOperationDetail()
            .concatMap(() => Rx.Observable.interval(200).take(5)));
    }

    function getOperationDetail() {
      return Rx.Observable.interval(100)
        // console.log() instead of the actual service call
        .do(x => console.log(x))
        .first(x => x === 2);
    }

    registerDomain().subscribe();
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>

If we expand the inner observable of the first concatMap operator, we will get the following observable:

    Rx.Observable.interval(100)
      .do(x => console.log(x))
      .first(x => x === 2)
      .concatMap(() => Rx.Observable.interval(200).take(5));

Notice that first is immediately followed by concatMap, which prevents the source observable of the first operator (i.e. interval(100).do(x => console.log(x))) from being unsubscribed. Values will keep being logged (or in your case, service calls will keep being sent) until the inner observable of concatMap (i.e. interval(200).take(5)) completes.

If we modify the example above and move the second concatMap out of the inner observable of the first concatMap, first is no longer chained with it and unsubscribes from the source observable as soon as the condition is satisfied. interval then stops emitting values and no more numbers are logged (or no more service requests are sent):

    function registerDomain() {
      return Rx.Observable.of("operation")
        .concatMap(() => getOperationDetail())
        .concatMap(() => Rx.Observable.interval(200).take(5));
    }

    function getOperationDetail() {
      return Rx.Observable.interval(100)
        // console.log() instead of the actual service call
        .do(x => console.log(x))
        .first(x => x === 2);
    }

    registerDomain().subscribe();
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>

In this case the inner observable expands simply to:

    Rx.Observable.interval(100)
      .do(x => console.log(x))
      .first(x => x === 2)

Notice that first is no longer followed by concatMap.

It is also worth mentioning that in both cases the observable returned by registerDomain() produces exactly the same values. If we move the logging from the do() operator to subscribe(), the same values are written to the console in both cases:

    registerDomain().subscribe(x => console.log(x));

5 Comments

Hi, I tried first and it works, but I am running into an issue. I updated my post with it.
I think I managed to reproduce unexpected behavior that you described, I will update the answer soon. In a nutshell, first() doesn't seem to unsubscribe from the source observable even after the specified condition is met, see this issue for details.
@xaisoft, I updated the answer with the bug description and a possible way to work around it. Frankly, this bug was a big surprise for me : )
Hi Sergey, I appreciate the answer. I also posted a question with a simple example and got the following response. It looks like it is fixed. stackoverflow.com/a/45332224/2665434
@xaisoft, the other question you posted is absolutely valid, but it has nothing to do with the bug you have described. I am rather sure that the bug still exists in the current version of rxjs (5.4.2 as of now) and it is related to whether or not first operator is chained with concatMap operator. I updated my answer with more details and a couple of runnable snippets to illustrate the issue.
