2

Let's say I have a UI thread and a background thread that subscribe to a custom thread-safe ObservableCollection that I created so that whenever the collection changes it executes the callback within the appropriate context.

Now let's say I add something to the collection (from either thread, doesn't matter which one) and it now has to marshall the callback to both threads. To execute the callback within the UI's context I can simply do a Dispatcher.Invoke(...) and it executes the callback within the UI's context; great.

Now I want to execute the callback within the background thread's context (don't ask me why, it may well be that whatever it's accessing is affinitized to that specific thread or has thread-local storage it needs to access); how would I do that?

Background threads don't have a dispatcher/message pumping mechanism so I can't use a dispatcher or SynchronizationContext, so how would one interrupt a background thread and have it execute my callback within its context?

EDIT: I keep getting answers that are obviously wrong so I must not have explained myself correctly. Forget the UI thread and UI dispatchers guys, they were meant to marshall calls to the UI thread, that's it! Imagine two worker threads A and B. If A modifies my collection then A is in charge of marshalling the callback to itself and to B. Executing the callback within A's context is easy since A was the one triggering it : simply call the delegate in place. Now A needs to marshall the callback to B... now what? Dispatcher and SynContext are useless in this situation.

5
  • I would look into using reflection to invoke the callback. Commented Apr 11, 2014 at 7:50
  • Is it also a valid scenario when you want to execute a callback into the worker thread from inside the Dispatcher.Invoke callback made from the worker thread? Commented Apr 11, 2014 at 8:54
  • @TravisJ : I don't understand how reflection will help. Commented Apr 11, 2014 at 14:17
  • @Noseratio : Forget dispatcher.invoke, forget the ui thread. Imagine I have 2 worker threads and I want to dispatch my event to both worker threads; what can I use? Commented Apr 11, 2014 at 14:18
  • I'd implement it this way. Commented Apr 12, 2014 at 3:52

3 Answers 3

2

A good idea might also be extending your own TaskScheduler, you will have to implement three methods:

QueueTask, TryExecuteTaskInline and GetScheduledTasks

you can read about it here

That way, anytime you need to run something on your dedicated thread you could just do:

Task.Factory.StartNew(() => { SomeAction }, SomeCancellationToken, TaskCreationOptions new MyTaskSchedular()); 

and have it execute on your thread.

Sign up to request clarification or add additional context in comments.

4 Comments

Apart from the fact that the TPL is a total bag of spanners. A SynchronizationContext provides the abstraction without the need to get involved in that stuff.
I think its a matter of taste. He may choose which ever is easier for him to implement and maintain i guess
@TheMouthofaCow : A synchronization context will not help; it will only marshall calls to a UI thread but not to a background thread. If the target thread is a background thread it will simply execute it in the current thread (whatever that may be).
@Yuval : How do I target specific threads using a task scheduler and how can it be done synchronously? And this is simply creating work on new threads instead of delegating the work to existing threads.
0

We have a component that must always run on the same STA background thread. We've achieved this by writing our own SynchronizationContext. This article is very helpful.

To summarise, you don't want to interrupt your worker thread, you want it to sit idle waiting for the next task that it should execute. You add jobs to a queue and it processes those jobs in order. The SynchronizationContext is a convenient abstraction around that idea. The SynchronizationContext is the owner of the worker thread - and the outside world does not interact with the thread directly: callers who want to execute a task on the worker thread make the request to the context which adds the job to the job queue. The worker is either working or polling the queue until another job is added, at which point it begins working again.

Update

Here is an example:

using System.Collections.Concurrent; using System.Threading; class LoadBalancedContext : SynchronizationContext { readonly Thread thread1; readonly Thread thread2; readonly ConcurrentQueue<JobInfo> jobs = new ConcurrentQueue<JobInfo>(); public LoadBalancedContext() { this.thread1 = new Thread(this.Poll) { Name = "T1" }; this.thread2 = new Thread(this.Poll) { Name = "T2" }; this.thread1.Start(); this.thread2.Start(); } public override void Post(SendOrPostCallback d, object state) { this.jobs.Enqueue(new JobInfo { Callback = d, State = state }); } void Poll() { while (true) { JobInfo info; if (this.jobs.TryDequeue(out info)) { info.Callback(info.State); } Thread.Sleep(100); } } class JobInfo { public SendOrPostCallback Callback { get; set; } public object State { get; set; } } } 

