I have a large number of pieces of data to process. The current code can be simplified as follows:
```csharp
public void ProcessData(string data)
{
    string resultOfA = doCpuBoundWorkA(data);
    string resultOfS1 = sendToServiceS1(resultOfA);   // blocks the thread until S1 responds
    string resultOfB = doCpuBoundWorkB(resultOfS1);
    string resultOfS2 = sendToServiceS2(resultOfB);   // blocks the thread until S2 responds
    string resultOfC = doCpuBoundWorkC(resultOfS2);
}
```

ProcessData is invoked using Parallel.ForEach. This implementation is not optimal for at least two reasons. First, the calls to the services are blocking, so threads sit idle while waiting for the calls to return. Second, Parallel.ForEach creates tasks that are scheduled for execution on the thread pool. Once all pool threads are busy, the thread pool injects an additional thread roughly every 500 ms (if I'm not wrong about that), and because ProcessData takes longer than 500 ms to complete, over time we end up with hundreds of threads that spend most of their time waiting for the services to come back.
My naive idea for 'improvement' was this:
```csharp
public async Task ProcessData(string data)
{
    string resultOfA = doCpuBoundWorkA(data);
    string resultOfS1 = await sendToServiceS1Async(resultOfA);  // no thread is held while S1 is in flight
    string resultOfB = doCpuBoundWorkB(resultOfS1);
    string resultOfS2 = await sendToServiceS2Async(resultOfB);  // no thread is held while S2 is in flight
    string resultOfC = doCpuBoundWorkC(resultOfS2);
}
```

I'm new to async/await, so I could be totally wrong in my understanding of what is actually happening.
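One thing worth checking with the async version is how it gets invoked. Passing an async lambda to Parallel.ForEach produces an `async void` delegate, so Parallel.ForEach returns as soon as each call reaches its first `await`, not when the work is done. A common alternative is to start all the calls and await them with `Task.WhenAll`, using a `SemaphoreSlim` to cap how many are in flight. The sketch below assumes hypothetical stubs for the question's helpers (the services are simulated with `Task.Delay`) and has ProcessData return `Task<string>` instead of `Task` so the result can be observed; `AsyncDriver`, `ProcessAllAsync` and `maxInFlight` are illustrative names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncDriver
{
    // Hypothetical stand-ins that mirror the method names in the question.
    static string doCpuBoundWorkA(string s) => s + "-A";
    static string doCpuBoundWorkB(string s) => s + "-B";
    static string doCpuBoundWorkC(string s) => s + "-C";
    static async Task<string> sendToServiceS1Async(string s) { await Task.Delay(20); return s + "-S1"; }
    static async Task<string> sendToServiceS2Async(string s) { await Task.Delay(20); return s + "-S2"; }

    // Same shape as the async ProcessData, but returning the final result.
    public static async Task<string> ProcessData(string data)
    {
        string resultOfA = doCpuBoundWorkA(data);
        string resultOfS1 = await sendToServiceS1Async(resultOfA);
        string resultOfB = doCpuBoundWorkB(resultOfS1);
        string resultOfS2 = await sendToServiceS2Async(resultOfB);
        return doCpuBoundWorkC(resultOfS2);
    }

    // Starts a call per item, allowing at most maxInFlight to run at once,
    // and completes only when every item has been fully processed.
    public static async Task<string[]> ProcessAllAsync(IEnumerable<string> items, int maxInFlight)
    {
        using var throttle = new SemaphoreSlim(maxInFlight);
        List<Task<string>> tasks = items.Select(async item =>
        {
            await throttle.WaitAsync();
            try { return await ProcessData(item); }
            finally { throttle.Release(); }
        }).ToList();
        return await Task.WhenAll(tasks); // results are in the same order as items
    }
}
```

Unlike Parallel.ForEach, this never blocks a thread per item; the semaphore only bounds how many items are between their first and last step at the same time.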
With the async/await keywords, the compiler breaks the code of ProcessData into multiple tasks (strictly speaking, a state machine whose continuations are scheduled separately):
- Task-A: from the beginning of ProcessData up to the point where the call to service S1 "hits the wire".
- Task-B: from the moment we pick up the result of the call to S1 up to the point where the call to service S2 "hits the wire".
- Task-C: from the moment we pick up the result of the call to S2 up to the end of ProcessData.
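To make the three pieces concrete, here is a rough hand-written equivalent using explicit continuations. This is only an approximation: the compiler actually generates a state machine rather than `ContinueWith` calls, but the scheduling effect is similar, in that Task-B and Task-C are queued only after the service call they depend on completes. The stubs and the `ManualPieces` class are hypothetical, and the `scheduler` parameter shows where a custom scheduler could be plugged in for the later pieces.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ManualPieces
{
    // Hypothetical stand-ins that mirror the method names in the question.
    static string doCpuBoundWorkA(string s) => s + "-A";
    static string doCpuBoundWorkB(string s) => s + "-B";
    static string doCpuBoundWorkC(string s) => s + "-C";
    static async Task<string> sendToServiceS1Async(string s) { await Task.Delay(20); return s + "-S1"; }
    static async Task<string> sendToServiceS2Async(string s) { await Task.Delay(20); return s + "-S2"; }

    public static Task<string> ProcessData(string data, TaskScheduler scheduler)
    {
        // Task-A: runs on the caller's thread until the S1 call is issued.
        Task<string> s1 = sendToServiceS1Async(doCpuBoundWorkA(data));

        // Task-B: queued to 'scheduler' when S1 completes; issues the S2 call.
        Task<string> s2 = s1.ContinueWith(
            t => sendToServiceS2Async(doCpuBoundWorkB(t.Result)),
            CancellationToken.None, TaskContinuationOptions.None, scheduler).Unwrap();

        // Task-C: queued to 'scheduler' when S2 completes; finishes the work.
        return s2.ContinueWith(
            t => doCpuBoundWorkC(t.Result),
            CancellationToken.None, TaskContinuationOptions.None, scheduler);
    }
}
```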
As a result, instead of a single "processing unit of work" we have three, and each piece is scheduled for execution according to its position in the scheduler's queue.
The problem is that by the time Task-B (for the first piece of work) is put on the scheduler's queue, there may already be hundreds of Task-A instances queued by Parallel.ForEach, and by the time Task-C (for the first piece of work) is placed on the queue, the situation will be even worse.
I would like the data to pass through as fast as possible, so I need to be able to prioritize Task-C over Task-B over Task-A. What would be the best way to achieve this?
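A minimal sketch of one way to get that ordering, assuming a hand-rolled priority scheduler rather than the ParallelExtensionsExtras types: `PriorityPool` exposes one `TaskScheduler` per priority level, all sharing a fixed set of dedicated threads, and each CPU-bound step is explicitly started at its own level with `Task.Factory.StartNew`. `PriorityPool`, `RunOn`, `PrioritizedPipeline` and the service stubs are all hypothetical names for illustration, not an existing API.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical minimal priority scheduler: several levels share a fixed set of
// worker threads. Level 0 is the highest priority; an idle worker always takes
// the oldest task from the lowest-numbered non-empty queue.
public sealed class PriorityPool
{
    private readonly object _gate = new object();
    private readonly Queue<Task>[] _queues;
    private readonly Level[] _levels;

    public PriorityPool(int levelCount, int workerCount)
    {
        _queues = new Queue<Task>[levelCount];
        _levels = new Level[levelCount];
        for (int i = 0; i < levelCount; i++)
        {
            _queues[i] = new Queue<Task>();
            _levels[i] = new Level(this, i);
        }
        for (int i = 0; i < workerCount; i++)
            new Thread(WorkLoop) { IsBackground = true }.Start();
    }

    // The TaskScheduler that queues work at the given priority level.
    public TaskScheduler this[int priority] => _levels[priority];

    private void Enqueue(Task task, int priority)
    {
        lock (_gate)
        {
            _queues[priority].Enqueue(task);
            Monitor.Pulse(_gate); // wake one idle worker, if any
        }
    }

    private void WorkLoop()
    {
        while (true)
        {
            Task task = null;
            int priority = 0;
            lock (_gate)
            {
                while (task == null)
                {
                    for (priority = 0; priority < _queues.Length; priority++)
                        if (_queues[priority].Count > 0) { task = _queues[priority].Dequeue(); break; }
                    if (task == null) Monitor.Wait(_gate);
                }
            }
            _levels[priority].Run(task); // execute outside the lock
        }
    }

    private sealed class Level : TaskScheduler
    {
        private readonly PriorityPool _pool;
        private readonly int _priority;
        public Level(PriorityPool pool, int priority) { _pool = pool; _priority = priority; }
        protected override void QueueTask(Task task) => _pool.Enqueue(task, _priority);
        // Never inline: every piece of work waits its turn in the priority queues.
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;
        protected override IEnumerable<Task> GetScheduledTasks() => Array.Empty<Task>();
        public void Run(Task task) => TryExecuteTask(task);
    }
}

public static class PrioritizedPipeline
{
    // Hypothetical stand-ins that mirror the method names in the question.
    static string doCpuBoundWorkA(string s) => s + "-A";
    static string doCpuBoundWorkB(string s) => s + "-B";
    static string doCpuBoundWorkC(string s) => s + "-C";
    static async Task<string> sendToServiceS1Async(string s) { await Task.Delay(20); return s + "-S1"; }
    static async Task<string> sendToServiceS2Async(string s) { await Task.Delay(20); return s + "-S2"; }

    static Task<T> RunOn<T>(TaskScheduler scheduler, Func<T> work) =>
        Task.Factory.StartNew(work, CancellationToken.None, TaskCreationOptions.DenyChildAttach, scheduler);

    // CPU-bound step C runs at level 0 (highest), B at level 1, A at level 2 (lowest).
    public static async Task<string> ProcessData(string data, PriorityPool pool)
    {
        string resultOfA = await RunOn(pool[2], () => doCpuBoundWorkA(data)).ConfigureAwait(false);
        string resultOfS1 = await sendToServiceS1Async(resultOfA).ConfigureAwait(false);
        string resultOfB = await RunOn(pool[1], () => doCpuBoundWorkB(resultOfS1)).ConfigureAwait(false);
        string resultOfS2 = await sendToServiceS2Async(resultOfB).ConfigureAwait(false);
        return await RunOn(pool[0], () => doCpuBoundWorkC(resultOfS2)).ConfigureAwait(false);
    }
}
```

Two design points: priority only decides which queued step starts next (it does not preempt a step that is already running), and the fixed `workerCount` also caps the number of threads doing CPU-bound work, which addresses the thread growth described at the top. The QueuedTaskScheduler mentioned in the question should be able to play the role of `PriorityPool` here.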
INotifyCompletion and SynchronizationContext come to mind, but they seem to be the "dark corners" of async/await. ParallelExtensionsExtras has ReprioritizableTaskScheduler and QueuedTaskScheduler with priority queues, but how do I tell async/await to use the desired scheduler?
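As far as I know, when there is no SynchronizationContext, `await` captures `TaskScheduler.Current` if it is not the default scheduler and queues the continuation back to it. So one way to "tell async/await" which scheduler to use is to start the async method on that scheduler via `Task.Factory.StartNew(...).Unwrap()`. The sketch below checks this, using the built-in `ConcurrentExclusiveSchedulerPair` as a stand-in for a priority-queue scheduler; `SchedulerFlowDemo` and `ResumesOn` are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SchedulerFlowDemo
{
    // Starts an async lambda on 'scheduler' and reports whether the code after
    // the await resumed on that same scheduler.
    public static Task<bool> ResumesOn(TaskScheduler scheduler)
    {
        return Task.Factory.StartNew(async () =>
        {
            // Here TaskScheduler.Current is 'scheduler'.
            await Task.Delay(10); // no ConfigureAwait(false), so the current scheduler is captured
            // The continuation was queued back to 'scheduler'.
            return TaskScheduler.Current == scheduler;
        }, CancellationToken.None, TaskCreationOptions.DenyChildAttach, scheduler).Unwrap();
    }
}
```

Note that with this approach every continuation of the method stays on the same scheduler, so giving each stage its own priority would still need an explicit hop to a different scheduler for that stage.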
Jon Skeet talks about this problem in his blog: https://codeblog.jonskeet.uk/2010/11/02/configuring-waiting/