Not sure about the title of my question but hopefully I can explain what I am trying to do.
I want to have a Timer to inject a value into a sequence but when a specific value is observed. I want the Timer to be cancelled when any other value is entered into the sequence.
public enum State { Connected, Disconnected, DisconnectedRetryTimeout } var stateSubject = new Subject<State>(); var connectionStream = stateSubject.AsObservable(); var disconnectTimer = Observable.Return(State.DisconnectedRetryTimeout) .Delay(TimeSpan.FromSeconds(30)) .Concat(Observable.Never<State>()); var disconnectSignal = disconnectedTimer .TakeUntil(connectionStream.Where(s => s == State.Connected)) .Repeat(); var statusObservable = Observable.Merge(connectionStream, disconnectSignal) .DistinctUntilChanged(); So when nothing is in the stream (i.e. new) no timer. When Connected|DisconnectedRetryTimeout is add no timer. When Disconnected is add I want the timer to start If Connected is on the stream before the timer fires I want the timer cancelled The Timer should only fire once, until Disconnected is received again.
Pretty new to RX and ran out of ideas on this one.
Any help much appreciated.