0

I am trying to debug if I am correctly disposing the observers to a long running services (services exposing IObservable). And I am wondering if there an operator or something that we can create to log the number of active observers, say something like.

public class NewsService { IObservable<Article> GetArticles(); } NewsService.Instance .GetArticles() .DoCount(x=> Trace.Writeline("The current count is {x}")) .Subscribe(); 

There is a solution proposed here, which works on the Subject. What if we don't have access to the Subject and a library is exposing IObservable.

1 Answer 1

4

Generally speaking, the concept of subscriber count for an arbitrary observable sequence is not defined.

For cold observables such as Observable.Interval, every time you subscribe to the observable, a new pipeline instance is created, which - from its point of view sees only a single observer at a time.

We can, nonetheless, warm up a cold observable, and watch subscriptions come and go.

 public static IObservable<T> RefCount<T>(this IObservable<T> source, Action<int> onChange) { var subscribers = 0; var shared = source.Publish().RefCount(); void callback(int count) => onChange(Interlocked.Add(ref subscribers, count)); return Observable.Create<T>(observer => { callback(+1); var subscription = shared.Subscribe(observer); var dispose = Disposable.Create(() => callback(-1)); return new CompositeDisposable(subscription, dispose); }); } 

Demo

 var values = Observable .Interval(TimeSpan.FromSeconds(0.1)) .RefCount(count => Console.WriteLine($"Subscribers: {count}")); values.Take(5).Subscribe(); values.Take(10).Subscribe(); values.Take(15).Subscribe(); 

Output

Subscribers: 1 Subscribers: 2 Subscribers: 3 Subscribers: 2 Subscribers: 1 Subscribers: 0 

Now, this works because we have a shared view of the parent observable. So try to make all subscriptions point to the same instance.

_articles = GetArticles().RefCount(count => Console.WriteLine($"Subscribers: {count}"))); ... _articles.Subscribe(); 
Sign up to request clarification or add additional context in comments.

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.