I was wondering whether there is a way to take an observable stream and use the *While operators, particularly TakeWhile, SkipWhile and BufferWhile, so that their subscribers do not receive an OnCompleted when the bool 'while' condition is fulfilled.
When I started using the TakeWhile, SkipWhile and BufferWhile operators, I assumed that they wouldn't terminate (call OnCompleted) but would merely emit, or not emit, while the bool condition is met.
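The behaviour in question can be reproduced with a minimal sketch like the one below. It assumes the System.Reactive NuGet package; the class name `WhileOperatorDemo`, the `flag` variable and the log strings are made up for illustration and are not part of my real code. With a `Subject<int>` as the source, TakeWhile completes as soon as its predicate returns false, while SkipWhile stops skipping for good once its predicate has returned false once.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class WhileOperatorDemo
{
    // Returns a log of everything the two subscribers observed.
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<int>();
        bool flag = true; // hypothetical stand-in for IsBusy / IgnoreChanges

        // TakeWhile forwards values while the predicate holds,
        // then completes the first time it returns false.
        source.TakeWhile(_ => flag)
              .Subscribe(i => log.Add($"take {i}"),
                         () => log.Add("take completed"));

        // SkipWhile drops values while the predicate holds; once the
        // predicate has returned false it forwards everything afterwards.
        source.SkipWhile(_ => flag)
              .Subscribe(i => log.Add($"skip {i}"),
                         () => log.Add("skip completed"));

        source.OnNext(1); // flag is true:  TakeWhile forwards, SkipWhile drops
        flag = false;
        source.OnNext(2); // flag is false: TakeWhile completes, SkipWhile forwards
        flag = true;
        source.OnNext(3); // flag is true again: TakeWhile is gone, SkipWhile still forwards

        return log;
    }
}
```

Calling `WhileOperatorDemo.Run()` should yield the entries `take 1`, `take completed`, `skip 2`, `skip 3`: neither operator goes back to its earlier state when the flag flips back to true.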
It might make more sense with an example:
I have bool flags that indicate, for example, whether an instance is busy or whether changes should be ignored, and an observable stream of data:
```csharp
private bool IsBusy { get; set; }
private bool IgnoreChanges { get; set; }
private IObservable<int> Producer { get; set; }
private IDisposable ConsumerSubscription { get; set; }
```

... and I set up the Rx stream(s) like this (simplified):
```csharp
private void SetupRx()
{
    ConsumerSubscription = Producer
        // Drop the producer's stream of ints whenever the IgnoreChanges flag is set to true,
        // but forward them whenever the IgnoreChanges flag is set to false
        .SkipWhile(_ => IgnoreChanges == true)
        // Buffer all streamed items for as long as we are busy handling the previous one(s)
        .BufferWhile(_ => IsBusy == true)
        .Subscribe(i => DoSomething(i));
}

private void DoSomething(int i)
{
    try
    {
        IsBusy = true;
        // ... do something
    }
    finally
    {
        IsBusy = false;
    }
}
```

SkipWhile and BufferWhile should not complete (call OnCompleted) whenever the IsBusy/IgnoreChanges flags switch from true to false and back, but should keep the stream alive.
Is that somehow doable with Rx.NET out of the box, and/or does someone know how to accomplish this?