0

At regular intervals, I want to poll an API that will return an array of records which will almost certainly vary in size. I want each record to display in a CSS animation which takes t amount of time. Therefore, I must buffer the API responses and release them individually, no more frequently than t, so that the animation can complete smoothly.

After much searching and trial and error, I put together this custom RxJS operator (in framework-less TypeScript). However, the double/nested concatMap has a bit of code smell. Is there a more elegant or reactive solution? Are all the inner observables managed properly (unsubscribed from)?

(This is my first custom operator, so any other feedback is welcome.)

export function recordPace (/* params to pipe */): OperatorFunction<IRecord[], IRecord> { // inner function automatically receives source observable return (source: Observable<IRecord[]>) => { return source.pipe( // First, explode the array to show one at a time. concatMap((records: IRecord[]) => from(records).pipe( // Now, for each array value, add a delay concatMap((record: IRecord) => of(record).pipe( delay(t), )) tap((record: IRecord) => { // executes once per record, no faster than every t }), )), tap((record: IRecord) => { // alternative also executes once per record, no faster than every t }), ); }; } 
MyApi.doPolling().pipe( recordPace(), recordAnimate(), ).subscribe( () => {}, () => {}, () => { console.log('done'); } ); 

1 Answer 1

1

If you want to explode the array, you can use mergeAll().

export function recordPace (/* params to pipe */): OperatorFunction<IRecord[], IRecord> { return (source: Observable<IRecord[]>) => { return source.pipe( // concatMap((records: IRecord[]) => from(records).pipe( mergeAll(), concatMap((record: IRecord) => of(record).pipe( delay(t), )) tap((record: IRecord) => { // executes once per record, no faster than every t }), // )), tap((record: IRecord) => {}), ); }; } 

You can use any of the mergeAll, switchAll, concatAll operators. This is because if you only want to explode the array, there is no async operation involved. Each operator mentioned above takes in a function that returns an ObservableInput

export type ObservableInput<T> = Observable<T> | InteropObservable<T> | AsyncIterable<T> | PromiseLike<T> | ArrayLike<T> | Iterable<T>; 

As you can see, an array could be considered an ObservableInput and the behaviour is that if your callback function returns an array, it will send each of its items individually.

Also, congrats on your first custom operator!

Sign up to request clarification or add additional context in comments.

2 Comments

In other words, mergeAll, switchAll, concatAll are equivalent in this case because an array's synchronous "atomicity" means there's no way for elements of the nth emitted array to intermix with elements of array 1 through n-1.
@nshew13 Yes, exactly!

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.