
I'm running a method synchronously in parallel using System.Threading.Tasks.Parallel.ForEach. At the end of the method, it needs to make a few dozen HTTP POST requests, which do not depend on each other. I'm on .NET Framework 4.6.2, and System.Net.Http.HttpClient is exclusively asynchronous, so I'm using Nito.AsyncEx.AsyncContext to avoid deadlocks, in the form:

    public static void MakeMultipleRequests(IEnumerable<MyClass> enumerable)
    {
        AsyncContext.Run(async () => await Task.WhenAll(
            enumerable.Select(async c => await getResultsFor(c).ConfigureAwait(false))));
    }

The getResultsFor(MyClass c) method then creates an HttpRequestMessage and sends it using:

await httpClient.SendAsync(request); 

The response is then parsed and the relevant fields are set on the instance of MyClass.
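For context, getResultsFor looks roughly like this; the endpoint URL and the helpers ToRequestJson and SetFieldsFromResponse are simplified placeholders for the real implementation:

    private static async Task getResultsFor(MyClass c)
    {
        // Placeholder endpoint; the real code builds the request per instance.
        var request = new HttpRequestMessage(HttpMethod.Post, "https://example.com/api/results")
        {
            Content = new StringContent(c.ToRequestJson(), Encoding.UTF8, "application/json")
        };
        var response = await httpClient.SendAsync(request).ConfigureAwait(false);
        var body = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
        c.SetFieldsFromResponse(body); // parses the response and sets the relevant fields
    }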

My understanding is that the synchronous thread will block at AsyncContext.Run(...), while a number of tasks are performed asynchronously by the single AsyncContextThread owned by AsyncContext. When they are all complete, the synchronous thread will unblock.

This works fine for a few hundred requests, but when it scales up to a few thousand over five minutes, some of the requests start returning HTTP 408 Request Timeout errors from the server. My logs indicate that these timeouts happen at peak load, when the most requests are being sent, and long after responses to many of the other requests have come back.

I think the problem is that the tasks are awaiting the server handshake inside HttpClient, but they are not continued in FIFO order, so by the time they are continued the handshake has expired. However, I can't think of any way to deal with this, short of using a System.Threading.SemaphoreSlim to enforce that only one task can await httpClient.SendAsync(...) at a time.
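For clarity, this is the kind of single-slot throttle I mean (the names are illustrative):

    // Only one task may be inside SendAsync at a time.
    private static readonly SemaphoreSlim gate = new SemaphoreSlim(1, 1);

    private static async Task<HttpResponseMessage> SendThrottledAsync(HttpRequestMessage request)
    {
        await gate.WaitAsync().ConfigureAwait(false);
        try
        {
            return await httpClient.SendAsync(request).ConfigureAwait(false);
        }
        finally
        {
            gate.Release();
        }
    }

But that serializes all the requests and throws away the concurrency I'm after.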

My application is very large, and converting it entirely to async is not viable.

5 Comments

  • There isn't just black and white. Throttling down to 1 at a time would pretty much destroy your parallel approach. But throttling of some kind is needed. What you are doing right now is basically a DoS attack. Commented Sep 28, 2021 at 11:32
  • The error is specifically a client-side 408 timeout, not a server-side 503 error. At peak I'm making a few thousand requests over five minutes, which is well within the server's capacity. I have separate error handling code for the client to handle server-side issues. Commented Sep 28, 2021 at 11:49
  • Yes, you can DoS your own (local) network stack :) The solution is the same: tame your horses a bit. Maybe build batches or add some jitter offset time before starting ... or simply make sure there are fewer than X open requests at the same time. What X should ideally be probably depends on the client system, though. Commented Sep 28, 2021 at 11:53
  • You might want to read this: makolyte.com/… Commented Sep 28, 2021 at 12:39
  • @Fildor "making sure there are less than X open requests at the same time" - Unless I've misunderstood something, I don't think this can solve my problem (unless X == 1). Say there are 2 slots; what is to stop requests #2 through #200 passing through slot B while request #1 sits in slot A waiting to be continued, so that when it is resumed it immediately times out? Commented Sep 28, 2021 at 17:15

2 Answers


This isn't something that can be fixed by wrapping the tasks before blocking on them. For starters, if all the requests do go through, you may end up nuking the server; right now you're nuking the client. There's a limit of two concurrent requests per domain in .NET Framework that can be relaxed, but if you set it too high you may end up nuking the server anyway.
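For reference, that limit is relaxed through ServicePointManager before the first request is made; the value below is arbitrary and should be tuned to what the server can actually handle:

    // .NET Framework defaults to 2 concurrent connections per domain.
    ServicePointManager.DefaultConnectionLimit = 100;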

You can solve this by using DataFlow blocks in a pipeline to execute requests with a fixed degree of parallelism and then parse them. Let's say you have a class called MyPayload with lots of Items in a property:

    ServicePointManager.DefaultConnectionLimit = 1000;

    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 10
    };

    var downloader = new TransformBlock<string, MyPayload>(async url =>
    {
        var json = await _client.GetStringAsync(url);
        var data = JsonConvert.DeserializeObject<MyPayload>(json);
        return data;
    }, options);

    var importer = new ActionBlock<MyPayload>(async data =>
    {
        var items = data.Items;
        using (var connection = new SqlConnection(connectionString))
        using (var bcp = new SqlBulkCopy(connection))
        using (var reader = ObjectReader.Create(items))
        {
            bcp.DestinationTableName = destination;
            connection.Open();
            await bcp.WriteToServerAsync(reader);
        }
    });

    downloader.LinkTo(importer, new DataflowLinkOptions { PropagateCompletion = true });

I'm using FastMember's ObjectReader to wrap the items in a DbDataReader that can be used to bulk insert the records to a database.

Once you have this pipeline, you can start posting URLs to the head block, downloader:

    foreach (var url in hugeList)
    {
        downloader.Post(url);
    }
    downloader.Complete();

Once all URLs are posted, you tell downloader to complete and wait for the last block in the pipeline to finish with:

await importer.Completion; 
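Since the question's entry point is synchronous, the pipeline can also be driven from blocking code; a sketch, assuming downloader and importer are built as shown above:

    public static void DownloadAll(IEnumerable<string> hugeList)
    {
        foreach (var url in hugeList)
        {
            downloader.Post(url);
        }
        downloader.Complete();

        // Blocking here is safe: Dataflow blocks run their work on the
        // thread pool and don't capture a SynchronizationContext by default.
        importer.Completion.GetAwaiter().GetResult();
    }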

27 Comments

I've already tried enforcing a limit on request parallelism using a SemaphoreSlim, as per the question. I don't think this solution would fix the issue of disordered async continuations causing timeouts - unless the enforced degree of parallelism is 1.
@SimonW it does; I'm using it to download 100K air ticket records per day for the last 6-7 years. This isn't enforcing parallelism. It's creating 10 worker tasks to process all the URLs posted to the block. That's completely different from starting 100 tasks and blocking 90 of them. Besides, in .NET Framework only 2 concurrent requests are allowed at a time. By changing DefaultConnectionLimit it's possible to make up to 1000 requests. MaxDOP=10 though ensures only 10 URLs will be processed at a time.
@SimonW it's also possible to further throttle requests by adding await Task.Delay() in the worker method. To avoid sending requests in waves, the delay can be randomized (a sketch follows this comment thread).
As I understand it, this will attempt to process 10 requests asynchronously; send #1, then while awaiting its return, send #2, and so on. There is no guarantee that request #1 will be continued soon after its server handshake returns, instead of the single available thread choosing to continue various awaits encountered in requests #2 through #10 first - and when request #1 is finally continued, it times out. The only way to fix this with Dataflow would be to only allow one request at a time, which is identical to throttling with a semaphore. Is my understanding of this incorrect?
@SimonW no. It will create 10 worker tasks and each one will process an input message. 10 threads will be processing messages at a time. Why would there be a timeout? Even if there is, it will affect the current task only and can be handled with a try/catch block. The other workers won't be affected. I've been using this to download air ticket sales reports with thousands of records each, parse them, forward the ticket numbers to a next step that retrieves individual ticket records.
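A sketch of the randomized jitter suggested in the comments above; the 0-500 ms spread is arbitrary, and the lock guards Random (which isn't thread-safe) because the block runs workers on several threads:

    private static readonly Random jitterSource = new Random();

    private static Task JitterAsync()
    {
        int delayMs;
        lock (jitterSource)
        {
            delayMs = jitterSource.Next(0, 500); // arbitrary spread
        }
        return Task.Delay(delayMs);
    }

    // Inside the TransformBlock worker, before the request:
    //     await JitterAsync();
    //     var json = await _client.GetStringAsync(url);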

Firstly, Nito.AsyncEx.AsyncContext will execute on a threadpool thread; avoiding deadlocks in the way described requires an instance of Nito.AsyncEx.AsyncContextThread, as outlined in the documentation.

There are two possible causes:

  • a bug in System.Net.Http.HttpClient in .NET Framework 4.6.2
  • the continuation priority issue outlined in the question, in which individual requests are not continued promptly enough and so time out.

As described in this answer (and its comments) to a similar question, it may be possible to deal with the priority problem using a custom TaskScheduler, but throttling the number of concurrent requests with a semaphore is probably the best answer:

    using System.Collections.Generic;
    using System.Linq;
    using System.Net.Http;
    using System.Threading;
    using System.Threading.Tasks;
    using Nito.AsyncEx;

    public class MyClass
    {
        private static readonly AsyncContextThread asyncContextThread = new AsyncContextThread();
        private static readonly HttpClient httpClient = new HttpClient();
        private static readonly SemaphoreSlim semaphore = new SemaphoreSlim(10);

        public HttpRequestMessage Request { get; set; }
        public HttpResponseMessage Response { get; private set; }

        private async Task GetResponseAsync()
        {
            await semaphore.WaitAsync();
            try
            {
                Response = await httpClient.SendAsync(Request);
            }
            finally
            {
                semaphore.Release();
            }
        }

        public static void MakeMultipleRequests(IEnumerable<MyClass> enumerable)
        {
            Task.WaitAll(enumerable.Select(c => asyncContextThread.Factory.Run(() => c.GetResponseAsync())).ToArray());
        }
    }

Edited to use AsyncContextThread for executing async code on a non-threadpool thread, as intended. AsyncContext does not do this on its own.

8 Comments

You don't need async c => await c.GetResponseAsync(). You could write just AsyncContext.Run(Task.WhenAll(enumerable.Select(c => c.GetResponseAsync())));. Although all this code is no better than Task.WaitAll(enumerable.Select(c => c.GetResponseAsync())), assuming GetResponseAsync uses ConfigureAwait(false) internally.
The linked question says use Dataflow and only mentions SemaphoreSlim as an alternative
The linked question and answer are specifically about prioritising continuations; the answer and its comments (as mentioned in this answer) advise that the best solution is throttling either with Dataflow or a semaphore. This answer uses a semaphore as it is far simpler and this problem doesn't need any of the features of Dataflow.
AsyncContext.Run takes a Func<Task> not a Task.
AFAIK the AsyncContext.Run uses the current thread as a context, and it doesn't care if the current thread is owned by the ThreadPool.
