When takeWhile meets a value that does not satisfy its predicate, it completes the observable without propagating that value. This means that the next chained operator will not receive the value and will not invoke its callback for it.
Suppose that in your example the first two calls to this._adminService.getOperationDetail(...) return a non-successful status and the third call succeeds. The observable returned by getOperationDetail() would then produce only two info values, each with a non-successful status; the successful one would never be emitted. Just as importantly, the next chained concatMap operator would invoke its callback for each of those non-successful values, meaning that createRecordSets() would be called twice. I suppose you want to avoid that.
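To make that scenario concrete, here is a minimal plain-JavaScript sketch of how takeWhile treats the three responses. It is only a model of the semantics, not RxJS itself; takeWhileModel, the status strings, and the predicate are assumptions made for illustration.

```javascript
// Plain-JavaScript model of takeWhile semantics (no RxJS involved).
// The status values are hypothetical poll results.
function takeWhileModel(values, predicate) {
  const emitted = [];
  for (const value of values) {
    if (!predicate(value)) {
      break; // complete without emitting the value that failed the predicate
    }
    emitted.push(value);
  }
  return emitted;
}

const polled = ['PENDING', 'PENDING', 'SUCCESSFUL'];
const seenDownstream = takeWhileModel(polled, status => status !== 'SUCCESSFUL');

console.log(seenDownstream); // [ 'PENDING', 'PENDING' ]
```

The downstream operator sees only the two non-successful values, and the successful one is dropped.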
I would suggest using the first operator instead:
    getOperationDetail(operationId) {
        return Observable.interval(10000)
            .concatMap(() => this._adminService.getOperationDetail(operationId))
            // emit only the first successful result, then complete
            .first(info => info.Status.Value === 'SUCCESSFUL');
    }
This way getOperationDetail() would produce only a single "successful" value as soon as this._adminService.getOperationDetail(operationId) succeeds. The first operator emits the first value of the source observable that matches the specified condition and then completes.
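As a rough illustration of that behavior, the following plain-JavaScript sketch models first(predicate) with a generator. It does not use RxJS; pollStatuses, firstModel, and the status strings are hypothetical stand-ins for the repeated service calls.

```javascript
// Plain-JavaScript model of first(predicate); no RxJS involved.
// pollStatuses is a hypothetical stand-in for the repeated service calls.
let serviceCalls = 0;

function* pollStatuses() {
  const responses = ['PENDING', 'PENDING', 'SUCCESSFUL', 'SUCCESSFUL'];
  for (const status of responses) {
    serviceCalls += 1; // one simulated service call per poll
    yield status;
  }
}

function firstModel(source, predicate) {
  for (const value of source) {
    if (predicate(value)) {
      return [value]; // emit the match, then complete and stop pulling
    }
  }
  // RxJS's first signals an error when nothing matches;
  // this model just returns an empty list instead.
  return [];
}

const result = firstModel(pollStatuses(), status => status === 'SUCCESSFUL');

console.log(result);       // [ 'SUCCESSFUL' ]
console.log(serviceCalls); // 3
```

Only the successful value reaches the next operator, and in this model polling stops as soon as it arrives.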
And when it comes to error handling, catch or retry operators might be useful.
Update:
The unexpected behavior you have faced (getOperationDetail() keeps being called after first() completes) seems to be a bug in rxjs. As described in this issue,
every take-ish operator (one that completes earlier than its source Observable), will keep subscribing to source Observable when combined with operator that prolongs subscription (here switchMap).
Both first and takeWhile are examples of such take-ish operators, and operators that "prolong" the subscription are, for example, switchMap, concatMap and mergeMap. In the following example, numbers keep being logged for as long as the inner observable of concatMap is emitting values:
    var takeish$ = Rx.Observable.interval(200)
        // will keep logging until inner observable of `concatMap` is completed
        .do(x => console.log(x))
        .takeWhile(x => x < 2);

    var source = takeish$
        .concatMap(x => Rx.Observable.interval(200).take(10))
        .subscribe();
It looks like this can be worked around by turning an observable containing such a take-ish operator into a higher-order observable — in a similar way as you've done:
    var takeish$ = Rx.Observable.interval(200)
        // will log only 0, 1, 2
        .do(x => console.log(x))
        .takeWhile(x => x < 2);

    var source = Rx.Observable.of(null)
        .switchMap(() => takeish$)
        .concatMap(x => Rx.Observable.interval(200).take(1))
        .subscribe();
Update 2:
It seems that the bug described above still exists as of rxjs version 5.4.2. It affects, for example, whether or not the source observable of the first operator is unsubscribed when first meets the specified condition. When the first operator is immediately followed by concatMap, its source observable is not unsubscribed and keeps emitting values until the inner observable of concatMap completes. In your case this means that this._adminService.getOperationDetail() would keep being called until the observable returned by createRecordSets() completes.
Here's your example simplified to illustrate the behavior:
    function registerDomain() {
        return Rx.Observable.of("operation")
            .concatMap(() => getOperationDetail()
                .concatMap(() => Rx.Observable.interval(200).take(5)));
    }

    function getOperationDetail() {
        return Rx.Observable.interval(100)
            // console.log() instead of the actual service call
            .do(x => console.log(x))
            .first(x => x === 2);
    }

    registerDomain().subscribe();
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>
If we expand the inner observable of the first concatMap operator, we will get the following observable:
    Rx.Observable.interval(100)
        .do(x => console.log(x))
        .first(x => x === 2)
        .concatMap(() => Rx.Observable.interval(200).take(5));
Notice that first is immediately followed by concatMap, which prevents the source observable of the first operator (i.e. interval(100).do(x => console.log(x))) from being unsubscribed. Values will keep being logged (or in your case, service calls will keep being sent) until the inner observable of concatMap (i.e. interval(200).take(5)) completes.
If we modify the example above and move the second concatMap out of the inner observable of the first concatMap, first will not be chained with it any more and will unsubscribe from the source observable as soon as the condition is satisfied, meaning that interval will stop emitting values and no more numbers will be logged (or no more service requests will be sent):
    function registerDomain() {
        return Rx.Observable.of("operation")
            .concatMap(() => getOperationDetail())
            .concatMap(() => Rx.Observable.interval(200).take(5));
    }

    function getOperationDetail() {
        return Rx.Observable.interval(100)
            // console.log() instead of the actual service call
            .do(x => console.log(x))
            .first(x => x === 2);
    }

    registerDomain().subscribe();
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>
The inner observable in this case expands simply to:
    Rx.Observable.interval(100)
        .do(x => console.log(x))
        .first(x => x === 2)
Notice that first is no longer followed by concatMap.
It is also worth mentioning that in both cases the observable returned by registerDomain() produces exactly the same values. If we move the logging from the do() operator to subscribe(), the same values will be written to the console in both cases:
    registerDomain().subscribe(x => console.log(x));
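The equivalence of the two arrangements can be sketched with plain arrays, using Array.prototype.flatMap as a stand-in for concatMap. This is only a model of the emitted values and says nothing about timing or unsubscription; getOperationDetailModel and recordSetsModel are hypothetical names introduced for the sketch.

```javascript
// Plain-array model; flatMap stands in for concatMap. No RxJS involved.
const getOperationDetailModel = () => [2];     // first(x => x === 2) emits just 2
const recordSetsModel = () => [0, 1, 2, 3, 4]; // interval(200).take(5)

// Second concatMap nested inside the callback of the first one.
const nested = ['operation'].flatMap(() =>
  getOperationDetailModel().flatMap(() => recordSetsModel()));

// Second concatMap moved out to the outer chain.
const flat = ['operation']
  .flatMap(() => getOperationDetailModel())
  .flatMap(() => recordSetsModel());

console.log(nested); // [ 0, 1, 2, 3, 4 ]
console.log(flat);   // [ 0, 1, 2, 3, 4 ]
```

Both chains yield the same values, so the only observable difference between the two versions is how long the polling source stays subscribed.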