
I based my parallel consumer implementation on the code in this question:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class ParallelConsumer<T> : IDisposable
{
    private readonly int _maxParallel;
    private readonly Action<T> _action;
    private readonly TaskFactory _factory = new TaskFactory();
    private CancellationTokenSource _tokenSource;
    private readonly BlockingCollection<T> _entries = new BlockingCollection<T>();
    private Task _task;

    public ParallelConsumer(int maxParallel, Action<T> action)
    {
        _maxParallel = maxParallel;
        _action = action;
    }

    public void Start()
    {
        try
        {
            _tokenSource = new CancellationTokenSource();
            _task = _factory.StartNew(
                () =>
                {
                    // Consume entries as they arrive, at most _maxParallel at a time.
                    Parallel.ForEach(
                        _entries.GetConsumingEnumerable(),
                        new ParallelOptions
                        {
                            MaxDegreeOfParallelism = _maxParallel,
                            CancellationToken = _tokenSource.Token
                        },
                        (item, loopState) =>
                        {
                            Log("Taking" + item);
                            if (!_tokenSource.IsCancellationRequested)
                            {
                                _action(item);
                                Log("Finished" + item);
                            }
                            else
                            {
                                Log("Not Taking" + item);
                                _entries.CompleteAdding();
                                loopState.Stop();
                            }
                        });
                },
                _tokenSource.Token);
        }
        catch (OperationCanceledException oce)
        {
            System.Diagnostics.Debug.WriteLine(oce);
        }
    }

    private void Log(string message)
    {
        Console.WriteLine(message);
    }

    public void Stop()
    {
        Dispose();
    }

    public void Enqueue(T entry)
    {
        Log("Enqueuing" + entry);
        _entries.Add(entry);
    }

    public void Dispose()
    {
        if (_task == null)
        {
            return;
        }

        _tokenSource.Cancel();
        // Spin until the task reports that it was canceled.
        while (!_task.IsCanceled)
        {
        }

        _task.Dispose();
        _tokenSource.Dispose();
        _task = null;
    }
}
```

And here is the test code:

```csharp
using System.Diagnostics;
using System.Linq;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        TestRepeatedEnqueue(100, 1);
    }

    private static void TestRepeatedEnqueue(int itemCount, int parallelCount)
    {
        bool[] flags = new bool[itemCount];
        var consumer = new ParallelConsumer<int>(parallelCount, (i) =>
        {
            flags[i] = true;
        });
        consumer.Start();
        for (int i = 0; i < itemCount; i++)
        {
            consumer.Enqueue(i);
        }

        // Give the consumer time to drain the queue, then check every item was seen.
        Thread.Sleep(1000);
        Debug.Assert(flags.All(b => b == true));
    }
}
```

The test always fails: it always gets stuck at around the 93rd of the 100 items. Any idea which part of my code causes this, and how to fix it?

2 Answers


You cannot reliably use Parallel.ForEach() with BlockingCollection.GetConsumingEnumerable(), as you have discovered.

For an explanation, see this blog post:

https://devblogs.microsoft.com/pfxteam/parallelextensionsextras-tour-4-blockingcollectionextensions/

Excerpt from the blog:

BlockingCollection’s GetConsumingEnumerable implementation is using BlockingCollection’s internal synchronization which already supports multiple consumers concurrently, but ForEach doesn’t know that, and its enumerable-partitioning logic also needs to take a lock while accessing the enumerable.

As such, there’s more synchronization here than is actually necessary, resulting in a potentially non-negligible performance hit.

[Also] the partitioning algorithm employed by default by both Parallel.ForEach and PLINQ uses chunking in order to minimize synchronization costs: rather than taking the lock once per element, it'll take the lock, grab a group of elements (a chunk), and then release the lock.

While this design can help with overall throughput, for scenarios that are focused more on low latency, that chunking can be prohibitive.

That blog also provides the source code for a method called GetConsumingPartitioner(), which you can use to solve the problem.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

public static class BlockingCollectionExtensions
{
    public static Partitioner<T> GetConsumingPartitioner<T>(this BlockingCollection<T> collection)
    {
        return new BlockingCollectionPartitioner<T>(collection);
    }

    public class BlockingCollectionPartitioner<T> : Partitioner<T>
    {
        private BlockingCollection<T> _collection;

        internal BlockingCollectionPartitioner(BlockingCollection<T> collection)
        {
            if (collection == null)
                throw new ArgumentNullException("collection");
            _collection = collection;
        }

        public override bool SupportsDynamicPartitions
        {
            get { return true; }
        }

        public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
        {
            if (partitionCount < 1)
                throw new ArgumentOutOfRangeException("partitionCount");
            // Every partition draws from the same consuming enumerable.
            var dynamicPartitioner = GetDynamicPartitions();
            return Enumerable.Range(0, partitionCount)
                .Select(_ => dynamicPartitioner.GetEnumerator())
                .ToArray();
        }

        public override IEnumerable<T> GetDynamicPartitions()
        {
            // BlockingCollection already supports concurrent consumers, so each
            // partition takes items one at a time with no extra partitioner lock.
            return _collection.GetConsumingEnumerable();
        }
    }
}
```
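If you target .NET Framework 4.5 or later, the built-in Partitioner.Create overload that takes EnumerablePartitionerOptions.NoBuffering is another way to switch off chunking. The sketch below is an alternative to the blog's extension, not part of it: it hands each item to the loop body as soon as it is taken, although that partitioner still takes its own lock per item. NoBufferingDemo and StartConsumer are hypothetical names, and passing the token to GetConsumingEnumerable is an assumption about how you want stopping to behave (a worker blocked waiting for the next item then observes cancellation instead of hanging).

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class NoBufferingDemo
{
    // Hypothetical helper: runs Parallel.ForEach over the collection with the
    // built-in NoBuffering partitioner (.NET Framework 4.5+), so each item is
    // passed to the body as soon as it is taken instead of after a chunk fills.
    public static Task StartConsumer<T>(
        BlockingCollection<T> entries,
        int maxParallel,
        Action<T> action,
        CancellationToken token)
    {
        // Passing the token lets a worker blocked in MoveNext() observe
        // cancellation (it throws OperationCanceledException) instead of hanging.
        var partitioner = Partitioner.Create(
            entries.GetConsumingEnumerable(token),
            EnumerablePartitionerOptions.NoBuffering);

        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = maxParallel,
            CancellationToken = token
        };

        // LongRunning: the loop spends most of its life blocked waiting for items.
        return Task.Factory.StartNew(
            () => Parallel.ForEach(partitioner, options, action),
            token,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }
}
```

With the blog's extension instead, the equivalent change in the question's Start() is to pass _entries.GetConsumingPartitioner() to Parallel.ForEach in place of _entries.GetConsumingEnumerable(); the (item, loopState) body stays the same.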

2 Comments

Thanks, this solved my problem. When I tested further, though, the code in my original post does not fail when the number of items is a member of sequence A200672, e.g. 1, 2, 3, 5, 7, 9, 13, 17, 21, 29, 37, 45, 61, 77, 93, ... Any idea why? Just curious.
@user69715 That's the sort of weird behaviour I found when I tried to do a similar thing. I guess it's just some weird interaction between Parallel.ForEach() and the underlying BlockingCollection, but I can't really explain it.
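The sequence in that comment is consistent with how the default partitioner grows its chunks. Assuming (this is not stated in the thread) that each worker of the default IEnumerable partitioner requests chunks of 1, 1, 1, 2, 2, 2, 4, 4, 4, ... items, doubling the size after every three chunks, the running totals are exactly 1, 2, 3, 5, 7, 9, 13, ..., 93. With one worker and 100 items, the worker finishes the chunk that ends at the 93rd item, then asks for a chunk of 32, takes the remaining 7 items and blocks inside GetConsumingEnumerable waiting for an 8th, so the 94th to 100th items are never processed. The sketch below reproduces those numbers; ChunkModel is a hypothetical helper and it ignores the partitioner's maximum chunk size.

```csharp
using System;
using System.Collections.Generic;

public static class ChunkModel
{
    // Assumed model of the default partitioner for a single worker: chunk sizes
    // 1, 1, 1, 2, 2, 2, 4, 4, 4, ... (doubling after every 3 chunks).
    // The real partitioner also caps the chunk size; that cap is not modelled.
    public static List<int> ChunkBoundaries(int count)
    {
        var boundaries = new List<int>();
        int chunkSize = 1;
        int total = 0;
        for (int chunk = 0; boundaries.Count < count; chunk++)
        {
            if (chunk > 0 && chunk % 3 == 0) chunkSize *= 2;
            total += chunkSize;
            boundaries.Add(total); // items processed once this chunk is full
        }
        return boundaries;
    }

    // Items a single worker has processed before it blocks waiting to fill
    // the next chunk, when itemCount items are added and CompleteAdding()
    // is never called.
    public static int ProcessedBeforeStall(int itemCount)
    {
        int chunkSize = 1, total = 0, chunk = 0;
        while (true)
        {
            if (chunk > 0 && chunk % 3 == 0) chunkSize *= 2;
            if (total + chunkSize > itemCount) return total; // chunk can't fill
            total += chunkSize;
            chunk++;
        }
    }
}
```

Under this model, ProcessedBeforeStall(n) equals n exactly when n is one of the chunk boundaries, which matches the observation that the original test passes only for those item counts.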

The failure happens for the following reason, as explained here:

The partitioning algorithm employed by default by both Parallel.ForEach and PLINQ uses chunking in order to minimize synchronization costs: rather than taking the lock once per element, it'll take the lock, grab a group of elements (a chunk), and then release the lock.

To get it to work, you can add a method to your ParallelConsumer<T> class that marks the collection as complete for adding:

```csharp
public void StopAdding()
{
    _entries.CompleteAdding();
}
```

Then call this method after your for loop:

```csharp
consumer.Start();
for (int i = 0; i < itemCount; i++)
{
    consumer.Enqueue(i);
}
consumer.StopAdding();
```

Otherwise, Parallel.ForEach() keeps waiting for enough items to fill its current chunk before it processes them.
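A minimal standalone sketch of this approach, without the ParallelConsumer wrapper (CompleteAddingDemo and Run are hypothetical names): the loop uses the same default, chunking Parallel.ForEach as the question, but CompleteAdding() ends the consuming enumerable, so the partially filled last chunk is released to the loop body. Waiting on the task instead of sleeping for a fixed time also makes the check deterministic.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class CompleteAddingDemo
{
    // Hypothetical stand-alone version of the question's test, minus the
    // ParallelConsumer wrapper. Returns one flag per item that was processed.
    public static bool[] Run(int itemCount, int parallelCount)
    {
        var flags = new bool[itemCount];
        var entries = new BlockingCollection<int>();

        // Same default (chunking) Parallel.ForEach loop as in the question.
        Task consumer = Task.Run(() =>
            Parallel.ForEach(
                entries.GetConsumingEnumerable(),
                new ParallelOptions { MaxDegreeOfParallelism = parallelCount },
                item => { flags[item] = true; }));

        for (int i = 0; i < itemCount; i++)
        {
            entries.Add(i);
        }

        // What StopAdding() does: once adding is complete, the consuming
        // enumerable ends, so the partially filled last chunk is processed.
        entries.CompleteAdding();

        // Wait for the loop to finish instead of sleeping for a fixed time.
        consumer.Wait();
        return flags;
    }
}
```

This only helps when the producer has a natural end point; after CompleteAdding() no further items can be added to that collection.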

2 Comments

The thing is, in production the tasks get queued continuously, so calling StopAdding does not help. Thanks for your answer, +1, but I'll go with the other answer.
Oops, looks like I can't +1 yet
