
I was wondering whether there is a way to take an observable stream and use the *While operators, particularly TakeWhile, SkipWhile and BufferWhile, so that their subscribers do not receive an OnCompleted when the bool 'while' condition is fulfilled?

When I started using the TakeWhile, SkipWhile and BufferWhile operators, I assumed that they wouldn't terminate (i.e. call OnCompleted) but would merely emit, or not emit, while the bool condition is met.

It might make more sense with an example:

I have a bool flag that indicates, for example, whether an instance is busy or not, and an observable stream of data:

    private bool IsBusy { get; set; }
    private bool IgnoreChanges { get; set; }
    private IObservable<int> Producer { get; set; }
    private IDisposable ConsumerSubscription { get; set; }

... and I set up the Rx stream(s) like this (simplified):

    private void SetupRx()
    {
        ConsumerSubscription = Producer
            // Drop the producer's stream of ints whenever the IgnoreChanges flag is set to true,
            // but forward them whenever the IgnoreChanges flag is set to false
            .SkipWhile(_ => IgnoreChanges == true)
            // For all streamed instances, buffer them as long as we are busy handling the previous one(s)
            .BufferWhile(_ => IsBusy == true)
            .Subscribe(i => DoSomething(i));
    }

    private void DoSomething(int i)
    {
        try
        {
            IsBusy = true;
            // ... do something
        }
        finally
        {
            IsBusy = false;
        }
    }

The SkipWhile/BufferWhile operators should not complete (call OnCompleted) whenever the IsBusy/IgnoreChanges flags switch from true to false and back, but should keep the stream alive.

Is that somehow doable with Rx.NET out of the box, and/or does someone know how to accomplish this?

  • Just to clarify, there is no BufferWhile in Rx.NET (my mistake). Commented Nov 10, 2015 at 22:25

1 Answer


To drop the OnCompleted message from an IObservable<T> source, simply Concat with Observable.Never<T>():

source.TakeWhile(condition).Concat(Observable.Never<T>()) 
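To make the effect concrete, here is a minimal sketch rather than a definitive implementation. It assumes the System.Reactive NuGet package is referenced; the class name CompletionDemo, the Run helper and the log strings are made up for this illustration, and a Subject<int> stands in for source so that values can be pushed by hand.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CompletionDemo
{
    // Hypothetical helper: pushes 1..4 through the given pipeline
    // and records every notification the observer receives.
    public static List<string> Run(Func<IObservable<int>, IObservable<int>> pipeline)
    {
        var log = new List<string>();
        var source = new Subject<int>();

        using (pipeline(source).Subscribe(
            x => log.Add($"next {x}"),
            () => log.Add("completed")))
        {
            for (var i = 1; i <= 4; i++)
            {
                source.OnNext(i);
            }
        }

        return log;
    }

    public static void Main()
    {
        // Plain TakeWhile: the observer is completed once the predicate fails.
        var plain = Run(s => s.TakeWhile(x => x < 3));

        // TakeWhile followed by Never: same values, but no completion is forwarded.
        var kept = Run(s => s.TakeWhile(x => x < 3).Concat(Observable.Never<int>()));

        Console.WriteLine(string.Join(", ", plain));
        Console.WriteLine(string.Join(", ", kept));
    }
}
```

Note that in this sketch the values pushed after the predicate fails are not forwarded in either pipeline. Concat with Never only suppresses the completion; it does not make TakeWhile start taking again.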

To manually subscribe to an IObservable<T> source such that the subscription is ended only when you manually unsubscribe, you can use Publish and IConnectableObservable<T>:

    var connectableSource = source.Publish();

    // To subscribe to the source:
    var subscription = connectableSource.Connect();

    ...

    // To unsubscribe from the source:
    subscription.Dispose();

All of that being said, I think you are approaching this incorrectly. If it is done correctly, you won't need the above tricks. Look at your query:

    ConsumerSubscription = Producer
        // Drop the producer's stream of ints whenever the IgnoreChanges flag
        // is set to true, but forward them whenever the IgnoreChanges flag is set to false
        .SkipWhile(_ => IgnoreChanges == true)
        // For all streamed instances buffer them as long as we are busy
        // handling the previous one(s)
        .BufferWhile(_ => IsBusy == true)
        .Subscribe(i => DoSomething(i));

You should be using .Where(_ => !IgnoreChanges) instead of .SkipWhile(_ => IgnoreChanges).
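The difference between the two operators shows up once the flag flips more than once. The following is a small sketch under the assumption that the System.Reactive NuGet package is available; the class name WhereVersusSkipWhile, the Run helper and the local ignoreChanges variable are illustrative stand-ins for the question's IgnoreChanges property.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class WhereVersusSkipWhile
{
    // Hypothetical helper: toggles a flag between values and records
    // what each operator lets through.
    public static (List<int> ViaSkipWhile, List<int> ViaWhere) Run()
    {
        var ignoreChanges = true;
        var producer = new Subject<int>();
        var viaSkipWhile = new List<int>();
        var viaWhere = new List<int>();

        using (producer.SkipWhile(_ => ignoreChanges).Subscribe(x => viaSkipWhile.Add(x)))
        using (producer.Where(_ => !ignoreChanges).Subscribe(x => viaWhere.Add(x)))
        {
            producer.OnNext(1);     // flag set: both operators drop the value
            ignoreChanges = false;
            producer.OnNext(2);     // flag cleared: both forward the value
            ignoreChanges = true;
            producer.OnNext(3);     // flag set again: SkipWhile has stopped checking, Where drops
            ignoreChanges = false;
            producer.OnNext(4);     // flag cleared: both forward the value
        }

        return (viaSkipWhile, viaWhere);
    }

    public static void Main()
    {
        var (viaSkipWhile, viaWhere) = Run();
        Console.WriteLine("SkipWhile: " + string.Join(", ", viaSkipWhile));
        Console.WriteLine("Where:     " + string.Join(", ", viaWhere));
    }
}
```

SkipWhile evaluates its predicate only until it first returns false and then forwards everything, whereas Where re-evaluates the predicate for every value, which matches the "drop while the flag is set" behaviour described in the question.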

You should be using .Buffer(() => IsBusy.SkipWhile(busy => busy)) with a BehaviorSubject<bool> IsBusy instead of .BufferWhile(_ => IsBusy). The closing selector passed to this overload of Buffer takes no arguments.

The complete code would look like this:

    private BehaviorSubject<bool> IsBusy { get; set; }
    private bool IgnoreChanges { get; set; }
    private IObservable<int> Producer { get; set; }
    private IDisposable ConsumerSubscription { get; set; }

    private void SetupRx()
    {
        ConsumerSubscription = Producer
            .Where(_ => !IgnoreChanges)
            // Close the current buffer as soon as IsBusy reports false
            .Buffer(() => IsBusy.SkipWhile(busy => busy))
            .Subscribe(buffer => DoSomething(buffer));
    }

    private void DoSomething(IList<int> buffer)
    {
        try
        {
            IsBusy.OnNext(true);
            // Do something
        }
        finally
        {
            IsBusy.OnNext(false);
        }
    }
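For readers unfamiliar with closing selectors, the sketch below shows the mechanism in isolation. It is not the query above: it assumes the System.Reactive NuGet package, and it replaces IsBusy with a hypothetical Subject<Unit> named closeBuffer that is fired by hand. As far as I know, the overload used here takes a parameterless function that returns the closing observable, and that function is invoked again each time a new buffer is opened.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class BufferClosingDemo
{
    // Hypothetical helper: collects each emitted buffer as a comma-separated string.
    public static List<string> Run()
    {
        var source = new Subject<int>();
        var closeBuffer = new Subject<Unit>();   // stand-in for a "no longer busy" signal
        var buffers = new List<string>();

        using (source.Buffer(() => closeBuffer)
                     .Subscribe(b => buffers.Add(string.Join(",", b))))
        {
            source.OnNext(1);
            source.OnNext(2);
            closeBuffer.OnNext(Unit.Default);    // closes the buffer holding 1 and 2
            source.OnNext(3);
            closeBuffer.OnNext(Unit.Default);    // closes the buffer holding 3
        }

        return buffers;
    }

    public static void Main()
    {
        foreach (var buffer in Run())
        {
            Console.WriteLine("[" + buffer + "]");
        }
    }
}
```

The source values accumulate until the closing observable fires, at which point the collected list is handed to the subscriber and a fresh buffer is started.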

The next improvement would be to try to get rid of the BehaviorSubject<bool> IsBusy. Subjects are something you want to try to avoid because they are state you have to manage.


1 Comment

Thanks Timothy for the hint regarding simplifying my thoughts a little! I noticed that there isn't a BufferWhile in RX at all so I fiddled a little and wrote three Extension methods for my three use-cases: github.com/jbattermann/JB.Common/blob/master/JB.Common.Reactive/… ... SkipWhile/TakeWhile were actually quite easy to write, not really happy with (my) .BufferWhile just yet, but oh well.. But in general - thanks for the reply and clarifications / pointers towards Publish, Where etc. Cheers!