Why does the Parallel.ForEach loop exit with an OperationCanceledException when using GetConsumingEnumerable?

    //outside the function
    static BlockingCollection<double> _collection = new BlockingCollection<double>();

    var t = Task.Factory.StartNew(Producer);
    Parallel.ForEach(_collection.GetConsumingEnumerable(),
        item => Console.WriteLine("Processed {0}", item));
    Console.WriteLine("FINISHED processing");

    public static void Producer()
    {
        var data = Enumerable.Range(1, 1000);
        foreach (var i in data)
        {
            _collection.Add(i);
            Console.WriteLine("Added {0}", i);
        }
        Console.WriteLine("Finished adding");
        _collection.CompleteAdding();
    }
  • I can't reproduce the OperationCanceledException behavior on .NET 7. The code in the question runs to completion and no exception is thrown. Commented Nov 24, 2022 at 9:36

2 Answers

Using Parallel.ForEach with BlockingCollection is somewhat problematic, as I found out recently. It can be made to work, but it needs a little extra effort.

Stephen Toub has an excellent blog post on it, and if you download the "Parallel Extension Extras" project (also available on NuGet) you'll find some code ready to help you.
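
The "extra effort" can take the form of a custom partitioner that hands the consuming enumerable straight to Parallel.ForEach, so that no chunking is applied on top of the collection. The following is only a minimal sketch of that idea under my own assumptions: the names ConsumingPartitioner and PartitionerDemo are hypothetical, and the code is not copied from the blog post or from the library.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper written for illustration only.
public sealed class ConsumingPartitioner<T> : Partitioner<T>
{
    private readonly BlockingCollection<T> _source;

    public ConsumingPartitioner(BlockingCollection<T> source)
    {
        _source = source ?? throw new ArgumentNullException(nameof(source));
    }

    // Lets Parallel.ForEach request partitions on demand.
    public override bool SupportsDynamicPartitions => true;

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        if (partitionCount < 1)
            throw new ArgumentOutOfRangeException(nameof(partitionCount));

        IEnumerable<T> dynamicPartitions = GetDynamicPartitions();
        return Enumerable.Range(0, partitionCount)
                         .Select(_ => dynamicPartitions.GetEnumerator())
                         .ToArray();
    }

    // Each enumerator obtained from this sequence takes items one at a time
    // from the shared collection and ends once adding is completed.
    public override IEnumerable<T> GetDynamicPartitions()
        => _source.GetConsumingEnumerable();
}

public static class PartitionerDemo
{
    // Produces itemCount items on a background task and counts how many
    // the parallel loop processes.
    public static int Run(int itemCount)
    {
        using var collection = new BlockingCollection<int>();

        Task producer = Task.Run(() =>
        {
            try { for (int i = 1; i <= itemCount; i++) collection.Add(i); }
            finally { collection.CompleteAdding(); }
        });

        int processed = 0;
        // The degree of parallelism (4) is an arbitrary choice for this sketch.
        var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
        Parallel.ForEach(new ConsumingPartitioner<int>(collection), options,
            item => { Interlocked.Increment(ref processed); });

        producer.Wait();
        return processed;
    }
}
```

Calling PartitionerDemo.Run(1000) should return 1000 once the producer has called CompleteAdding and the loop has drained the collection.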

5 Comments

What really puzzles me is why Parallel.ForEach throws an exception when I call _collection.CompleteAdding().
@Sam: I wouldn't like to say, to be honest. There's too much deep magic going on there for me to have any confidence in saying the right thing :)
The current URL to the Parallel Extensions Extras: code.msdn.microsoft.com/ParExtSamples and someone has made a NuGet of the extensions: nuget.org/packages/MSFT.ParallelExtensionsExtras
A more recent article on this combination by Can Bilgin - link
Using Parallel.ForEach with a BlockingCollection<T> as the source requires two specific adjustments:

  1. Use the EnumerablePartitionerOptions.NoBuffering option.
  2. Set MaxDegreeOfParallelism to a value other than -1.

Example of correct usage:

    Partitioner<Item> partitioner = Partitioner.Create(collection.GetConsumingEnumerable(),
        EnumerablePartitionerOptions.NoBuffering);
    ParallelOptions options = new()
    {
        MaxDegreeOfParallelism = Environment.ProcessorCount
    };
    Parallel.ForEach(partitioner, options, item =>
    {
        //...
    });

Explanation:

  1. EnumerablePartitionerOptions.NoBuffering is required because otherwise Parallel.ForEach will not process each consumed item immediately. Instead it puts the item in a small buffer and waits until the buffer reaches an arbitrary size before processing all the buffered items. This behavior introduces undesirable latency, and can even cause deadlocks in some advanced scenarios.
  2. Configuring MaxDegreeOfParallelism is required to keep ThreadPool usage under control. Otherwise Parallel.ForEach keeps asking for more and more threads, prompting the ThreadPool to create new threads at a rate of one new thread per second, even while the BlockingCollection<T> is completely empty and the parallel loop is idle! For an experimental demonstration of this strange behavior, see this question.
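
To make the two adjustments concrete, here is a self-contained sketch that wires a producer similar to the one in the question to the configured loop. The class name NoBufferingDemo and the Run method are hypothetical names introduced for this illustration, and counting items stands in for whatever real work the loop body would do.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, for illustration only.
public static class NoBufferingDemo
{
    // Produces itemCount items on a background task, consumes them with
    // Parallel.ForEach, and returns the number of items processed.
    public static int Run(int itemCount)
    {
        using var collection = new BlockingCollection<double>();

        Task producer = Task.Run(() =>
        {
            try
            {
                for (int i = 1; i <= itemCount; i++) collection.Add(i);
            }
            finally
            {
                // Without this call the consuming loop would never end.
                collection.CompleteAdding();
            }
        });

        // Adjustment 1: take items one by one instead of in buffered chunks.
        var partitioner = Partitioner.Create(
            collection.GetConsumingEnumerable(),
            EnumerablePartitionerOptions.NoBuffering);

        // Adjustment 2: put an explicit upper bound on the worker count.
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount
        };

        int processed = 0;
        Parallel.ForEach(partitioner, options, item =>
        {
            Interlocked.Increment(ref processed);
        });

        producer.Wait();
        return processed;
    }
}
```

Calling NoBufferingDemo.Run(1000) should return 1000. Environment.ProcessorCount is just one reasonable bound; any positive value keeps the loop from requesting an unlimited number of threads.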