Usage:

var context = new LoadBalancedContext(); SendOrPostCallback callback = x => { Trace.WriteLine(Thread.CurrentThread.Name); Thread.Sleep(200); }; context.Post(callback, null); context.Post(callback, null); context.Post(callback, null); context.Post(callback, null); context.Post(callback, null); context.Post(callback, null); context.Post(callback, null); context.Post(callback, null); context.Post(callback, null); context.Post(callback, null); context.Post(callback, null); Thread.Sleep(1000); 

The Send case is slightly more involved as you will need to listen for a reset event.. This is not production quality, but should give you an idea ow what you need to do.

Hope that helps.

8 Comments

Bu the SyncrhoinzationContext will not do that to a worker thread; first off each thread doesn't start off with a SyncContext; it needs to be created. UI threads will put a SyncContext there automatically but worker threads don't have one so it has to be created but a SyncContext on a worker thread does no marshalling to another worker thread; it executes the callback in whatever thread calls it. Forget about UI threads and let's imagine I have 2 worker threads and I have a callback that needs to execute on both of them; how can I get it to execute within each worker's context?
You are thinking about the problem in the wrong way. You MUST let the context own the (two) threads. The main thread is special and has lots of funky magic around it. It already has the ability to live forever and queue task on it. There is no mechanism that allows you to do that with an arbitrary thread so you need to create a queue of jobs and then poll that queue. I promise that article does exactly what you want. They guy overcomplicates things, but honestly if you read that it will tell you what you want to know.
You're essentially recreating the task pool and I appreciate the alternative but it doesn't answer my question which I now believe is 'no, not possible'. Actually you already answered it in the comment saying there's no mechanism to do this and you're right; I don't expect there to be some message pump on a background thread. I was just wondering if there was any facility in place anywhere in Win32 or CLR land that would allow marshalling to a worker thread without creating your own pump/custom blocking queue thread.
I think you misunderstand what a thread actually is. Conceptually all a thread is is a sequence of instructions. Once the sequence of instructions is exhausted the thread is complete. To achieve your goal what you want to do is prevent that sequence of instructions from becoming exhausted and to be able to insert more instructions into that sequence in such a way that they are executed as soon as possible. There is a mechanism to do this which is to maintain a queue of work and poll it in an infinite loop.
@Anthony Maybe not deeply enough!
|
0

Forget dispatcher.invoke, forget the ui thread. Imagine I have 2 worker threads and I want to dispatch my event to both worker threads; what can I use?

I'd use two task schedulers for this (as @YuvalItzchakov's answer suggests), one for each thread. I'd also use a custom synchronization context for the worker thread, as @TheMouthofaCow's answer suggests.

That is, for a UI thread, I'd just save and use TaskScheduler.FromCurrentSynchronizationContext(). For the worker thread, I would start a thread and install a custom synchronization context on it, then use FromCurrentSynchronizationContext too.

Something like this (untested):

