Not sure about the title of my question but hopefully I can explain what I am trying to do.

I want a timer to inject a value into a sequence, but only when a specific value is observed. The timer should be cancelled when any other value enters the sequence.

    public enum State { Connected, Disconnected, DisconnectedRetryTimeout }

    var stateSubject = new Subject<State>();
    var connectionStream = stateSubject.AsObservable();

    var disconnectTimer = Observable.Return(State.DisconnectedRetryTimeout)
        .Delay(TimeSpan.FromSeconds(30))
        .Concat(Observable.Never<State>());

    var disconnectSignal = disconnectTimer
        .TakeUntil(connectionStream.Where(s => s == State.Connected))
        .Repeat();

    var statusObservable = Observable.Merge(connectionStream, disconnectSignal)
        .DistinctUntilChanged();

So:

  • When nothing is in the stream (i.e. it is new), there should be no timer.
  • When Connected or DisconnectedRetryTimeout is added, there should be no timer.
  • When Disconnected is added, I want the timer to start.
  • If Connected arrives on the stream before the timer fires, I want the timer cancelled.
  • The timer should only fire once, until Disconnected is received again.

Pretty new to Rx and ran out of ideas on this one.

Any help much appreciated.

1 Answer

If I understand the problem correctly: we start with a state stream that emits either Connected or Disconnected messages. We want to enrich it with a DisconnectedRetryTimeout message that appears if a Disconnected message sits on the stream for 30 seconds without a Connected message appearing.

One way to do this is to project the Connected/Disconnected stream into a DisconnectedRetryTimeout stream as follows:

  • First strip duplicates with DistinctUntilChanged as we only want the first Disconnected message of a batch to start a timer.
  • If Disconnected is received, project this event as a stream that emits DisconnectedRetryTimeout after 30 seconds.
  • If Connected is received, just project an infinite and empty stream (Observable.Never).
  • With the above, we end up with a stream of streams, so now we use Switch, which flattens this by always taking the most recent stream. So if Connected appears whilst the timer is in flight, the Never stream will replace the timer stream, cancelling the timer.

Now we can merge this with the original de-duped stream. Note that we publish the de-duped stream because we will be subscribing to it twice. If we don't do this and the source is cold, each subscription gets its own independent run of the source, and we can run into problems. See more about that here.

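    var stateSubject = new Subject<State>();

    // De-duplicate, then share one subscription between the two consumers below.
    var state = stateSubject.DistinctUntilChanged().Publish().RefCount();

    // Disconnected starts a 30-second timer; anything else swaps in a silent
    // stream, which cancels a timer that is still in flight.
    var disconnectTimer = state
        .Select(x => x == State.Disconnected
            ? Observable.Timer(TimeSpan.FromSeconds(30))
                .Select(_ => State.DisconnectedRetryTimeout)
            : Observable.Never<State>())
        .Switch();

    var statusObservable = state.Merge(disconnectTimer);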
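To illustrate the point about publishing, here is a minimal sketch that counts how often a cold source is subscribed to when it is used twice, with and without Publish().RefCount(). The class name PublishSketch, the method CountSubscriptions and the int-valued stand-in source are all made up for this illustration; it assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive.Linq;

public static class PublishSketch
{
    // Returns how many times the cold source was subscribed to when the
    // same stream is consumed twice (as the Merge in the answer does).
    public static int CountSubscriptions(bool share)
    {
        var subscriptions = 0;

        // Hypothetical cold source: every subscription runs this factory again.
        IObservable<int> cold = Observable.Defer(() =>
        {
            subscriptions++;
            return Observable.Never<int>();
        });

        var source = share ? cold.Publish().RefCount() : cold;

        // Two consumers of the same stream, mirroring state.Merge(disconnectTimer).
        using (source.Merge(source.Select(x => x + 1)).Subscribe(_ => { }))
        {
            return subscriptions;
        }
    }

    public static void Main()
    {
        Console.WriteLine(CountSubscriptions(share: false)); // 2
        Console.WriteLine(CountSubscriptions(share: true));  // 1
    }
}
```

With a Subject as the source, as in the question, the double subscription happens to be harmless, but sharing keeps the query correct if the source is later swapped for a cold one.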

EDIT: Simpler Version

You can do this all in one go and drop the publish step and the merge. By using StartWith we can push the Disconnected events through the timer stream, and with Observable.Return we can push Connected through, replacing the empty Never stream:

    var statusObservable = stateSubject
        .DistinctUntilChanged()
        .Select(x => x == State.Disconnected
            // Emit Disconnected immediately, then the timeout after 30 seconds.
            ? Observable.Timer(TimeSpan.FromSeconds(30))
                .Select(_ => State.DisconnectedRetryTimeout)
                .StartWith(State.Disconnected)
            // Connected passes straight through and replaces any pending timer.
            : Observable.Return(State.Connected))
        .Switch();
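To check the behaviour without waiting 30 real seconds, the timer can be run on a virtual-time scheduler. The following is a sketch only: the class name SwitchTimerSketch and the method Run are hypothetical, the non-Disconnected branch returns the incoming value rather than a hard-coded Connected, and it assumes the System.Reactive package, whose HistoricalScheduler lets the test advance the clock by hand.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public enum State { Connected, Disconnected, DisconnectedRetryTimeout }

public static class SwitchTimerSketch
{
    public static List<State> Run()
    {
        var scheduler = new HistoricalScheduler(); // virtual clock, advanced manually
        var stateSubject = new Subject<State>();
        var seen = new List<State>();

        var statusObservable = stateSubject
            .DistinctUntilChanged()
            .Select(x => x == State.Disconnected
                ? Observable.Timer(TimeSpan.FromSeconds(30), scheduler)
                    .Select(_ => State.DisconnectedRetryTimeout)
                    .StartWith(State.Disconnected)
                : Observable.Return(x))
            .Switch();

        using (statusObservable.Subscribe(s => seen.Add(s)))
        {
            stateSubject.OnNext(State.Disconnected);        // timer starts
            scheduler.AdvanceBy(TimeSpan.FromSeconds(10));
            stateSubject.OnNext(State.Connected);           // timer cancelled
            scheduler.AdvanceBy(TimeSpan.FromSeconds(30));  // nothing fires
            stateSubject.OnNext(State.Disconnected);        // new timer starts
            scheduler.AdvanceBy(TimeSpan.FromSeconds(30));  // timer fires
        }
        return seen;
    }

    public static void Main()
    {
        // Prints: Disconnected, Connected, Disconnected, DisconnectedRetryTimeout
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Passing the scheduler into Observable.Timer is the only change needed to make the query testable; in production code the overload without a scheduler behaves the same on real time.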

1 Comment

@antinutrino - good stuff - I found a lighter weight solution now too and edited it in.