35

I'm using an Observable to provide event subscription interface for clients from a global resource, and I need to manage that resource according to the number of active subscriptions:

  • Allocate global resource when the number of subscriptions becomes greater than 0
  • Release global resource when the number of subscriptions becomes 0
  • Adjust the resource usage strategy based on the number of subscriptions

What is the proper way in RXJS to monitor the number of active subscriptions?


How to implement the following within RXJS syntax? -

const myEvent: Observable<any> = new Observable(); myEvent.onSubscription((newCount: number, prevCount: number) => { if(newCount === 0) { // release global resource } else { // allocate global resource, if not yet allocated } // for a scalable resource usage / load, // re-configure it, based on newCount }); 

I wouldn't expect a guaranteed notification on each change, hence newCount + prevCount params.

UPDATE-1

This is not a duplicate to this, because I need to be notified when the number of subscriptions changes, and not just to get the counter at some point.

UPDATE-2

Without any answer so far, I quickly came up with a very ugly and limited work-around, through complete incapsulation, and specifically for type Subject. Hoping very much to find a proper solution.

UPDATE-3

After a few answers, I'm still not sure how to implement what I'm trying, which is the following:

class CustomType { } class CountedObservable<T> extends Observable<T> { private message: string; // random property public onCount; // magical Observable that needs to be implemented constructor(message: string) { // super(); ??? this.message = message; } // random method public getMessage() { return this.message; } } const a = new CountedObservable<CustomType>('hello'); // can create directly const msg = a.getMessage(); // can call methods a.subscribe((data: CustomType) => { // handle subscriptions here; }); // need that magic onCount implemented, so I can do this: a.onCount.subscribe((newCount: number, prevCont: number) => { // manage some external resources }); 

How to implement such class CountedObservable above, which would let me subscribe to itself, as well as its onCount property to monitor the number of its clients/subscriptions?

UPDATE-4

All suggested solutions seemed overly complex, and even though I accepted one of the answers, I ended up with a completely custom solution one of my own.

6
  • Possible duplicate of Rxjs number of observable subscriptions Commented May 18, 2019 at 5:43
  • @Praveenkumar That question is not a duplicate, it is asking how to get a count at some fixed point, while I'm looking for a notification of when the number of subscription changes. Commented May 18, 2019 at 5:45
  • Hey, i tried to mark and that comment got posted automatically. Since your question has this line according to the number of active subscriptions, i thought it could be a possible duplicate. Commented May 18, 2019 at 5:48
  • 2
    Look at managing the global resource via the using observable creator and then use the share operator to perform the reference counting. Commented May 18, 2019 at 7:25
  • @cartant I'm not an expert in RXJS to understand what you are suggesting. Could you publish an answer with the example, please? Commented May 18, 2019 at 7:28

4 Answers 4

17
+500

You could achieve it using defer to track subscriptions and finalize to track completions, e.g. as an operator:

// a custom operator that will count number of subscribers function customOperator(onCountUpdate = noop) { return function refCountOperatorFunction(source$) { let counter = 0; return defer(()=>{ counter++; onCountUpdate(counter); return source$; }) .pipe( finalize(()=>{ counter--; onCountUpdate(counter); }) ); }; } // just a stub for `onCountUpdate` function noop(){} 

And then use it like:

const source$ = new Subject(); const result$ = source$.pipe( customOperator( n => console.log('Count updated: ', n) ) ); 

Heres a code snippet illustrating this:

const { Subject, of, timer, pipe, defer } = rxjs; const { finalize, takeUntil } = rxjs.operators; const source$ = new Subject(); const result$ = source$.pipe( customOperator( n => console.log('Count updated: ', n) ) ); // emit events setTimeout(()=>{ source$.next('one'); }, 250); setTimeout(()=>{ source$.next('two'); }, 1000); setTimeout(()=>{ source$.next('three'); }, 1250); setTimeout(()=>{ source$.next('four'); }, 1750); // subscribe and unsubscribe const subscriptionA = result$ .subscribe(value => console.log('A', value)); setTimeout(()=>{ result$.subscribe(value => console.log('B', value)); }, 500); setTimeout(()=>{ result$.subscribe(value => console.log('C', value)); }, 1000); setTimeout(()=>{ subscriptionA.unsubscribe(); }, 1500); // complete source setTimeout(()=>{ source$.complete(); }, 2000); function customOperator(onCountUpdate = noop) { return function refCountOperatorFunction(source$) { let counter = 0; return defer(()=>{ counter++; onCountUpdate(counter); return source$; }) .pipe( finalize(()=>{ counter--; onCountUpdate(counter); }) ); }; } function noop(){}
<script src="https://unpkg.com/[email protected]/bundles/rxjs.umd.min.js"></script>

* NOTE: if your source$ is cold — you might need to share it.

Hope it helps

Sign up to request clarification or add additional context in comments.

6 Comments

Do you think it is possible to encapsulate all this within an observable via a template class, so we can just subscribe to the counter change, like I did here? i.e. To be able to extend any observable with onCount event as an observable that allows easy way of subscribing to the usage count changes. P.S. I'm trying to figure out how to take all this and use in a way that's generic/reusable.
@vitaly-t, I thought of it at first, though I'm not experienced with overriding rxjs classes, so implemented it via function/operators. Heres my failed attempt to achieve it via a class thinkrx.io/gist/4f281f6ff5b4b52b517196616f3f7dc8 (it leaks to -1, so use it only as a starting ground). Also, I would suggest you looking into source code of the refCount operator — it has similar behavior to your request, so you might use it as a base. Sorry, can't be of better help atm.
@vitaly-t, as a possible alternative to a custom class, heres a code from my example, used as an operator: thinkrx.io/gist/bf7d5c8fcae8f2c5f0129fd4a11b9147
@vitaly-t, uh-oh. Thank you. In my opinion this question deserved attention even w/o the reward. Also, I humbly think that paulpdaniels' and dmcgrandle gave great answers and probably deserve the reward more.
To those reading this later: definitely check out detailed overview paulpdaniels gave on using+share approach and dmcgrandle's take showing us a nice solution via custom class extended from Observable. And surely see what vitaly-t ended up with in his answer here, which might be easier to maintain by a dev team, as its more customizable and transparent.
|
8

You are really asking three separate questions here, and I question whether you really need the full capability that you mention. Since most of the resource managment stuff you are asking for is already provided for by the library, doing custom tracking code seems to be redundant. The first two questions:

  • Allocate global resource when the number of subscriptions becomes greater than 0
  • Release global resource when the number of subscriptions becomes 0

Can be done with the using + share operators:

class ExpensiveResource { constructor () { // Do construction } unsubscribe () { // Do Tear down } } // Creates a resource and ties its lifecycle with that of the created `Observable` // generated by the second factory function // Using will accept anything that is "Subscription-like" meaning it has a unsubscribe function. const sharedStream$ = using( // Creates an expensive resource () => new ExpensiveResource(), // Passes that expensive resource to an Observable factory function er => timer(1000) ) // Share the underlying source so that global creation and deletion are only // processed when the subscriber count changes between 0 and 1 (or visa versa) .pipe(share()) 

After that sharedStream$ can be passed around as a base stream which will manage the underlying resource (assuming you implemented your unsubscribe correctly) so that the resource will be created and torn down as the number of subscribers transitions between 0 and 1.

  • Adjust the resource usage strategy based on the number of subscriptions

    The third question I am most dubious on, but I'll answer it for completeness assuming you know your application better than I do (since I can't think of a reason why you would need specific handling at different usage levels other than going between 0 and 1).

Basically I would use a similar approach as above but I would encapuslate the transition logic slightly differently.

// Same as above class ExpensiveResource { unsubscribe() { console.log('Tear down this resource!')} } const usingReferenceTracking = (onUp, onDown) => (resourceFactory, streamFactory) => { let instance, refCount = 0 // Again manage the global resource state with using const r$ = using( // Unfortunately the using pattern doesn't let the resource escape the closure // so we need to cache it for ourselves to use later () => instance || (instance = resourceFactory()), // Forward stream creation as normal streamFactory ) ).pipe( // Don't forget to clean up the stream after all is said and done // Because its behind a share this should only happen when all subscribers unsubscribe finalize(() => instance = null) share() ) // Use defer to trigger "onSubscribe" side-effects // Note as well that these side-effects could be merged with the above for improved performance // But I prefer them separate for easier maintenance. return defer(() => onUp(instance, refCount += 1) || r$) // Use finalize to handle the "onFinish" side-effects .pipe(finalize(() => onDown(instance, refCount -= 1))) } const referenceTracked$ = usingReferenceTracking( (ref, count) => console.log('Ref count increased to ' + count), (ref, count) => console.log('Ref count decreased to ' + count) )( () => new ExpensiveResource(), ref => timer(1000) ) referenceTracked$.take(1).subscribe(x => console.log('Sub1 ' +x)) referenceTracked$.take(1).subscribe(x => console.log('Sub2 ' +x)) // Ref count increased to 1 // Ref count increased to 2 // Sub1 0 // Ref count decreased to 1 // Sub2 0 // Ref count decreased to 0 // Tear down this resource! 

Warning: One side effect of this is that by definition the stream will be warm once it leaves the usingReferenceTracking function, and it will go hot on first subscription. Make sure you take this into account during the subscription phase.

3 Comments

There is nothing dubious in my question. I have an IO that needs to use its cache differently when it has 1, 10 or 100 clients subscribed to it, by tweaking certain optimization parameters. This is how things scale, often-wise. That's why I need to monitor the number of subscribers.
I have updated my question with an example of what it is i'm trying to implement. Is it possible to implement it that way? If yes, please show me how... You seem to have most knowledge about RXJS from other answers :)
@vitaly-t reading the final comments, I do apologize if I came across as condescending, that wasn’t my intention. I have a fairly opinionated view on RxJS and I try to make all of my caveats, conditions and reservations clear to make sure people have adequately questioned the approach (from the libraries perspective). Many newcomers don’t know what they don’t know and will try to make library solve a problem it wasn’t meant to solve. Any suggested edits to improve on the tone of the answer would be appreciated, since I’d like it to be constructive answer regardless :)
5

What a fun problem! If I am understanding what you are asking, here is my solution to this: create a wrapper class around Observable that tracks the subscriptions by intercepting both subscribe() and unsubscribe(). Here is the wrapper class:

export class CountSubsObservable<T> extends Observable<T>{ private _subCount = 0; private _subCount$: BehaviorSubject<number> = new BehaviorSubject(0); public subCount$ = this._subCount$.asObservable(); constructor(public source: Observable<T>) { super(); } subscribe( observerOrNext?: PartialObserver<T> | ((value: T) => void), error?: (error: any) => void, complete?: () => void ): Subscription { this._subCount++; this._subCount$.next(this._subCount); let subscription = super.subscribe(observerOrNext as any, error, complete); const newUnsub: () => void = () => { if (this._subCount > 0) { this._subCount--; this._subCount$.next(this._subCount); subscription.unsubscribe(); } } subscription.unsubscribe = newUnsub; return subscription; } } 

This wrapper creates a secondary observable .subCount$ that can be subscribed to which will emit every time the number of subscriptions to the source observable changes. It will emit a number corresponding to the current number of subscribers.

To use it you would create a source observable and then call new with this class to create the wrapper. For example:

const source$ = interval(1000).pipe(take(10)); const myEvent$: CountSubsObservable<number> = new CountSubsObservable(source$); myEvent$.subCount$.subscribe(numSubs => { console.log('subCount$ notification! Number of subscriptions is now', numSubs); if(numSubs === 0) { // release global resource } else { // allocate global resource, if not yet allocated } // for a scalable resource usage / load, // re-configure it, based on numSubs }); source$.subscribe(result => console.log('result is ', result)); 

To see it in use, check out this Stackblitz.

UPDATE:

Ok, as mentioned in the comments, I'm struggling a little to understand where the stream of data is coming from. Looking back through your question, I see you are providing an "event subscription interface". If the stream of data is a stream of CustomType as you detail in your third update above, then you may want to use fromEvent() from rxjs to create the source observable with which you would call the wrapper class I provided.

To show this I created a new Stackblitz. From that Stackblitz here is the stream of CustomTypes and how I would use the CountedObservable class to achieve what you are looking for.

class CustomType { a: string; } const dataArray = [ { a: 'January' }, { a: 'February' }, { a: 'March' }, { a: 'April' }, { a: 'May' }, { a: 'June' }, { a: 'July' }, { a: 'August' }, { a: 'September' }, { a: 'October' }, { a: 'November' }, { a: 'December' } ] as CustomType[]; // Set up an arbitrary source that sends a stream of `CustomTypes`, one // every two seconds by using `interval` and mapping the numbers into // the associated dataArray. const source$ = interval(2000).pipe( map(i => dataArray[i]), // transform the Observable stream into CustomTypes take(dataArray.length), // limit the Observable to only emit # array elements share() // turn into a hot Observable. ); const myEvent$: CountedObservable<CustomType> = new CountedObservable(source$); myEvent$.onCount.subscribe(newCount => { console.log('newCount notification! Number of subscriptions is now', newCount); }); 

I hope this helps.

6 Comments

So if I have my own class that represents the resource that needs to be controlled based on the number of subscriptions, how does it fit into it? I tried passing in my class instead of number, but that did nothing. Also, super.subscribe(observerOrNext as any, error, complete); comes up as deprecated.
Do you think it is possible to implement class CountedObservable like I showed in the last update?
An observable is usually of a stream of data - in your CustomType above, is there a stream of data? Or are you wanting to create a stream of messages (ie: a stream of strings)? and .getMessage() returns the last message in the stream? Sorry - it is not clear to me yet where the source observable is that you want to count subscribers on.
The example shows it clearly that it is the object itself, i.e. type CountedObservable. You can see I am showing the subscriptions in the example. The nature of class CustomType is here irrelevant, as well as method getMessage.
Ok, I updated the example in my answer to include the CountedObservable itself being a stream of CustomType which is then subscribed to by Observers and counted by the wrapper class.
|
3

First of all, I very much appreciate how much time and effort people have committed trying to answer my question! And I am sure those answers will prove to be a useful guideline to other developers, solving similar scenarios with RXJS.

However, specifically for what I was trying to get out of RXJS, I found in the end that I am better off not using it at all. I specifically wanted the following:

A generic, easy-to-use interface for subscribing to notifications, plus monitoring subscriptions - all in one. With RXJS, the best I would end up is some workarounds that appear to be needlessly convoluted or even cryptic to developers who are not experts in RXJS. That is not what I would consider a friendly interface, more like something that rings over-engineering.

I ended up with a custom, much simpler interface that can do everything I was looking for:

export class Subscription { private unsub: () => void; constructor(unsub: () => void) { this.unsub = unsub; } public unsubscribe(): void { if (this.unsub) { this.unsub(); this.unsub = null; // to prevent repeated calls } } } export class Observable<T = any> { protected subs: ((data: T) => void)[] = []; public subscribe(cb: (data: T) => void): Subscription { this.subs.push(cb); return new Subscription(this.createUnsub(cb)); } public next(data: T): void { // we iterate through a safe clone, in case an un-subscribe occurs; // and since Node.js is the target, we are using process.nextTick: [...this.subs].forEach(cb => process.nextTick(() => cb(data))); } protected createUnsub(cb) { return () => { this.subs.splice(this.subs.indexOf(cb), 1); }; } } export interface ISubCounts { newCount: number; prevCount: number; } export class CountedObservable<T = any> extends Observable<T> { readonly onCount: Observable<ISubCounts> = new Observable(); protected createUnsub(cb) { const s = this.subs; this.onCount.next({newCount: s.length, prevCount: s.length - 1}); return () => { s.splice(s.indexOf(cb), 1); this.onCount.next({newCount: s.length, prevCount: s.length + 1}); }; } } 

It is both small and elegant, and lets me do everything I needed to begin with, in a safe and friendly manner. I can do the same subscribe and onCount.subscribe, and get all the same notifications:

const a = new CountedObservable<string>(); const countSub = a.onCount.subscribe(({newCount, prevCount}) => { console.log('COUNTS:', newCount, prevCount); }); const sub1 = a.subscribe(data => { console.log('SUB-1:', data); }); const sub2 = a.subscribe(data => { console.log('SUB-2:', data); }); a.next('hello'); sub1.unsubscribe(); sub2.unsubscribe(); countSub.unsubscribe(); 

I hope this will help somebody else also.

P.S. I further improved it as an independent module.

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.