// UI thread var uiTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext(); using (var worker = new ThreadWithPumpingSyncContext()) { // call the worker thread var result = await worker.Run(async () => { // worker thread await Task.Delay(1000); // call the UI thread await Task.Factory.StartNew(async () => { // UI thread await Task.Delay(2000); MessageBox.Show("UI Thread!"), // call the worker thread await worker.Run(() => { // worker thread Thread.Sleep(3000) }); // UI thread await Task.Delay(4000); }, uiTaskScheduler).Unwrap(); // worker thread await Task.Delay(5000); return Type.Missing; // or implement a non-generic version of Run }); } // ... // ThreadWithSerialSyncContext renamed to ThreadWithPumpingSyncContext class ThreadWithPumpingSyncContext : SynchronizationContext, IDisposable { public readonly TaskScheduler Scheduler; // can be used to run tasks on the pumping thread readonly Task _mainThreadTask; // wrap the pumping thread as Task readonly BlockingCollection<Action> _actions = new BlockingCollection<Action>(); // track async void methods readonly object _lock = new Object(); volatile int _pendingOps = 0; // the number of pending async void method calls volatile TaskCompletionSource<Empty> _pendingOpsTcs = null; // to wait for pending async void method calls public ThreadWithPumpingSyncContext() { var tcs = new TaskCompletionSource<TaskScheduler>(); _mainThreadTask = Task.Factory.StartNew(() => { try { SynchronizationContext.SetSynchronizationContext(this); tcs.SetResult(TaskScheduler.FromCurrentSynchronizationContext()); // pumping loop foreach (var action in _actions.GetConsumingEnumerable()) action(); } finally { SynchronizationContext.SetSynchronizationContext(null); } }, TaskCreationOptions.LongRunning); Scheduler = tcs.Task.Result; } // SynchronizationContext methods public override SynchronizationContext CreateCopy() { return this; } public override void OperationStarted() { lock (_lock) { if (_pendingOpsTcs != null && _pendingOpsTcs.Task.IsCompleted) throw new InvalidOperationException("OperationStarted"); // shutdown requested _pendingOps++; } } public override void OperationCompleted() { lock (_lock) { _pendingOps--; if (0 == _pendingOps && null != _pendingOpsTcs) _pendingOpsTcs.SetResult(Empty.Value); } } public override void Post(SendOrPostCallback d, object state) { _actions.Add(() => d(state)); } public override void Send(SendOrPostCallback d, object state) { throw new NotImplementedException("Send"); } // Task start helpers public Task Run(Action action, CancellationToken token = default(CancellationToken)) { return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this.Scheduler); } public Task Run(Func<Task> action, CancellationToken token = default(CancellationToken)) { return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this.Scheduler).Unwrap(); } public Task<T> Run<T>(Func<Task<T>> action, CancellationToken token = default(CancellationToken)) { return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this.Scheduler).Unwrap(); } // IDispose public void Dispose() { var disposingAlready = false; lock (_lock) { disposingAlready = null != _pendingOpsTcs; if (!disposingAlready) { // do not allow new async void method calls _pendingOpsTcs = new TaskCompletionSource<Empty>(); if (0 == _pendingOps) _pendingOpsTcs.TrySetResult(Empty.Value); } } // outside the lock if (!disposingAlready) { // wait for pending async void method calls _pendingOpsTcs.Task.Wait(); // request the end of the pumping loop _actions.CompleteAdding(); } _mainThreadTask.Wait(); } struct Empty { public static readonly Empty Value = default(Empty); } } 

This give you some sort of cooperative asynchronous execution between two threads.

5 Comments

Okay so let me get this straight... if both worker threads A and B are busy bees, always working, and thread A happens to modify the collection while it's working... Thread A can execute the callback within its own context easily; you're saying with this method that thread B, which is busy doing calculations, will be interrupted and will execute the callback within its context and return back to its frame of execution?
@Anthony, there's no way you can interrupt a busy thread, besides perhaps with Thread.About. The thread has to be ready and waiting to accept an incoming call, it's about cooperative synchronization. That's what await does above. In case with the UI thread, it "yields" to the core message loop. In case with the worker thread, it "yields" to the foreach loop for GetConsumingEnumerable. You can think of this as of coroutines executed by separate threads.
This is my fault for not being very specific in my question which I believe is simply 'no, can't be done'. I was looking for perhaps an undocumented feature that might allow this kind of craziness... for example, the CLR during garbage collection stack-walk will actually inject code in certain places so I thought even though my question may be far fetched it might be possible via some contrived mechanism such as the code-injection I referred to.
@Anthony, that'd be crazy indeed and woudn't make any sense. Where do you think a loop like for (var i = 0; i < 100000; i++) sum += i should be interrupted to run your code, and most importantly, what for? You need an explicit synchronization point for this. In earlier .NET versions you'd use synchronization primitives like ManualResetEvent. Nowadays, TPL and async/await make them pretty much redundant for high-level parallel programming.
True, it's crazy and I'm not asking for any practical purpose, it's purely hypothetical.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.