After carefully considering the other answers, I decided that, for my purposes, it was easier to create a custom QueuedTaskScheduler, given that I don't need to worry about async tasks or IO completion (although that has given me something to think about).
First, when we grab work from the child work pools, we add a semaphore-based lock inside FindNextTask_NeedsLock:
    var items = queueForTargetTask._workItems;
    if (items.Count > 0 && queueForTargetTask.TryLock() /* This is added */)
    {
        targetTask = items.Dequeue();
For the dedicated thread version, inside ThreadBasedDispatchLoop:
    // ... and if we found one, run it
    if (targetTask != null)
    {
        queueForTargetTask.ExecuteTask(targetTask);
        queueForTargetTask.Release();
    }
For the task scheduler version, inside ProcessPrioritizedAndBatchedTasks:
    // Now if we finally have a task, run it. If the task
    // was associated with one of the round-robin schedulers, we need to use it
    // as a thunk to execute its task.
    if (targetTask != null)
    {
        if (queueForTargetTask != null)
        {
            queueForTargetTask.ExecuteTask(targetTask);
            queueForTargetTask.Release();
        }
        else
        {
            TryExecuteTask(targetTask);
        }
    }
Where we create the new child queues:
    /// <summary>Creates and activates a new scheduling queue for this scheduler.</summary>
    /// <returns>The newly created and activated queue at priority 0 and max concurrency of 1.</returns>
    public TaskScheduler ActivateNewQueue()
    {
        return ActivateNewQueue(0, 1);
    }

    /// <summary>Creates and activates a new scheduling queue for this scheduler.</summary>
    /// <param name="priority">The priority level for the new queue.</param>
    /// <param name="maxConcurrency">Max concurrency for the new queue.</param>
    /// <returns>The newly created and activated queue at the specified priority and max concurrency.</returns>
    public TaskScheduler ActivateNewQueue(int priority, int maxConcurrency)
    {
        // Create the queue
        var createdQueue = new QueuedTaskSchedulerQueue(priority, maxConcurrency, this);
        ...
    }
Finally, inside the nested QueuedTaskSchedulerQueue:
    // This is added.
    private readonly int _maxConcurrency;
    private readonly Semaphore _semaphore;

    internal bool TryLock()
    {
        return _semaphore.WaitOne(0);
    }

    internal void Release()
    {
        _semaphore.Release();
        _pool.NotifyNewWorkItem();
    }

    /// <summary>Initializes the queue.</summary>
    /// <param name="priority">The priority associated with this queue.</param>
    /// <param name="maxConcurrency">Max concurrency for this scheduler.</param>
    /// <param name="pool">The scheduler with which this queue is associated.</param>
    internal QueuedTaskSchedulerQueue(int priority, int maxConcurrency, QueuedTaskScheduler pool)
    {
        _priority = priority;
        _pool = pool;
        _workItems = new Queue<Task>();

        // This is added.
        _maxConcurrency = maxConcurrency;
        _semaphore = new Semaphore(_maxConcurrency, _maxConcurrency);
    }
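The TryLock/Release pair relies on WaitOne(0) never blocking: it returns true only if a slot is free at that moment. Here is a minimal standalone sketch of that behaviour, using only System.Threading.Semaphore. The variable names are mine, chosen for illustration, and are not part of QueuedTaskScheduler.

```csharp
using System;
using System.Threading;

// Assumed for illustration: a queue created with maxConcurrency = 1.
const int maxConcurrency = 1;
var semaphore = new Semaphore(maxConcurrency, maxConcurrency);

// WaitOne(0) polls the semaphore without blocking.
bool first = semaphore.WaitOne(0);   // takes the only slot
bool second = semaphore.WaitOne(0);  // no slot left, so this queue would be skipped

semaphore.Release();                 // the running task has finished
bool third = semaphore.WaitOne(0);   // the slot is available again
semaphore.Release();

Console.WriteLine($"{first} {second} {third}");  // prints "True False True"
```

A failed TryLock simply leaves the task in its queue, which is presumably why Release also calls NotifyNewWorkItem: the skipped work has to be looked at again once a slot frees up.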
I hope this is useful for anyone trying to do the same as me: interleave unordered tasks with ordered tasks on a single, easy-to-use scheduler (one that can use the default thread pool or any other scheduler).
=== UPDATE ===
Inspired by Stephen Cleary, I ended up using:
    private static readonly Lazy<TaskScheduler> Scheduler = new Lazy<TaskScheduler>(
        () => new WorkStealingTaskScheduler(16));

    public static TaskScheduler Default
    {
        get { return Scheduler.Value; }
    }

    public static TaskScheduler CreateNewOrderedTaskScheduler()
    {
        return new QueuedTaskScheduler(Default, 1);
    }
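A rough sketch of how a scheduler like this gets used with Task.Factory.StartNew. QueuedTaskScheduler and WorkStealingTaskScheduler are not part of the base class library, so to keep the snippet self-contained I have swapped in the exclusive scheduler of ConcurrentExclusiveSchedulerPair as a stand-in. That substitution is my assumption, not what the code above does; it only shares the property of running at most one of its tasks at a time on top of another scheduler.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Stand-in (assumption) for CreateNewOrderedTaskScheduler(): the BCL's
// exclusive scheduler runs at most one of its tasks at a time.
TaskScheduler ordered =
    new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default).ExclusiveScheduler;

int running = 0;   // tasks currently executing on the scheduler
int peak = 0;      // highest concurrency observed
var gate = new object();

var tasks = Enumerable.Range(0, 8).Select(i => Task.Factory.StartNew(() =>
{
    lock (gate) { running++; peak = Math.Max(peak, running); }
    Thread.Sleep(10);               // placeholder for synchronous work
    lock (gate) { running--; }
}, CancellationToken.None, TaskCreationOptions.None, ordered)).ToArray();

Task.WaitAll(tasks);
Console.WriteLine($"peak concurrency on the ordered scheduler: {peak}");
```

With the real code, the only change would be to pass the result of CreateNewOrderedTaskScheduler() as the last argument to StartNew.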
I don't think schedulers are the right abstraction. Consider using ordinary task combinators and coordination primitives, e.g. await sem.WaitAsync(), to manually control the DOP very flexibly.
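The sem.WaitAsync() idea can be sketched roughly as follows, using SemaphoreSlim from the BCL. The limit of 3, the helper RunThrottledAsync and the Task.Delay placeholder are all my own assumptions for illustration.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

const int maxDop = 3;                      // assumed limit, for illustration only
var sem = new SemaphoreSlim(maxDop, maxDop);
var gate = new object();
int running = 0;                           // work items currently past the semaphore
int peak = 0;                              // highest concurrency observed

// Hypothetical helper: waits for a free slot, does some work, frees the slot.
async Task RunThrottledAsync(int id)
{
    await sem.WaitAsync();                 // asynchronously wait for a slot
    try
    {
        lock (gate) { running++; peak = Math.Max(peak, running); }
        await Task.Delay(20);              // placeholder for real asynchronous work
    }
    finally
    {
        lock (gate) { running--; }
        sem.Release();                     // always give the slot back
    }
}

await Task.WhenAll(Enumerable.Range(0, 10).Select(i => RunThrottledAsync(i)));
Console.WriteLine($"peak concurrency: {peak} (limit {maxDop})");
```

Unlike a scheduler, this throttles at the await points, so it also covers async tasks and IO completion, at the cost of every caller having to go through the semaphore explicitly.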