9

The following should return "C", but it returns "B"

using System.Data.Entity; //... var state = "A"; var qry = (from f in db.myTable select f); await qry.ForEachAsync(async (myRecord) => { await DoStuffAsync(myRecord); state = "B"; }); state = "C"; return state; 

It doesn't wait for DoStuffAsync to complete, state="C" runs through and then later state="B" executes (because inside it is still awaiting).

1
  • So it's kind of logically equivalent to foreach (var x in await qty.ToArrayAsync()) { ... } I guess? Commented Feb 12, 2019 at 7:58

3 Answers 3

14

That's because the implementation of ForEachAsync doesn't await the delegated action

moveNextTask = enumerator.MoveNextAsync(cancellationToken); action(current); 

see https://github.com/mono/entityframework/blob/master/src/EntityFramework/Infrastructure/IDbAsyncEnumerableExtensions.cs#L19

But that is because, you can't await an action, the delegate needs to be a Func which returns a Task - see How do you implement an async action delegate method?

Therefore, until Microsoft provides a signature which includes a Func delegate and calls it with await, you'll have to roll your own extension method. I'm using the following at the moment.

public static async Task ForEachAsync<T>( this IQueryable<T> enumerable, Func<T, Task> action, CancellationToken cancellationToken) //Now with Func returning Task { var asyncEnumerable = (IDbAsyncEnumerable<T>)enumerable; using (var enumerator = asyncEnumerable.GetAsyncEnumerator()) { if (await enumerator.MoveNextAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false)) { Task<bool> moveNextTask; do { var current = enumerator.Current; moveNextTask = enumerator.MoveNextAsync(cancellationToken); await action(current); //now with await } while (await moveNextTask.ConfigureAwait(continueOnCapturedContext: false)); } } } 

With this, the original test code in your OP will work as expected.

Sign up to request clarification or add additional context in comments.

1 Comment

I'm not sure how well your own ForEachAsync will go alongside the Action version. I simply removed the using System.Data.Entities; and had my own namespace.
1

.Net 6 Introduced Parallel.ForEachAsync(...):

var state = "A"; var qry = (from f in db.myTable select f); await Parallel.ForEachAsync(qry, async (myRecord, token) => { await DoStuffAsync(myRecord); state = "B"; }); state = "C"; return state; 

You can set a MaxDegreeOfParallelism property with a ParallelOptions parameter, but usually you don't need it.

From Microsoft Docs: https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.paralleloptions.maxdegreeofparallelism?view=net-6.0

By default, For and ForEach will utilize however many threads the underlying scheduler provides, so changing MaxDegreeOfParallelism from the default only limits how many concurrent tasks will be used.

Generally, you do not need to modify this setting....

2 Comments

Regarding the "Generally, you do not need to modify this setting" advice, you might find this GitHub issue interesting. I am not sure that it applies to Parallel.ForEachAsync. If you don't specify the MaxDegreeOfParallelism, the default is Environment.ProcessorCount, which is quite arbitrary for an asynchronous operation.
Async isn't about Parallelism. It's about decoupling Tasks from Threads. Most commonly, I use Async for more efficient IO, so that the thread isn't blocked until an IO result returns. So in this case, your answer is valid, but it doesn't minimally solve the problem, it also distributes the tasks to multiple CPU cores. If the Async Tasks MUST be run sequentially, this answer won't work. (Also I have a feeling this Parallel ForEach API has been around for a while)
0

Since DbSet implements IAsyncEnumerable, consider using the following extension method:

public async static Task ForEachAsync<T>(this IAsyncEnumerable<T> source, Func<T, Task> action, CancellationToken cancellationToken = default) { if (source == null) return; await foreach (T item in source.WithCancellation(cancellationToken).ConfigureAwait(false)) { await action(item); } } 

Usage:

var qry = (from f in db.myTable select f); await qry .AsAsyncEnumerable() .ForEachAsync(async arg => { await DoStuffAsync(arg); }); 

6 Comments

Your implementation is similar to the ForEachAwaitAsync operator of the System.Linq.Async library. Todd's solution is more interesting IMHO, because it allows for concurrency in a way that it's unlikely to create problems, while improving the performance at the same time.
Also the ConfigureAwait(false) means that the action will not be invoked on the current SynchronizationContext. So for example if the project's type is WinForms and the action contains UI-related code, the ForEachAsync method will fail.
@Theodor Zoulias thanks for your point, indeed, consider using ConfigureAwait(true) in UI applications (when you need need a synchronization context), otherwise you should always use ConfigureAwait(false), (looking at @Todd's answer I could see that he used ConfigureAwait(continueOnCapturedContext: false) too
@Theodor Zoulias "allows for concurrency in a way that it's unlikely to create problems" it might be helpful (for me and others) to explain it.
Todd's solution allows each action on an item to occur concurrently with fetching the sequence's next item. These two concurrent operations are unlikely to depend on each other (by sharing state that needs to be synchronized), because they are quite distinct operations. Compare this to executing two actions for two different elements concurrently. This has a higher chance to create problems, because the concurrent operations are homogeneous, and may depend on some non-thread-safe shared state (a DBConnection for example).
|

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.