How do I block the current thread until the OnComplete handler of my observer has finished, without the use of threading primitives?

Here is my code. I want the Console.WriteLine("Press... statement to execute only after the OnComplete handler, namely ResetCount, has finished executing.

    using System;
    using System.Collections;
    using System.Reactive.Linq;
    using System.Threading;

    class Program
    {
        private static long totalItemCount = 0;
        private static long listCount = 0;

        static void Main()
        {
            Console.WriteLine($"Starting Main on Thread {Thread.CurrentThread.ManagedThreadId}\n");

            // 20 slow ticks followed by 200 fast ticks, grouped into 5-second buffers.
            var o = Observable.Timer(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(1))
                .Take(20)
                .Concat(Observable.Interval(TimeSpan.FromSeconds(0.01)).Take(200))
                .Buffer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5));

            o.Subscribe(Print, onCompleted: ResetCount);

            // How do I make sure the next line appears only after OnComplete has fired?
            // Do I have to use traditional threading primitives such as wait handles?
            // Or just cause the main thread to sleep long enough? That doesn't seem right.
            Console.WriteLine("\nPress any key to exit...");
            Console.ReadKey();
        }

        // OnCompleted handler: prints the totals, then resets both counters.
        private static void ResetCount()
        {
            if (listCount > 0)
            {
                Console.WriteLine($"{totalItemCount} items processed in {listCount} lists.");
            }
            else
            {
                Console.WriteLine($"{totalItemCount} items processed.");
            }

            Interlocked.Exchange(ref totalItemCount, 0);
            Interlocked.Exchange(ref listCount, 0);
        }

        // OnNext handler: prints a single value, or every item of a buffered list.
        static void Print<T>(T value)
        {
            var threadType = Thread.CurrentThread.IsBackground ? "Background" : "Foreground";

            if (value is IList)
            {
                var list = value as IList;
                Console.WriteLine($"{list.Count} items in list #{Interlocked.Increment(ref listCount)}:");

                foreach (var item in list)
                {
                    Console.WriteLine($"{item.ToString()}, ({threadType} #{Thread.CurrentThread.ManagedThreadId}), Item #{Interlocked.Increment(ref totalItemCount)}");
                }

                Console.WriteLine();
            }
            else
            {
                Console.WriteLine($"{value.ToString()}, ({threadType} #{Thread.CurrentThread.ManagedThreadId}), Item #{Interlocked.Increment(ref totalItemCount)}");
            }
        }
    }
  • Why don't you move this statement to the end of ResetCount() method? Commented Jun 14, 2016 at 11:08
  • Yes, in this particular case I could move this statement to the end of the ResetCount method, but that would be a cop-out. More than this particular example, I would like to know how to solve the class of problems I am trying to highlight with it. Commented Jun 14, 2016 at 23:27

1 Answer

Rx has dedicated schedulers for handling threading, synchronization and related concerns.

You can read more about that here: http://www.introtorx.com/content/v1.0.10621.0/15_SchedulingAndThreading.html

But basically, what you're looking for is to change the Buffer line so that it takes an explicit scheduler:

 .Buffer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5), Scheduler.CurrentThread); 

There are several ways to test/validate an Rx query. Keep in mind that this won't be the answer to every problem.
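To see why the scheduler matters here, this is a minimal sketch (assuming the System.Reactive package is referenced; `SchedulerDefaultsDemo` and `CompletesInsideSubscribe` are hypothetical names made up for this illustration). It checks whether OnCompleted has already run by the time Subscribe returns. As far as I know, Observable.Range iterates on the calling thread by default, while time-based operators such as Observable.Timer run their work elsewhere by default, so treat the comments below as expectations rather than guarantees.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

public static class SchedulerDefaultsDemo
{
    // Returns true if OnCompleted had already run when Subscribe returned,
    // i.e. the whole sequence was processed on the calling thread.
    public static bool CompletesInsideSubscribe<T>(IObservable<T> source)
    {
        var completed = false;

        // The subscription is disposed when we leave the using block,
        // so a still-running timer does not keep firing afterwards.
        using (source.Subscribe(_ => { }, () => Volatile.Write(ref completed, true)))
        {
            return Volatile.Read(ref completed);
        }
    }

    public static void Main()
    {
        // Range: expected to finish before Subscribe returns.
        Console.WriteLine(CompletesInsideSubscribe(Observable.Range(1, 3)));

        // Timer: Subscribe is expected to return long before the one-second tick.
        Console.WriteLine(CompletesInsideSubscribe(Observable.Timer(TimeSpan.FromSeconds(1))));
    }
}
```

If the second call reports that the sequence has not completed, that is the situation in the question: Subscribe returns immediately and the rest of Main runs before the completion handler.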


4 Comments

Yes, thank you. I was aware of schedulers, but I must have been awake too long to realize that I was using the default scheduler. I am now confused, though. The default scheduler in the case of a console app would be the CurrentThreadScheduler, right? Or would it be the ThreadPoolScheduler? From the results this program produced without specifying a scheduler, it looked like it was the ThreadPoolScheduler, but when I changed the program to produce something less complicated, such as Observable.Range(...), it used the CurrentThreadScheduler as the default. Why is that?
I think I understand. If we do not specify a scheduler, it selects the best scheduler for the platform and the operation as the default scheduler. So the call to Observable.Timer would have used a thread pool scheduler to run the System.Threading.Timer on. Interval might have done the same.
AFAIK each operator has a standard/default scheduler that it uses when no IScheduler is provided. So it really depends on the operator that you're using: stackoverflow.com/questions/15341864/… Another operator that would be useful for testing purposes is .Wait(). More info: stackoverflow.com/questions/37801699/…
Interesting how one could use Wait to wait for completion of an observable. Nice. Thank you.
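For reference, the .Wait() idea from the comments could look roughly like the sketch below. It assumes the System.Reactive package; `BlockUntilCompleted` and `Drain` are hypothetical names invented for this example, and the shortened sequence only stands in for the one in the question. The handlers are attached with Do instead of Subscribe, and LastOrDefaultAsync is placed before Wait because Wait throws on an empty sequence.

```csharp
using System;
using System.Reactive.Linq;

public static class BlockUntilCompleted
{
    // Hypothetical helper: runs onNext for each element and onCompleted at the end,
    // and does not return until the sequence has completed.
    public static void Drain<T>(IObservable<T> source, Action<T> onNext, Action onCompleted)
    {
        source
            .Do(onNext, onCompleted) // side effects run as part of the sequence
            .LastOrDefaultAsync()    // always yields one element, even for an empty source
            .Wait();                 // blocks the calling thread; rethrows any OnError
    }

    public static void Main()
    {
        // Shortened stand-in for the query in the question.
        var o = Observable.Interval(TimeSpan.FromMilliseconds(10))
            .Take(20)
            .Buffer(TimeSpan.FromMilliseconds(50));

        Drain(
            o,
            list => Console.WriteLine($"{list.Count} items in list"),
            () => Console.WriteLine("Completed."));

        // Reached only after the completion handler has finished.
        Console.WriteLine("Press any key to exit...");
    }
}
```

Because Do invokes the completion handler before passing OnCompleted downstream, the handler should have finished by the time Wait returns. Note that this blocks the calling thread, so it fits console apps and tests better than UI code.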
