I had to write a console application that called a Microsoft Dynamics CRM web service to perform an action on over eight thousand CRM objects. The details of the web service call are irrelevant and not shown here, but I needed a multi-threaded client so that I could make calls in parallel. I wanted to control the number of threads from a config setting, and I wanted the application to cancel the whole operation if the number of service errors reached a config-defined threshold.

I wrote it using Task Parallel Library Task.Run and ContinueWith, keeping track of how many calls (threads) were in progress, how many errors we'd received, and whether the user had cancelled from the keyboard. Everything worked fine and I had extensive logging to assure myself that threads were finishing cleanly and that everything was tidy at the end of the run. I could see that the program was using the maximum number of threads in parallel and, if our maximum limit was reached, waiting until a running task completed before starting another one.

During my code review, my colleague suggested that it would be better to do it with async/await instead of tasks and continuations, so I created a branch and rewrote it that way. The results were interesting - the async/await version was almost twice as slow, and it never reached the maximum number of allowed parallel operations/threads. The TPL one always got to 10 threads in parallel whereas the async/await version never got beyond 5.

My question is: have I made a mistake in the way I have written the async/await code (or even the TPL code)? If I have not coded it wrong, can you explain why the async/await version is less efficient, and does that mean it is better to carry on using TPL for multi-threaded code?

Note that the code I tested with did not actually call CRM - the CrmClient class simply thread-sleeps for a duration specified in the config (five seconds) and then throws an exception. This meant that there were no external variables that could affect the performance.

For the purposes of this question I created a stripped down program that combines both versions; which one is called is determined by a config setting. Each of them starts with a bootstrap runner that sets up the environment, creates the queue class, then uses a TaskCompletionSource to wait for completion. A CancellationTokenSource is used to signal a cancellation from the user. The list of ids to process is read from an embedded file and pushed onto a ConcurrentQueue. They both start off calling StartCrmRequest as many times as max-threads; subsequently, every time a result is processed, the ProcessResult method calls StartCrmRequest again, keeping going until all of our ids are processed.

You can clone/download the complete program from here: https://bitbucket.org/kentrob/pmgfixso/

Here is the relevant configuration:

    <appSettings>
      <add key="TellUserAfterNCalls" value="5"/>
      <add key="CrmErrorsBeforeQuitting" value="20"/>
      <add key="MaxThreads" value="10"/>
      <add key="CallIntervalMsecs" value="5000"/>
      <add key="UseAsyncAwait" value="True" />
    </appSettings>

Starting with the TPL version, here is the bootstrap runner that kicks off the queue manager:

    public static class TplRunner
    {
        private static readonly CancellationTokenSource CancellationTokenSource = new CancellationTokenSource();

        public static void StartQueue(RuntimeParameters parameters, IEnumerable<string> idList)
        {
            Console.CancelKeyPress += (s, args) =>
            {
                CancelCrmClient();
                args.Cancel = true;
            };

            var start = DateTime.Now;
            Program.TellUser("Start: " + start);

            var taskCompletionSource = new TplQueue(parameters)
                .Start(CancellationTokenSource.Token, idList);

            while (!taskCompletionSource.Task.IsCompleted)
            {
                if (Console.KeyAvailable)
                {
                    if (Console.ReadKey().Key != ConsoleKey.Q) continue;
                    Console.WriteLine("When all threads are complete, press any key to continue.");
                    CancelCrmClient();
                }
            }

            var end = DateTime.Now;
            Program.TellUser("End: {0}. Elapsed = {1} secs.", end, (end - start).TotalSeconds);
        }

        private static void CancelCrmClient()
        {
            CancellationTokenSource.Cancel();
            Console.WriteLine("Cancelling Crm client. Web service calls in operation will have to run to completion.");
        }
    }

Here is the TPL queue manager itself:

    public class TplQueue
    {
        private readonly RuntimeParameters parameters;
        private readonly object locker = new object();
        private ConcurrentQueue<string> idQueue = new ConcurrentQueue<string>();
        private readonly CrmClient crmClient;
        private readonly TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>();
        private int threadCount;
        private int crmErrorCount;
        private int processedCount;
        private CancellationToken cancelToken;

        public TplQueue(RuntimeParameters parameters)
        {
            this.parameters = parameters;
            crmClient = new CrmClient();
        }

        public TaskCompletionSource<bool> Start(CancellationToken cancellationToken, IEnumerable<string> ids)
        {
            cancelToken = cancellationToken;

            foreach (var id in ids)
            {
                idQueue.Enqueue(id);
            }

            threadCount = 0;

            // Prime our thread pump with max threads.
            for (var i = 0; i < parameters.MaxThreads; i++)
            {
                Task.Run((Action) StartCrmRequest, cancellationToken);
            }

            return taskCompletionSource;
        }

        private void StartCrmRequest()
        {
            if (taskCompletionSource.Task.IsCompleted)
            {
                return;
            }

            if (cancelToken.IsCancellationRequested)
            {
                Program.TellUser("Crm client cancelling...");
                ClearQueue();
                return;
            }

            var count = GetThreadCount();

            if (count >= parameters.MaxThreads)
            {
                return;
            }

            string id;
            if (!idQueue.TryDequeue(out id)) return;

            IncrementThreadCount();
            crmClient.CompleteActivityAsync(new Guid(id), parameters.CallIntervalMsecs).ContinueWith(ProcessResult);

            processedCount += 1;
            if (parameters.TellUserAfterNCalls > 0 && processedCount%parameters.TellUserAfterNCalls == 0)
            {
                ShowProgress(processedCount);
            }
        }

        private void ProcessResult(Task<CrmResultMessage> response)
        {
            if (response.Result.CrmResult == CrmResult.Failed && ++crmErrorCount == parameters.CrmErrorsBeforeQuitting)
            {
                Program.TellUser(
                    "Quitting because CRM error count is equal to {0}. \nAlready queued web service calls will have to run to completion.",
                    crmErrorCount);
                ClearQueue();
            }

            var count = DecrementThreadCount();

            if (idQueue.Count == 0 && count == 0)
            {
                taskCompletionSource.SetResult(true);
            }
            else
            {
                StartCrmRequest();
            }
        }

        private int GetThreadCount()
        {
            lock (locker)
            {
                return threadCount;
            }
        }

        private void IncrementThreadCount()
        {
            lock (locker)
            {
                threadCount = threadCount + 1;
            }
        }

        private int DecrementThreadCount()
        {
            lock (locker)
            {
                threadCount = threadCount - 1;
                return threadCount;
            }
        }

        private void ClearQueue()
        {
            idQueue = new ConcurrentQueue<string>();
        }

        private static void ShowProgress(int processedCount)
        {
            Program.TellUser("{0} activities processed.", processedCount);
        }
    }

Note that I am aware that a couple of the counters are not thread safe but they are not critical; the threadCount variable is the only critical one.

Here is the dummy CRM client method:

    public Task<CrmResultMessage> CompleteActivityAsync(Guid activityId, int callIntervalMsecs)
    {
        // Here we would normally call a CRM web service.
        return Task.Run(() =>
        {
            try
            {
                if (callIntervalMsecs > 0)
                {
                    Thread.Sleep(callIntervalMsecs);
                }
                throw new ApplicationException("Crm web service not available at the moment.");
            }
            catch
            {
                return new CrmResultMessage(activityId, CrmResult.Failed);
            }
        });
    }

And here are the same async/await classes (with common methods removed for the sake of brevity):

    public static class AsyncRunner
    {
        private static readonly CancellationTokenSource CancellationTokenSource = new CancellationTokenSource();

        public static void StartQueue(RuntimeParameters parameters, IEnumerable<string> idList)
        {
            var start = DateTime.Now;
            Program.TellUser("Start: " + start);

            var taskCompletionSource = new AsyncQueue(parameters)
                .StartAsync(CancellationTokenSource.Token, idList).Result;

            while (!taskCompletionSource.Task.IsCompleted)
            {
                ...
            }

            var end = DateTime.Now;
            Program.TellUser("End: {0}. Elapsed = {1} secs.", end, (end - start).TotalSeconds);
        }
    }

The async/await queue manager:

    public class AsyncQueue
    {
        private readonly RuntimeParameters parameters;
        private readonly object locker = new object();
        private readonly CrmClient crmClient;
        private readonly TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>();
        private CancellationToken cancelToken;
        private ConcurrentQueue<string> idQueue = new ConcurrentQueue<string>();
        private int threadCount;
        private int crmErrorCount;
        private int processedCount;

        public AsyncQueue(RuntimeParameters parameters)
        {
            this.parameters = parameters;
            crmClient = new CrmClient();
        }

        public async Task<TaskCompletionSource<bool>> StartAsync(CancellationToken cancellationToken, IEnumerable<string> ids)
        {
            cancelToken = cancellationToken;

            foreach (var id in ids)
            {
                idQueue.Enqueue(id);
            }

            threadCount = 0;

            // Prime our thread pump with max threads.
            for (var i = 0; i < parameters.MaxThreads; i++)
            {
                await StartCrmRequest();
            }

            return taskCompletionSource;
        }

        private async Task StartCrmRequest()
        {
            if (taskCompletionSource.Task.IsCompleted)
            {
                return;
            }

            if (cancelToken.IsCancellationRequested)
            {
                ...
                return;
            }

            var count = GetThreadCount();

            if (count >= parameters.MaxThreads)
            {
                return;
            }

            string id;
            if (!idQueue.TryDequeue(out id)) return;

            IncrementThreadCount();
            var crmMessage = await crmClient.CompleteActivityAsync(new Guid(id), parameters.CallIntervalMsecs);
            ProcessResult(crmMessage);

            processedCount += 1;
            if (parameters.TellUserAfterNCalls > 0 && processedCount%parameters.TellUserAfterNCalls == 0)
            {
                ShowProgress(processedCount);
            }
        }

        private async void ProcessResult(CrmResultMessage response)
        {
            if (response.CrmResult == CrmResult.Failed && ++crmErrorCount == parameters.CrmErrorsBeforeQuitting)
            {
                Program.TellUser(
                    "Quitting because CRM error count is equal to {0}. \nAlready queued web service calls will have to run to completion.",
                    crmErrorCount);
                ClearQueue();
            }

            var count = DecrementThreadCount();

            if (idQueue.Count == 0 && count == 0)
            {
                taskCompletionSource.SetResult(true);
            }
            else
            {
                await StartCrmRequest();
            }
        }
    }

So, setting MaxThreads to 10 and CrmErrorsBeforeQuitting to 20, the TPL version on my machine completes in 19 seconds and the async/await version takes 35 seconds. Given that I have over 8000 calls to make this is a significant difference. Any ideas?

3 Answers

10

I think I'm seeing the problem here, or at least a part of it. Look closely at the two bits of code below; they are not equivalent.

    // Prime our thread pump with max threads.
    for (var i = 0; i < parameters.MaxThreads; i++)
    {
        Task.Run((Action) StartCrmRequest, cancellationToken);
    }

And:

    // Prime our thread pump with max threads.
    for (var i = 0; i < parameters.MaxThreads; i++)
    {
        await StartCrmRequest();
    }

In the original code (I am taking it as a given that it is functionally sound) there is a single call to ContinueWith. That is exactly how many await statements I would expect to see in a trivial rewrite if it is meant to preserve the original behaviour.

Not a hard and fast rule and only applicable in simple cases, but nevertheless a good thing to keep an eye out for.
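To make the difference concrete, here is a minimal sketch (not the asker's actual code). `FakeCallAsync`, `PrimeSequentiallyAsync` and `PrimeConcurrentlyAsync` are hypothetical names, and `Task.Delay` stands in for the CRM call. Awaiting inside the loop starts each call only after the previous one has finished, whereas collecting the tasks first and awaiting `Task.WhenAll` lets them overlap.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for one CRM call: completes after delayMs without blocking a thread.
static async Task FakeCallAsync(int delayMs)
{
    await Task.Delay(delayMs);
}

// Awaiting inside the loop: call i+1 does not start until call i has finished.
static async Task<long> PrimeSequentiallyAsync(int count, int delayMs)
{
    var timer = Stopwatch.StartNew();
    for (var i = 0; i < count; i++)
    {
        await FakeCallAsync(delayMs);
    }
    return timer.ElapsedMilliseconds;
}

// Starting every call first and awaiting them together: the calls overlap.
static async Task<long> PrimeConcurrentlyAsync(int count, int delayMs)
{
    var timer = Stopwatch.StartNew();
    var calls = Enumerable.Range(0, count).Select(_ => FakeCallAsync(delayMs)).ToList();
    await Task.WhenAll(calls);
    return timer.ElapsedMilliseconds;
}

long sequentialMs = await PrimeSequentiallyAsync(5, 200);
long concurrentMs = await PrimeConcurrentlyAsync(5, 200);
Console.WriteLine($"sequential: {sequentialMs} ms, concurrent: {concurrentMs} ms");
```

With five 200 ms calls, the sequential version should take roughly five times as long as the concurrent one, which is the same kind of gap the question describes.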

8 Comments

Just starting to understand that. It was my (probably faulty) understanding that the main body of code continues running after the await so that the loop will continue. Just going to confirm that now.
@RobKent, this actually looks like a fairly simple rewrite - you've just overdone in some parts changing things that didn't have to be changed (again, I'm assuming that the original code was correct to begin with). I've made an edit re the number of await statements.
Yes, just tested it and the loop with the await in it is synchronous.
@RobKent, it's still asynchronous, but sequential instead of parallel.
By the way, you taking out that Task.Run bit also changes things, because in the rewrite StartCrmRequest will, indeed, run synchronously until it hits the first await, whereas Task.Run offloads the part of the method preceding your await to another thread from the start (making the call asynchronous). That is a subtle difference, but worth mentioning. This won't have a negative performance impact if the method runs quickly before hitting await (which I'm sure yours does), but if you happen to do substantial synchronous work before the await, it would introduce further delay.
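The point in the last comment can be checked with a small sketch. `WorkThenAwaitAsync` is a hypothetical method, not part of the question's code. It records which thread runs the code that precedes its first `await`: when called directly that is the caller's thread, and when wrapped in `Task.Run` it is a thread-pool thread.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical async method that does some synchronous work before its first await.
// It returns the id of the thread that ran that synchronous part.
static async Task<int> WorkThenAwaitAsync()
{
    int threadId = Environment.CurrentManagedThreadId; // runs before any await
    await Task.Delay(50);
    return threadId;
}

int callerThreadId = Environment.CurrentManagedThreadId;

// Called directly: the code before the first await runs on the caller's thread.
int directThreadId = await WorkThenAwaitAsync();

// Wrapped in Task.Run: the whole method, including the part before the await,
// is queued to the thread pool.
int offloadedThreadId = await Task.Run(() => WorkThenAwaitAsync());

Console.WriteLine($"caller: {callerThreadId}, direct: {directThreadId}, via Task.Run: {offloadedThreadId}");
```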
4

I think you over complicated your solution and ended up not getting where you wanted in either implementation.

First of all, connections to any HTTP host are limited by the service point manager. The default limit for client environments is 2, but you can increase it yourself.

No matter how many threads you spawn, there won't be more active requests than the number allowed.
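As a rough sketch of raising that limit, assuming the real client goes through the classic .NET Framework HTTP stack that honours `ServicePointManager` (the question does not show the actual web call, so that is an assumption), and reusing the question's `MaxThreads` value of 10:

```csharp
using System;
using System.Net;

// Assumption: the CRM client uses HttpWebRequest-based plumbing, which honours
// ServicePointManager. The value 10 mirrors MaxThreads in the question's config.
const int maxThreads = 10;

// Raise the per-host connection limit before the first request is made.
ServicePointManager.DefaultConnectionLimit = maxThreads;

Console.WriteLine("Connection limit per host: " + ServicePointManager.DefaultConnectionLimit);
```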

Then, as someone pointed out, await logically blocks the execution flow.

And finally, you spent your time creating an AsyncQueue when you could have used TPL Dataflow.
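For illustration only, a throttled queue along those lines might look like the sketch below. `FakeCrmCallAsync`, the `id-N` strings and the counters are hypothetical stand-ins. `ActionBlock<T>` and `ExecutionDataflowBlockOptions.MaxDegreeOfParallelism` come from `System.Threading.Tasks.Dataflow`, which may need to be added as a package depending on the target framework.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var gate = new object();
int inFlight = 0;   // calls currently running
int peak = 0;       // highest concurrency observed
int handled = 0;    // total ids handled

// Hypothetical stand-in for the CRM call; it records concurrency so the cap is visible.
async Task FakeCrmCallAsync(string id)
{
    lock (gate) { inFlight++; if (inFlight > peak) peak = inFlight; }
    await Task.Delay(100);
    lock (gate) { inFlight--; handled++; }
}

const int maxThreads = 10; // mirrors MaxThreads in the question's config

// The block queues posted ids and runs at most maxThreads handlers at a time.
var worker = new ActionBlock<string>(
    id => FakeCrmCallAsync(id),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxThreads });

foreach (var id in Enumerable.Range(1, 40).Select(n => "id-" + n))
{
    worker.Post(id);
}

worker.Complete();       // no more input
await worker.Completion; // wait for the queued ids to drain

Console.WriteLine($"handled {handled} ids, peak concurrency {peak}");
```

The block takes over the queueing, the concurrency cap and the completion signal that the hand-written queue classes manage themselves. Cancellation and the error threshold from the question are left out of this sketch.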

2 Comments

Yes, thanks for pointing out the ServicePointManager limit - I was not aware of that. While that is definitely relevant when I implement the actual web call, for the purposes of this question, it is irrelevant because we only need to know why the async/await performs more slowly without actually making a web call. Thanks for your comment though.
If you really want to prove that, you'll need a far less complicated example.
0

When implemented with async/await, I would expect the I/O bound algorithm to run on a single thread. Unlike @KirillShlenskiy, I believe that the bit responsible for "bringing back" to caller's context is not responsible for the slow-down. I think you overrun the thread pool by trying to use it for I/O-bound operations. It's designed primarily for compute-bound ops.

Have a look at ForEachAsync. I feel that's what you're looking for (Stephen Toub's discussion, you'll find Wischik's videos meaningful too):

http://blogs.msdn.com/b/pfxteam/archive/2012/03/05/10278165.aspx

(Use degree of concurrency to reduce memory footprint)

http://vimeo.com/43808831 http://vimeo.com/43808833
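As a sketch of the partition-based idea, written from memory of that style of helper rather than copied from the linked post, `ForEachAsync` below splits the input into `dop` partitions and drains each partition with its own asynchronous loop. The `id-N` strings and the `Task.Delay` body are hypothetical stand-ins for the real ids and the CRM call.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Runs body over source with at most dop bodies in flight at once:
// one async loop per partition, all awaited together.
static Task ForEachAsync<T>(IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async () =>
        {
            using (partition)
            {
                while (partition.MoveNext())
                {
                    await body(partition.Current);
                }
            }
        }));
}

var results = new ConcurrentBag<string>();
var ids = Enumerable.Range(1, 25).Select(n => "id-" + n).ToList();

await ForEachAsync(ids, 10, async id =>
{
    await Task.Delay(50); // hypothetical stand-in for the CRM call
    results.Add(id);
});

Console.WriteLine($"processed {results.Count} of {ids.Count} ids");
```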

3 Comments

Thanks, I will look at those links.
There's one comment in particular on that article I agree with: "This new await and async keywords really wreck my brain. At one point I think I understand them and at another I realize that I don't." I need to study it more.
@RobKent I am glad to share links to this wonderfully rich content. Happy reading and viewing!
