The following observable sequence adds each element to a ReplaySubject so that I can access the elements later and even await the completion of the ReplaySubject. It completes the ReplaySubject once a fixed timespan has elapsed.
    ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();
    characteristic.WhenNotificationReceived()
        .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(1)))
        .Subscribe(
            onNext: result =>
            {
                string nextTag = BitConverter.ToString(result.Data);
                nextTag = nextTag.Replace("-", "");
                rfidPlayer.OnNext(nextTag);
            },
            onCompleted: () =>
            {
                rfidPlayer.OnCompleted();
            });

I would like the sequence to run until a given time has passed since the last "OnNext" call and then complete. This will be very useful in a variety of Bluetooth communication scenarios where the Bluetooth device gives me a sequence of data and then just stops without any kind of completion message or event. In these scenarios I need to heuristically determine when the sequence is done and then complete it myself. So, if it's been "too long" since the last Bluetooth notification, I would like to complete the ReplaySubject.
I could do this by creating a timer, resetting it when each element is received, and then completing the ReplaySubject when the timer reaches "too long", but I've heard that creating an object and manipulating it from within an observable subscription is not thread safe.
Any suggestion on how I can complete a sequence after a "too long" interval?
Here is a version that is not thread safe from what I've heard but should work as intended:
    bool reading = true;
    System.Timers.Timer timer = new System.Timers.Timer(1000);
    timer.Elapsed += (sender, e) => { reading = false; };

    ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();
    characteristic.WhenNotificationReceived()
        .TakeWhile(x => reading)
        .Subscribe(
            onNext: result =>
            {
                string nextTag = BitConverter.ToString(result.Data);
                nextTag = nextTag.Replace("-", "");
                // Restart the timer on every notification.
                timer.Stop();
                timer.Start();
                rfidPlayer.OnNext(nextTag);
            },
            onCompleted: () =>
            {
                rfidPlayer.OnCompleted();
            });

This seems satisfactory, based on Simonare's first answer:
    characteristic.WhenNotificationReceived()
        .Timeout(TimeSpan.FromSeconds(1))
        .Subscribe(
            onNext: result =>
            {
                string nextTag = BitConverter.ToString(result.Data);
                nextTag = nextTag.Replace("-", "");
                rfidPlayer.OnNext(nextTag);
            },
            onError: error =>
            {
                rfidPlayer.OnCompleted();
            });
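The onError handler above treats the TimeoutException as the end-of-sequence signal, which also turns any other error into a normal completion. A minimal sketch of one alternative, assuming the Timeout overload that takes a fallback sequence: on timeout the pipeline switches to Observable.Empty and completes normally. The Subject<byte[]> is a hypothetical stand-in for characteristic.WhenNotificationReceived(), and TagReader.CompleteAfterSilence is a name made up for this example.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class TagReader
{
    // Hypothetical helper: formats each payload as a hex tag and completes
    // the sequence once no element has arrived for `gap`.
    public static IObservable<string> CompleteAfterSilence(
        IObservable<byte[]> notifications, TimeSpan gap) =>
        notifications
            .Select(data => BitConverter.ToString(data).Replace("-", ""))
            // On timeout, switch to an empty sequence, which completes
            // immediately instead of raising a TimeoutException.
            .Timeout(gap, Observable.Empty<string>());
}

class Program
{
    static void Main()
    {
        // Stand-in for the Bluetooth notification stream (assumption).
        var notifications = new Subject<byte[]>();
        var rfidPlayer = new ReplaySubject<string>();

        // A subject is an observer, so it can be subscribed directly.
        TagReader.CompleteAfterSilence(notifications, TimeSpan.FromSeconds(1))
            .Subscribe(rfidPlayer);

        notifications.OnNext(new byte[] { 0xDE, 0xAD });
        notifications.OnNext(new byte[] { 0xBE, 0xEF });

        // Blocks until rfidPlayer completes, roughly one second after
        // the last notification.
        var tags = rfidPlayer.ToList().Wait();
        Console.WriteLine(string.Join(",", tags)); // prints "DEAD,BEEF"
    }
}
```

Note that the timeout also applies to the first element, so the sequence completes empty if nothing arrives within the gap after subscribing.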
Don't use external state (such as bool reading = true;) and don't use a timer. You're working with an extremely powerful tool that doesn't need any of that. When you use Rx properly it works superbly.

Avoid doing the work inside .Subscribe. Try this instead:

    characteristic.WhenNotificationReceived()
        .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(1)))
        .Select(result => BitConverter.ToString(result.Data).Replace("-", ""))
        .Subscribe(rfidPlayer);

... .Generate and Switch. Let me know if you find it useful.
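For completing after a quiet period without any external state, here is a minimal sketch of one possible operator. It is not the Generate and Switch approach mentioned above; it swaps in Publish, Throttle and TakeUntil instead. TakeUntilQuiet and QuietCompletion are hypothetical names introduced for this example.

```csharp
using System;
using System.Reactive.Linq;

static class QuietCompletion
{
    // Hypothetical operator: passes elements through and completes once
    // `gap` has elapsed since the most recent element.
    public static IObservable<T> TakeUntilQuiet<T>(
        this IObservable<T> source, TimeSpan gap) =>
        // Publish shares a single subscription to the source between the
        // main sequence and the Throttle that watches it.
        source.Publish(shared =>
            // Throttle only emits after `gap` of silence following an
            // element; that first emission ends the TakeUntil.
            shared.TakeUntil(shared.Throttle(gap)));
}
```

Assuming the names from the question, it could be used as characteristic.WhenNotificationReceived().Select(result => BitConverter.ToString(result.Data).Replace("-", "")).TakeUntilQuiet(TimeSpan.FromSeconds(1)).Subscribe(rfidPlayer);. Because Throttle only fires after an element has been seen, this variant waits indefinitely for the first notification, which may or may not be what a slow-starting device needs.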