
[Note: I am using 3.1 if that matters. Also, I've asked this on codereview but no responses so far.]

I need an operator that lets a stream of booleans act as a gate for another stream (let values pass when the gate stream is true, drop them when it's false). I would normally use Switch for this, but if the source stream is cold, Switch will keep recreating it (resubscribing each time the gate changes), which I don't want.

I also want to clean up after myself, so that the result completes if either the source or the gate completes.

    public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
    {
        var s = source.Publish().RefCount();
        var g = gate.Publish().RefCount();

        var sourceCompleted = s.TakeLast(1).DefaultIfEmpty().Select(_ => Unit.Default);
        var gateCompleted = g.TakeLast(1).DefaultIfEmpty().Select(_ => Unit.Default);
        var anyCompleted = Observable.Amb(sourceCompleted, gateCompleted);

        var flag = false;
        g.TakeUntil(anyCompleted).Subscribe(value => flag = value);

        return s.Where(_ => flag).TakeUntil(anyCompleted);
    }

Besides the overall verbosity, I dislike that I subscribe to the gate even if the result is never subscribed to (in which case this operator should be a no-op). Is there a way to get rid of that subscribe?

I have also tried this implementation, but it's even worse when it comes to cleaning up after itself:

    return Observable.Create<T>(
        o =>
        {
            var flag = false;
            gate.Subscribe(value => flag = value);
            return source.Subscribe(
                value =>
                {
                    if (flag) o.OnNext(value);
                });
        });

These are the tests I'm using to check the implementation:

    [TestMethod]
    public void TestMethod1()
    {
        var output = new List<int>();

        var source = new Subject<int>();
        var gate = new Subject<bool>();

        var result = source.When(gate);
        result.Subscribe(output.Add, () => output.Add(-1));

        // the gate starts with false, so the source events are ignored
        source.OnNext(1);
        source.OnNext(2);
        source.OnNext(3);
        CollectionAssert.AreEqual(new int[0], output);

        // setting the gate to true will let the source events pass
        gate.OnNext(true);
        source.OnNext(4);
        CollectionAssert.AreEqual(new[] { 4 }, output);
        source.OnNext(5);
        CollectionAssert.AreEqual(new[] { 4, 5 }, output);

        // setting the gate to false stops source events from propagating again
        gate.OnNext(false);
        source.OnNext(6);
        source.OnNext(7);
        CollectionAssert.AreEqual(new[] { 4, 5 }, output);

        // completing the source also completes the result
        source.OnCompleted();
        CollectionAssert.AreEqual(new[] { 4, 5, -1 }, output);
    }

    [TestMethod]
    public void TestMethod2()
    {
        // completing the gate also completes the result
        var output = new List<int>();

        var source = new Subject<int>();
        var gate = new Subject<bool>();

        var result = source.When(gate);
        result.Subscribe(output.Add, () => output.Add(-1));

        gate.OnCompleted();
        CollectionAssert.AreEqual(new[] { -1 }, output);
    }

3 Answers

Update: This terminates when gate terminates as well. I missed TestMethod2 in the copy/paste:

    return gate.Publish(_gate => source
        .WithLatestFrom(_gate.StartWith(false), (value, b) => (value, b))
        .Where(t => t.b)
        .Select(t => t.value)
        .TakeUntil(_gate.IgnoreElements().Materialize()));

Original answer: this passes your TestMethod1, but it doesn't terminate when the gate observable does.

    public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
    {
        return source
            .WithLatestFrom(gate.StartWith(false), (value, b) => (value, b))
            .Where(t => t.b)
            .Select(t => t.value);
    }

3 Comments

Thank you; this is very elegant (which I love) but it actually doesn't pass the second test (the result doesn't complete when the gate does). Are you seeing it pass?
Sorry, I didn't see TestMethod2. Modified answer.
WithLatestFrom is already part of some versions of System.Reactive

This works:

    public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
    {
        return source.Publish(ss =>
            gate.Publish(gs =>
                gs
                    .Select(g => g ? ss : ss.IgnoreElements())
                    .Switch()
                    .TakeUntil(Observable.Amb(
                        ss.Select(s => true).Materialize().LastAsync(),
                        gs.Materialize().LastAsync()))));
    }

This passes both tests.

5 Comments

Another elegant response. How do you guys know all this stuff, I think I've been through two dozen Rx courses and there's a lot I still don't know :)
If you change the lambda inside the Publish to this, it works: var gg = gate.Publish().RefCount(); var bothCompleted = Observable.Amb(ss.WhenCompleted(), gg.WhenCompleted()); return gate.Select(g => g ? ss : ss.IgnoreElements()).Switch().TakeUntil(bothCompleted); where WhenCompleted is just .Select(_ => Unit.Default).IgnoreElements().Concat(Observable.Return(Unit.Default))
@MarcelPopescu - Beware the .Publish().RefCount() - it creates fragile, run once, observables. They can easily make an observable that appears to run fine once fail the second time. It's generally better to encapsulate within a .Publish(inner => { }).
@MarcelPopescu - I took a variation of your query and got it working.
Thank you for the warning; I did have problems with .Publish().RefCount() before. I'll try to avoid it.

You were on the right track with Observable.Create. You should forward OnError and OnCompleted from both subscriptions to the observer, so that the result completes or errors when either the source or the gate does. Also, by returning both IDisposables from the Create delegate, you make sure both subscriptions are properly cleaned up if you dispose the When subscription before either the source or the gate completes.

    public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
    {
        return Observable.Create<T>(
            o =>
            {
                var flag = false;
                var gs = gate.Subscribe(
                    value => flag = value,
                    e => o.OnError(e),
                    () => o.OnCompleted());
                var ss = source.Subscribe(
                    value =>
                    {
                        if (flag) o.OnNext(value);
                    },
                    e => o.OnError(e),
                    () => o.OnCompleted());
                return new CompositeDisposable(gs, ss);
            });
    }

Here is a shorter, but much harder to read, version using only Rx operators. For cold observables it probably needs a publish/refcount for the source.

    public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
    {
        return gate
            .Select(g => g ? source : source.IgnoreElements())
            .Switch()
            .TakeUntil(source.Materialize()
                .Where(s => s.Kind == NotificationKind.OnCompleted));
    }
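
As for the cold-observable caveat, here is a minimal sketch of one way to share the source. It assumes you are fine with the Publish(selector) overload instead of publish/refcount, so the source is subscribed only once per subscription to the result. The names GateExtensions and WhenShared are hypothetical, chosen only to avoid clashing with When above, and I have only reasoned through it against Subject-based inputs like the ones in the question's tests.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

public static class GateExtensions
{
    // Hypothetical variant of the operator-only version above.
    // Every use of the source inside the lambda goes through "shared",
    // so a cold source is subscribed once instead of on every gate change.
    public static IObservable<T> WhenShared<T>(this IObservable<T> source, IObservable<bool> gate)
    {
        return source.Publish(shared =>
            gate
                // gate open: forward values; gate closed: forward only termination
                .Select(open => open ? shared : shared.IgnoreElements())
                .Switch()
                // stop as soon as the shared source signals completion
                .TakeUntil(shared.Materialize()
                    .Where(n => n.Kind == NotificationKind.OnCompleted)));
    }
}
```

Like the version it is based on, this only looks at completion of the source for the TakeUntil, so treat it as a starting point rather than a drop-in replacement.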

1 Comment

Thank you; this is perfect.
