
I'm new to ReactiveX/RxJS and I'm wondering whether my use case can be handled cleanly with RxJS, preferably with a combination of built-in operators. Here's what I want to achieve:

I have an Angular2 application that communicates with a REST API. Different parts of the application need to access the same information at different times. To avoid hammering the servers by firing the same request over and over, I'd like to add client-side caching. The caching should happen in a service layer, where the network calls are actually made. This service layer then just hands out Observables. The caching must be transparent to the rest of the application: it should only be aware of Observables, not the caching.

So initially, a particular piece of information from the REST API should be retrieved only once per, let's say, 60 seconds, even if there's a dozen components requesting this information from the service within those 60 seconds. Each subscriber must be given the (single) last value from the Observable upon subscription.

Currently, I managed to achieve exactly that with an approach like this:

    public getInformation(): Observable<Information> {
      if (!this.information) {
        this.information = this.restService.get('/information/')
          .cache(1, 60000);
      }
      return this.information;
    }

In this example, restService.get(...) performs the actual network call and returns an Observable, much like Angular's http Service.

The problem with this approach is refreshing the cache: While it makes sure the network call is executed exactly once, and that the cached value will no longer be pushed to new subscribers after 60 seconds, it doesn't re-execute the initial request after the cache expires. So subscriptions that occur after the 60sec cache will not be given any value from the Observable.

Would it be possible to re-execute the initial request if a new subscription happens after the cache timed out, and to re-cache the new value for 60sec again?

As a bonus: it would be even cooler if existing subscriptions (e.g. those who initiated the first network call) would get the refreshed value whose fetching had been initiated by the newer subscription, so that once the information is refreshed, it is immediately passed through the whole Observable-aware application.

2 Answers

2

I figured out a solution to achieve exactly what I was looking for. It might go against ReactiveX nomenclature and best practices, but technically, it does exactly what I want it to. That being said, if someone still finds a way to achieve the same with just built-in operators, I'll be happy to accept a better answer.

So basically since I need a way to re-trigger the network call upon subscription (no polling, no timer), I looked at how the ReplaySubject is implemented and even used it as my base class. I then created a callback-based class RefreshingReplaySubject (naming improvements welcome!). Here it is:

    export class RefreshingReplaySubject<T> extends ReplaySubject<T> {

      private providerCallback: () => Observable<T>;
      private lastProviderTrigger: number;
      private windowTime;

      constructor(providerCallback: () => Observable<T>, windowTime?: number) {
        // Cache exactly 1 item forever in the ReplaySubject
        super(1);
        this.windowTime = windowTime || 60000;
        this.lastProviderTrigger = 0;
        this.providerCallback = providerCallback;
      }

      protected _subscribe(subscriber: Subscriber<T>): Subscription {
        // Hook into the subscribe method to trigger refreshing
        this._triggerProviderIfRequired();
        return super._subscribe(subscriber);
      }

      protected _triggerProviderIfRequired() {
        let now = this._getNow();
        if ((now - this.lastProviderTrigger) > this.windowTime) {
          // Data considered stale, provider triggering required...
          this.lastProviderTrigger = now;
          this.providerCallback().first().subscribe((t: T) => this.next(t));
        }
      }
    }

And here is the resulting usage:

    public getInformation(): Observable<Information> {
      if (!this.information) {
        this.information = new RefreshingReplaySubject(
          () => this.restService.get('/information/'),
          60000
        );
      }
      return this.information;
    }
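
For readers who want to see the refresh-on-subscribe idea in isolation, here is a minimal dependency-free sketch. It does not use RxJS and is not the class above: `TimedCache`, `Fetcher`, `Listener`, and the injectable `now` clock are all hypothetical names introduced for illustration, and the fetcher is a plain callback rather than an Observable. One deliberate difference: a subscriber arriving after expiry does not get the stale value replayed, only the freshly fetched one.

```typescript
type Listener<T> = (value: T) => void;
// Hypothetical fetcher shape: calls `deliver` once when the response arrives.
type Fetcher<T> = (deliver: (value: T) => void) => void;

class TimedCache<T> {
  private listeners = new Set<Listener<T>>();
  private lastFetch = -Infinity; // forces a fetch on the first subscription
  private hasValue = false;
  private value: T | undefined = undefined;
  private fetcher: Fetcher<T>;
  private windowMs: number;
  private now: () => number;

  constructor(fetcher: Fetcher<T>, windowMs: number, now: () => number = () => Date.now()) {
    this.fetcher = fetcher;
    this.windowMs = windowMs;
    this.now = now;
  }

  subscribe(listener: Listener<T>): () => void {
    this.listeners.add(listener);
    const current = this.now();
    if (current - this.lastFetch > this.windowMs) {
      // Cache is stale: trigger exactly one new fetch; every listener gets the result.
      this.lastFetch = current;
      this.fetcher(v => this.publish(v));
    } else if (this.hasValue) {
      // Cache is fresh: replay the last value to the new listener only.
      listener(this.value as T);
    }
    // Otherwise a fetch is still in flight; the listener simply waits for it.
    return () => { this.listeners.delete(listener); };
  }

  private publish(v: T): void {
    this.value = v;
    this.hasValue = true;
    this.listeners.forEach(l => l(v));
  }
}

// Usage with a fake clock and a fake "network call" that answers synchronously.
let clock = 0;
let calls = 0;
const cache = new TimedCache<string>(
  deliver => { calls += 1; deliver(`response #${calls}`); },
  60000,
  () => clock,
);

const a: string[] = [];
const b: string[] = [];
const c: string[] = [];
cache.subscribe(v => a.push(v)); // t = 0s: first subscription triggers fetch #1
clock = 30000;
cache.subscribe(v => b.push(v)); // t = 30s: still fresh, cached value replayed
clock = 61000;
cache.subscribe(v => c.push(v)); // t = 61s: expired, triggers fetch #2
```

In this run the fake request fires twice, and the two earlier subscribers also receive the second response, which is the "bonus" behavior asked for in the question.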

0

To implement this, you will need to create your own observable with custom logic on subscription:

    function createTimedCache(doRequest, expireTime) {
      let lastCallTime = 0;
      let lastResult = null;

      const result$ = new Rx.Subject();

      return Rx.Observable.create(observer => {
        const time = Date.now();
        if (time - lastCallTime < expireTime) {
          return (lastResult
            // when result already received
            ? result$.startWith(lastResult)
            // still waiting for result
            : result$
          ).subscribe(observer);
        }
        const disposable = result$.subscribe(observer);
        lastCallTime = time;
        lastResult = null;
        doRequest()
          .do(result => {
            lastResult = result;
          })
          .subscribe(v => result$.next(v), e => result$.error(e));
        return disposable;
      });
    }

The resulting usage would be as follows:

    this.information = createTimedCache(
      () => this.restService.get('/information/'),
      60000
    );

Usage example: https://jsbin.com/hutikesoqa/edit?js,console

2 Comments

Thanks for your answer, Bogdan. Unfortunately, this does not produce the desired behavior: when implemented like this, no result is present at all for the first 60sec. After the first 60sec, the request is executed once and results are pushed through the subscribers. But no further requests are ever executed again. Furthermore, this produces timed polling at the service layer whereas I was looking for caching with a timed cache expiry.
Updated the answer to match the desired behavior. It is quite similar to yours (I noticed that after writing it), but implemented in a different way.
