3

I have a requirement where, after an initial delay of 10 seconds, I need to execute SomeMethod every 10 minutes, with the catch that the 10-minute timer should start only after SomeMethod completes. Here is a crude example:

    Start Task 00:00:00
    (10 second delay)
    SomeMethod executed at 00:00:10 (takes 15 minutes)
    (10 minute delay)
    SomeMethod executed at 00:25:10
    ... and so on.

I know how to do this using TPL. I can start the task using Task.Delay and execute SomeMethod and then after every completion (ContinueWith TaskStatus.RanToCompletion), I create a new task and execute SomeMethod again.
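For reference, the TPL approach described above can be sketched roughly as follows. This is only a minimal sketch: `DelayedRepeater` and `RunAsync` are hypothetical names, and it uses an `async` loop rather than `ContinueWith`, with `TaskCreationOptions.LongRunning` as a hint that the work should get a dedicated thread.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class DelayedRepeater
{
    // Hypothetical helper: waits initialDelay, then runs work repeatedly,
    // starting the interval timer only after each run has completed.
    public static async Task RunAsync(
        Action work, TimeSpan initialDelay, TimeSpan interval, CancellationToken token)
    {
        await Task.Delay(initialDelay, token);
        while (true)
        {
            // LongRunning hints that the scheduler should use a dedicated thread.
            await Task.Factory.StartNew(
                work, token, TaskCreationOptions.LongRunning, TaskScheduler.Default);

            // The delay starts only after the previous run has finished,
            // so two runs can never overlap.
            await Task.Delay(interval, token);
        }
    }
}
```

Calling `DelayedRepeater.RunAsync(SomeMethod, TimeSpan.FromSeconds(10), TimeSpan.FromMinutes(10), cts.Token)` would match the timings above; cancelling the token ends the loop with an `OperationCanceledException`.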

My question is: is this possible using Observable.Timer? Something like...

Observable.Timer(TimeSpan.FromSeconds(10), TimeSpan.FromMinutes(10)) 

The problem with this code is that if SomeMethod takes 15 minutes, I will have two different SomeMethod instances running, which I don't want. I want the 10 minute timer to start after SomeMethod completes. Is this possible using Observable or should I stay with TPL?

EDIT: Forgot to mention that I want SomeMethod to run in its own thread.

0

2 Answers

7

You should have done some more investigation into using Observable.Timer. It works almost like you want straight out of the box.

One important thing to understand about Rx is that it guarantees you will never get concurrent executions within a single subscription. While Rx enables all sorts of multi-threading scenarios, it always serializes the calls made to a subscription's observer.

So, take this observable subscription as an example:

    Observable
        .Timer(TimeSpan.FromSeconds(10.0), TimeSpan.FromSeconds(2.0))
        .Timestamp()
        .Subscribe(x =>
        {
            Thread.Sleep(5000);
            Console.WriteLine(x.ToString());
        });

I've created an observable that will wait 10 seconds to begin emitting values and will then try to emit a value every 2 seconds.

I've then added .Timestamp() to record exactly when the values are produced.

Finally, I've subscribed with an observer that forces a 5 second thread sleep.

Here are the first 4 values output:

    0@2015-08-31 10:44:34 +00:00
    1@2015-08-31 10:44:39 +00:00
    2@2015-08-31 10:44:44 +00:00
    3@2015-08-31 10:44:49 +00:00

You'll notice that the gap between the values is 5 seconds, not 2. This is pretty close to what you want. By the time the observer returns, Rx sees that the two seconds have already elapsed and delivers the next value immediately.

But there is another Rx operator that does exactly what you want - .Generate(...). This is a very powerful operator for generating all sorts of observable streams.

You want to use it like this:

    Observable
        .Generate(
            0,
            x => true,
            x => x + 1,
            x => x,
            x => x == 0 ? TimeSpan.FromSeconds(10.0) : TimeSpan.FromSeconds(2.0))
        .Timestamp()
        .Subscribe(x =>
        {
            Thread.Sleep(5000);
            Console.WriteLine(x.ToString());
        });

In this case it works exactly the way you want. Here are the first ten values:

    0@2015-08-31 10:48:27 +00:00
    1@2015-08-31 10:48:34 +00:00
    2@2015-08-31 10:48:41 +00:00
    3@2015-08-31 10:48:48 +00:00
    4@2015-08-31 10:48:55 +00:00
    5@2015-08-31 10:49:02 +00:00
    6@2015-08-31 10:49:09 +00:00
    7@2015-08-31 10:49:16 +00:00
    8@2015-08-31 10:49:23 +00:00
    9@2015-08-31 10:49:30 +00:00

It's firing every 7 seconds: 2 from the Generate operator plus 5 from the observer.

You can obviously put in the times you need.
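With the timings from the question, and with SomeMethod kept on a thread of its own, a minimal sketch might look like the following. It assumes the System.Reactive package; `GenerateRepeater` and `Start` are hypothetical names, and using a dedicated `EventLoopScheduler` is one possible choice rather than the only one.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class GenerateRepeater
{
    // Hypothetical helper: runs someMethod after initialDelay, then again
    // interval after each run has returned, all on one dedicated thread.
    public static IDisposable Start(Action someMethod, TimeSpan initialDelay, TimeSpan interval)
    {
        // EventLoopScheduler owns a single background thread.
        var scheduler = new EventLoopScheduler();

        var subscription = Observable
            .Generate(
                0,
                x => true,                               // never complete
                x => x + 1,
                x => x,
                x => x == 0 ? initialDelay : interval,   // first value uses the initial delay
                scheduler)
            .Subscribe(_ => someMethod());

        // Disposing stops the sequence and shuts down the scheduler's thread.
        return new CompositeDisposable(subscription, scheduler);
    }
}
```

A call such as `GenerateRepeater.Start(SomeMethod, TimeSpan.FromSeconds(10), TimeSpan.FromMinutes(10))` would then give the 10-second start and a 10-minute wait after each completion.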


3 Comments

Thanks for the answer. I was doing the exact same thing but was not getting the result, as I missed an important detail in my question. I want to run SomeMethod in its own separate thread with the expected behaviour. I guess it involves SubscribeOn (or ObserveOn?), but I get really confused with it. I have a very basic knowledge of Rx.
@Yogesh - you should probably use a new instance of EventLoopScheduler in ObserveOn, or pass it directly to .Generate(...).
Your Generate solution works perfectly with SubscribeOn (Timer doesn't). I also read into the LongRunningTask issue with using TaskPoolScheduler.Default, but it seems to affect ObserveOn more because it tries to create a thread for every observe. Although I am still profiling, I think the solution should work without any OOM exceptions. If it does seem to create memory related issues, I still have TaskPoolScheduler.Default.DisableOptimizations(typeof(ISchedulerLongRunning)). :)
2

Assuming SomeMethod raises an OnCompleted event when it completes, we can write it as an Observable:

    // If SomeMethod's OnCompleted event conforms to the .NET event pattern
    var completedObservable = Observable.FromEventPattern<OnCompletedEventArgs>(
        e => this.OnCompleted += e,
        e => this.OnCompleted -= e);

    // After each OnCompleted event, wait 10 minutes and run SomeMethod again
    var repeatDisposable = completedObservable.Subscribe(completed =>
        Observable.Timer(TimeSpan.FromMinutes(10))
            .Subscribe(tick => SomeMethod()));

    // Start condition: first run after 10 seconds
    var starterDisposable = Observable.Timer(TimeSpan.FromSeconds(10))
        .Subscribe(tick => SomeMethod());

4 Comments

I meant ContinueWith TaskStatus.RanToCompletion instead of ContinueWith OnCompleted. Fixed in question. The method does not implement any completion event. It is a nice solution though. Upvoted.
I'm not sure it's a good idea to start polluting methods with Completed events. I would also be wary of using Subscribe within a Subscribe, partly due to disposable-related issues, and partly because your Subscribe should be there to process the output of a stream rather than kicking off new ones.
Thanks for the comment @Chris, do you have a suggestion on how to improve the subscription?
Generally speaking, if you want to string together observables, then using a SelectMany to create 1 pipeline is the best approach. You can then have a single subscribe block which consumes this pipeline. This isn't going to look pretty in a comment but something like this: (from result1 in Obs1() from result2 in Obs2() select result2).Subscribe(x => {...}, ex => {...})
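As a rough illustration of the single-pipeline idea from the last comment, applied to the original question, something like the sketch below could work. It assumes the System.Reactive package; `PipelineRepeater` and `Build` are hypothetical names, and the Defer/Concat/Repeat combination is just one way to chain "run, then pause" without nesting Subscribe calls.

```csharp
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class PipelineRepeater
{
    // Hypothetical helper: builds one pipeline that emits a Unit each time
    // someMethod finishes, pausing for interval between runs.
    public static IObservable<Unit> Build(
        Action someMethod, TimeSpan initialDelay, TimeSpan interval, IScheduler scheduler)
    {
        // One run of the work, followed by a pause that emits nothing.
        // Defer makes sure the work starts again on every repetition.
        var runThenPause = Observable
            .Defer(() => Observable.Start(someMethod, scheduler))
            .Concat(Observable.Timer(interval, scheduler)
                .Select(_ => Unit.Default)
                .IgnoreElements());

        // Wait for the initial delay, then repeat "run, pause" indefinitely.
        return Observable
            .Timer(initialDelay, scheduler)
            .SelectMany(_ => runThenPause.Repeat());
    }
}
```

A single `Build(SomeMethod, TimeSpan.FromSeconds(10), TimeSpan.FromMinutes(10), TaskPoolScheduler.Default).Subscribe(x => {...}, ex => {...})` then consumes the whole pipeline. An exception thrown by SomeMethod reaches the error handler and ends the sequence, and disposing the one subscription stops any further runs.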
