
I'm using the Parallel.ForEach loop to do some work and I initialize it with the localInit like this:

localInit: () => new { foo = new Foo(), bars = CreateBars(), } 

According to the documentation:

localInit, or the function that initializes the thread-local variable. This function is called once for each partition in which the Parallel.ForEach<TSource> operation executes. Our example initializes the thread-local variable to zero.
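For reference, the pattern the documentation describes looks roughly like this (a summing sketch with the thread-local subtotal initialized to zero, not my actual code):

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

long total = 0;
var numbers = Enumerable.Range(1, 10_000);

Parallel.ForEach(
    source: numbers,
    localInit: () => 0L,                                   // supposedly called once per partition
    body: (n, loopState, subtotal) => subtotal + n,        // accumulate into the thread-local subtotal
    localFinally: subtotal => Interlocked.Add(ref total, subtotal));

Console.WriteLine(total);                                  // 50005000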

So I tried to use it like that, but I observed that the loop is constantly killing and creating new tasks, which results in frequent calls to localInit. In my opinion this is counterproductive and doesn't work as desired.

I thought that when Parallel.ForEach created, for example, four partitions, it would keep them alive until it had iterated over all items, but it doesn't. It calls localFinally and localInit several hundred times for a collection with a few thousand items. How so?

Can this behavior somehow be prevented? I was really hoping to save some resources but it doesn't really let me.


Here's what the loop looks like:

var parallelLoopResult = Parallel.ForEach(
    source: items,
    parallelOptions: parallelOptions,
    localInit: () => new { foo = new Foo(), bars = CreateBars(), },
    body: (item, loopState, i, local) =>
    {
        parallelOptions.CancellationToken.ThrowIfCancellationRequested();
        var results = local.bars.Select(x => ...).ToList();
        ....
        return local;
    },
    localFinally: local =>
    {
        local.foo.Dispose();
        lock (aggregateLock)
        {
            ... process transformed bars
        }
    }
);

ParallelOptions:

var parallelOptions = new ParallelOptions
{
    CancellationToken = cancellationTokenSource.Token,
#if DEBUG
    MaxDegreeOfParallelism = 1
    //MaxDegreeOfParallelism = Environment.ProcessorCount
#else
    MaxDegreeOfParallelism = Environment.ProcessorCount
#endif
};
2 Comments
  • What is the source (IEnumerable<T> or Partitioner<T>)? What are your ParallelOptions? Commented Feb 29, 2016 at 12:52
  • @svick items are just strings (like keys in a database). ParallelOptions just specify the MaxDegreeOfParallelism (Environment.ProcessorCount) and a CancellationToken. Commented Feb 29, 2016 at 12:54

4 Answers


If I understand the code correctly, Parallel.ForEach() restarts each Task every few hundred milliseconds. This means that if each iteration is substantial (as it generally should be), you will get lots of Tasks and thus lots of calls to localInit and localFinally. The reason for this is fairness with regards to other code in the same process that also uses the same ThreadPool.
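A quick way to observe this is to count the localInit invocations; a rough sketch (the exact count varies by machine, workload and .NET version, but with a body that takes noticeable time per item it usually ends up far above the core count):

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Count how often localInit runs for a loop limited to ProcessorCount threads.
int initCount = 0;

Parallel.ForEach(
    Enumerable.Range(0, 5_000),
    new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount },
    localInit: () => Interlocked.Increment(ref initCount),
    body: (item, state, local) => { Thread.SpinWait(500_000); return local; },  // simulate real work
    localFinally: local => { });

Console.WriteLine($"localInit calls: {initCount} (cores: {Environment.ProcessorCount})");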

I don't think there is a way to change this behavior of Parallel.ForEach(). I think a good way to solve this is to write your own simple version of Parallel.ForEach(). Considering that you can take advantage of Partitioner<T> and depending on what features of Parallel.ForEach() you need, it could be relatively simple. For example, something like:

public static void MyParallelForEach<TSource, TLocal>(
    IEnumerable<TSource> source,
    int degreeOfParallelism,
    Func<TLocal> localInit,
    Func<TSource, TLocal, TLocal> body,
    Action<TLocal> localFinally)
{
    var partitionerSource = Partitioner.Create(source).GetDynamicPartitions();

    Action taskAction = () =>
    {
        var localState = localInit();

        foreach (var item in partitionerSource)
        {
            localState = body(item, localState);
        }

        localFinally(localState);
    };

    var tasks = new Task[degreeOfParallelism - 1];
    for (int i = 0; i < degreeOfParallelism - 1; i++)
    {
        tasks[i] = Task.Run(taskAction);
    }

    taskAction();

    Task.WaitAll(tasks);
}
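Calling it with the objects from the question could then look roughly like this (Foo, CreateBars and items are the question's names; the per-item work is elided):

MyParallelForEach(
    items,
    Environment.ProcessorCount,
    localInit: () => new { foo = new Foo(), bars = CreateBars() },
    body: (item, local) =>
    {
        // ... per-item work with local.foo and local.bars ...
        return local;
    },
    localFinally: local => local.foo.Dispose());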

3 Comments

Very nice. Thank you. I'll try to implement it in my project. It looks promising. I'll need a try/catch here and there but I think I get the idea. Give me some time before I hit accept ;-)
Okay, it wasn't that hard after all ;-) The performance of the loop doubled and nearly even tripled with this solution :-o amazing. It's a pity that the standard ForEach has such a drawback.
I've just added a try/finally around the body and localFinally so it doesn't get lost when something bad happens. I understand the fairness argument, but there should definitely be a max-performance-no-matter-what option, so that we can rely on localInit being called only once per partition.
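The try/finally guard described in the last comment could be added to the answer's taskAction roughly like this (a sketch, not the commenter's exact code):

Action taskAction = () =>
{
    var localState = localInit();
    try
    {
        foreach (var item in partitionerSource)
        {
            localState = body(item, localState);
        }
    }
    finally
    {
        // Runs even when body throws, so the local state is not lost.
        localFinally(localState);
    }
};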

This overload isn't the only one, so you can try this:

var bars = CreateBars();
Parallel.ForEach(bars, b => { /* your action here */ });

But if you really want to create a copy of bars for each thread, you can copy the collection (for example with LINQ's ToList or a list copy constructor), assuming your bars is an IEnumerable<T> variable:

var bars = CreateBars();

localInit: () => new { foo = new Foo(), bars = new List<IBar>(bars), }
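Plugged into the loop from the question, that could look roughly like this (Foo, CreateBars, items, parallelOptions and IBar are the names used above; the per-item work is elided):

var bars = CreateBars();    // created once, up front

var parallelLoopResult = Parallel.ForEach(
    source: items,
    parallelOptions: parallelOptions,
    localInit: () => new { foo = new Foo(), bars = new List<IBar>(bars) },
    body: (item, loopState, i, local) =>
    {
        // ... apply local.bars to item ...
        return local;
    },
    localFinally: local => local.foo.Dispose());

Note that new List<IBar>(bars) copies the list, not the IBar instances; if the bars themselves hold mutable state, they are still shared across threads.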

3 Comments

Unfortunately this won't work. Bars are a kind of transformation that I need to apply to each item, so I tried to create those Bars for each thread so that I don't have to use locks. Each thread should have its own instances, but somehow I see in my logs, and also when I debug the code, that it creates them over and over again, and the thread ids are also always the same. I've added the entire loop to the question.
Oh, I like this bars = new List<IBar>(bars), I'll try it... why didn't I come up with this idea, it seems so obvious ;-)
I think I know why this happens. The loop is constantly killing and creating new tasks/threads because localFinally is also being called very often without the loop being completely iterated. I probably have to investigate the TaskScheduler; perhaps that can prevent this from happening.

There is no Parallel.ForEach overload that offers this functionality. If you want to set an upper limit on the number of invocations of the localInit delegate, you have to code it manually. Svick's MyParallelForEach implementation is simple and gets the job done, but it doesn't have all the features of the native Parallel.ForEach. This answer is an attempt at a fully featured implementation.

The Parallel_ForEach shown below has an identical signature to the native Parallel.ForEach, and almost identical features and behavior. The main difference is the limit on how many TLocal instances may be created: the localInit delegate is invoked at most MaxDegreeOfParallelism times, guaranteed.

public static ParallelLoopResult Parallel_ForEach<TSource, TLocal>(
    IEnumerable<TSource> source,
    ParallelOptions parallelOptions,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal, TLocal> body,
    Action<TLocal> localFinally)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(parallelOptions);
    ArgumentNullException.ThrowIfNull(localInit);
    ArgumentNullException.ThrowIfNull(body);
    ArgumentNullException.ThrowIfNull(localFinally);

    // Create a copy of the parallelOptions
    parallelOptions = new()
    {
        MaxDegreeOfParallelism = parallelOptions.MaxDegreeOfParallelism,
        CancellationToken = parallelOptions.CancellationToken,
        TaskScheduler = parallelOptions.TaskScheduler,
    };
    if (parallelOptions.MaxDegreeOfParallelism == -1)
        parallelOptions.MaxDegreeOfParallelism = Environment.ProcessorCount;

    OrderablePartitioner<TSource> partitioner = Partitioner.Create(source);
    IList<IEnumerator<TSource>> enumerators = partitioner.GetPartitions(
        parallelOptions.MaxDegreeOfParallelism);

    return Parallel.ForEach(enumerators, parallelOptions, (e, state) =>
    {
        (TLocal Value, bool HasValue) localData = default;
        try
        {
            using (e)
            {
                while (!state.ShouldExitCurrentIteration && e.MoveNext())
                {
                    TSource item = e.Current;
                    if (!localData.HasValue) localData = (localInit(), true);
                    localData.Value = body(item, state, localData.Value);
                }
            }
        }
        finally
        {
            if (localData.HasValue) localFinally(localData.Value);
        }
    });
}

Two features are missing:

  1. The MaxDegreeOfParallelism == -1 configuration (unlimited parallelism) is not supported. If you don't specify the MaxDegreeOfParallelism, or if you give it the value -1, the maximum degree of parallelism will be equal to the number of CPU cores (Environment.ProcessorCount).
  2. The ParallelLoopState.Break functionality is broken. You may use ParallelLoopState.Stop to stop the parallel loop, but if you use Break it won't work correctly, and the ParallelLoopResult.LowestBreakIteration will be wrong as well.
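Calling it mirrors the native API; a sketch using the names from the question:

ParallelLoopResult result = Parallel_ForEach(
    items,
    parallelOptions,
    localInit: () => new { foo = new Foo(), bars = CreateBars() },
    body: (item, state, local) =>
    {
        // ... per-item work with local.bars ...
        return local;
    },
    localFinally: local => local.foo.Dispose());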

Comments


Bars are created only once per thread execution. But do you know how many parallel executions are started? It is at the discretion of the parallel execution engine to start as many parallel executions as it likes.

If you want to limit parallel execution, use the MaxDegreeOfParallelism property. This puts an upper limit on how many bars are created at any one time. It still won't control the total number of bars created, and that total may also be less than what you'd expect now.

If you want to have explicit control, create tasks manually.
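A rough sketch of the manual approach, assuming the names from the question (Foo is IDisposable, items is a collection of strings) and a ConcurrentQueue for distributing the work:

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// One task per core; each task creates its Foo/bars exactly once and
// drains items from a shared queue.
var queue = new ConcurrentQueue<string>(items);

Task[] tasks = Enumerable.Range(0, Environment.ProcessorCount)
    .Select(_ => Task.Run(() =>
    {
        using var foo = new Foo();      // disposed once per task
        var bars = CreateBars();
        while (queue.TryDequeue(out var item))
        {
            // ... transform item using bars ...
        }
    }))
    .ToArray();

Task.WaitAll(tasks);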

6 Comments

I've tested it with MaxDegreeOfParallelism set to 1, 2, 4 and 8. The higher the value, the more often it recreates the tasks. I've edited my question, maybe it's clearer now.
@t3chb0t: Yes... you see with your code the number of parallel executions that will be started is at the discretion of Parallel Execution Engine. You are wrongly expecting that you can know beforehand how many bars will be created. The number of bars will now depend on what the load balancer feels. If it feels that it can have more parallel executions, you will have more bars. If it feels that load is too much, you will see fewer bars created.
Oh, does that mean that the number of tasks can fluctuate during runtime and this could be the cause for the frequent inits? I thought it would keep them alive. Mhmm... is there any way to prevent this behavior? It's quite disturbing I have to admit.
@t3chb0t: You can only put an upper limit on the number of simultaneous parallel execution by using the MaxDegreeOfParallelism property. AFAIK, this is the max control you can have.
@displayName "Only once per thread bars are created." This is not true, as the question clearly demonstrates. Even with MaxDegreeOfParallelism set to ProcessorCount, hundreds of calls to localInit are done, while using at most ProcessorCount threads.
