2

The following observable sequence adds each element to a ReplaySubject so that I can access the elements later and even await the completion of the ReplaySubject. It completes the ReplaySubject after a fixed time span has elapsed.

    ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();
    characteristic.WhenNotificationReceived()
        .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(1)))
        .Subscribe(
            onNext: result =>
            {
                string nextTag = BitConverter.ToString(result.Data);
                nextTag = nextTag.Replace("-", "");
                rfidPlayer.OnNext(nextTag);
            },
            onCompleted: () =>
            {
                rfidPlayer.OnCompleted();
            });

I would like the sequence to run until it has been a given time since the last "OnNext" call and then complete. This will be very useful in a variety of bluetooth communication scenarios where the bluetooth device will give me a sequence of data and then just stop without any kind of completion message or event. In these scenarios I need to heuristically determine when the sequence is done and then complete it myself. So, if it's been "too long" since the last bluetooth notification I would like to complete the ReplaySubject.

I could do this by creating a timer, resetting it when each element is received, and then completing the ReplaySubject when the timer reaches "too long", but I've heard that creating an object and manipulating it from within an observable subscription is not thread safe.

Any suggestion on how I can complete a sequence after a "too long" interval?

Here is a version that is not thread safe from what I've heard but should work as intended:

    bool reading = true;
    System.Timers.Timer timer = new System.Timers.Timer(1000);
    timer.Elapsed += (sender, e) =>
    {
        reading = false;
    };

    ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();
    characteristic.WhenNotificationReceived()
        .TakeWhile(x => reading)
        .Subscribe(
            onNext: result =>
            {
                string nextTag = BitConverter.ToString(result.Data);
                nextTag = nextTag.Replace("-", "");
                timer.Stop();
                timer.Start();
                rfidPlayer.OnNext(nextTag);
            },
            onCompleted: () =>
            {
                rfidPlayer.OnCompleted();
            });

This seems satisfactory, based on Simonare's first answer:

    characteristic.WhenNotificationReceived()
        .Timeout(TimeSpan.FromSeconds(1))
        .Subscribe(
            onNext: result =>
            {
                string nextTag = BitConverter.ToString(result.Data);
                nextTag = nextTag.Replace("-", "");
                rfidPlayer.OnNext(nextTag);
            },
            onError: error =>
            {
                rfidPlayer.OnCompleted();
            });
5
  • Please don't use external state (i.e. bool reading = true;) and don't use a timer. You're working with an extremely powerful tool that doesn't need any of that. When you use Rx properly it works superbly. Commented Feb 2, 2019 at 1:10
  • Also try not to do work in your .Subscribe. Try this instead: characteristic.WhenNotificationReceived().TakeUntil(Observable.Timer(TimeSpan.FromSeconds(1))).Select(result => BitConverter.ToString(result.Data).Replace("-", "")).Subscribe(rfidPlayer);. Commented Feb 2, 2019 at 1:12
  • Does TakeUntil set an overall timer or an interval timer? Commented Feb 4, 2019 at 16:53
  • @Enigmativity And thank you for your comments. My Rx has a long way to go, but it's improving by leaps and bounds, in large part thanks to your feedback. Commented Feb 4, 2019 at 17:10
  • No worries. I've added an answer that I think you might get a lot out of. It uses two operators that I feel are among the most important yet are often underused: Generate and Switch. Let me know if you find it useful. Commented Feb 4, 2019 at 22:37

3 Answers

1

You may consider using the Timeout operator. Its only downside is that it terminates with an error signal, so you may need to treat that error as a normal completion.

The Timeout operator allows you to abort an Observable with an onError termination if that Observable fails to emit any items during a specified span of time.

If you use the approach below (shown here in RxJS syntax), you can avoid the error:

 .Timeout(200, Promise.resolve(42)); 

Another variant allows you to instruct timeout to switch to a backup Observable that you specify, rather than terminating with an error, if the timeout condition is triggered.


    characteristic.WhenNotificationReceived()
        .Timeout(TimeSpan.FromSeconds(1))
        .Subscribe(
            onNext: result =>
            {
                ....
                rfidPlayer.OnNext(....);
            },
            onError: error =>
            {
                rfidPlayer.OnCompleted();
            });

8 Comments

The Timeout operator would work the same as the TakeUntil operator, except it would throw an exception instead of completing the sequence.
You can stick with TakeUntil; I don't think you will encounter any problems with it.
TakeUntil completes after an overall time has passed. I am looking for a solution that completes when a given interval since the last element arrives passes. I have posted sample code that behaves the way I want it to but is not thread safe.
Timeout does that.
Take a look at my edit above. I've marked yours as the answer, but it would be nice if you updated your answer to include the onError block.
0

I find it nasty to use Timeout because of the exceptions.

I prefer to inject a value into the sequence that I can use to terminate the sequence. If my sequence, for example, produces non-negative numbers then if I inject a -1 I know to end the sequence.

Here's an example:

Start with this observable, which generates powers of 2 beginning with 1 and delays each value by as many milliseconds as the value about to be produced.

    Observable
        .Generate(1, x => true, x => 2 * x, x => x, x => TimeSpan.FromMilliseconds(x))

So 1, 2, 4, 8, etc, getting slower and slower.

Now, if I want to stop this sequence when there have been no values for 3.0 seconds, I can do this:

        .Select(x => Observable.Timer(TimeSpan.FromSeconds(3.0)).Select(y => -1).StartWith(x))
        .Switch()
        .TakeWhile(x => x >= 0)

If I run this sequence I get this output:

 1 2 4 8 16 32 64 128 256 512 1024 2048 

The sequence is about to produce 4096, but it first waits 4096 milliseconds to produce that value. In the meantime, the Observable.Timer(TimeSpan.FromSeconds(3.0)) fires and outputs a -1, thus stopping the sequence.

The key part of this query is the use of Switch. It takes an IObservable<IObservable<T>> and produces an IObservable<T> by subscribing only to the most recent inner observable and unsubscribing from the previous one.

So, in my query, each new value produced by the sequence stops and restarts the Timer.
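To try the whole query end to end, here is a minimal console sketch. It assumes the System.Reactive package is referenced; the class name SwitchTimeoutDemo and the Run helper are only illustrative. It blocks for roughly seven seconds (about four seconds of values plus the three-second silence) before printing.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class SwitchTimeoutDemo
{
    // Runs the query and blocks until the inactivity timer ends it.
    public static IList<int> Run()
    {
        return Observable
            // Powers of 2, each delayed by its own value in milliseconds.
            .Generate(1, x => true, x => 2 * x, x => x, x => TimeSpan.FromMilliseconds(x))
            // For every value: emit it now, then emit -1 after 3 s of silence.
            .Select(x => Observable.Timer(TimeSpan.FromSeconds(3.0)).Select(y => -1).StartWith(x))
            // Only the newest inner observable stays subscribed, so each value restarts the timer.
            .Switch()
            // The injected -1 ends the sequence and is not forwarded.
            .TakeWhile(x => x >= 0)
            .ToList()
            .Wait();
    }

    public static void Main()
    {
        // Should print the same values as the output listed above.
        Console.WriteLine(string.Join(" ", Run()));
    }
}
```

The sentinel (-1 here) has to be a value the source can never produce on its own; otherwise a genuine element would end the sequence early.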

In your case your observable would look like this:

    characteristic
        .WhenNotificationReceived()
        .Select(result => BitConverter.ToString(result.Data).Replace("-", ""))
        .Select(x => Observable.Timer(TimeSpan.FromSeconds(1.0)).Select(y => (string)null).StartWith(x))
        .Switch()
        .TakeWhile(x => x != null)
        .Subscribe(rfidPlayer);

0

Here is a custom operator TakeUntilTimeout you could use, which is a thin layer on top of the built-in Timeout operator.

    /// <summary>
    /// Applies a timeout policy for each element in the observable sequence.
    /// If the next element isn't received within the specified timeout duration
    /// starting from its predecessor, the sequence terminates.
    /// </summary>
    public static IObservable<T> TakeUntilTimeout<T>(
        this IObservable<T> source, TimeSpan timeout)
    {
        return source.Timeout(timeout, Observable.Empty<T>());
    }
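As a rough usage sketch, the operator could feed the ReplaySubject from the question like this. It assumes the System.Reactive package is referenced. The Subject<string> named notifications is a hypothetical stand-in for characteristic.WhenNotificationReceived(), and the class names, the 500 ms timeout, and the tag values are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class TimeoutExtensions
{
    // Same operator as above: complete quietly instead of raising TimeoutException.
    public static IObservable<T> TakeUntilTimeout<T>(
        this IObservable<T> source, TimeSpan timeout)
    {
        return source.Timeout(timeout, Observable.Empty<T>());
    }
}

public static class TakeUntilTimeoutDemo
{
    public static List<string> Run()
    {
        // Hypothetical stand-in for the Bluetooth notification stream.
        var notifications = new Subject<string>();
        var rfidPlayer = new ReplaySubject<string>();
        var received = new List<string>();
        var done = new ManualResetEventSlim();

        // No work in Subscribe: the ReplaySubject is the observer.
        notifications
            .TakeUntilTimeout(TimeSpan.FromMilliseconds(500))
            .Subscribe(rfidPlayer);

        rfidPlayer.Subscribe(received.Add, () => done.Set());

        notifications.OnNext("A1");
        Thread.Sleep(100);                    // shorter than the timeout, so the sequence stays open
        notifications.OnNext("B2");
        done.Wait(TimeSpan.FromSeconds(5));   // completes about 500 ms after "B2"
        notifications.OnNext("C3");           // arrives after completion and is ignored

        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", Run())); // expected: A1,B2
    }
}
```

Note that Timeout also counts from the moment of subscription, so if the device sends nothing at all the ReplaySubject completes empty after one timeout period.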
