The code realizes producer/consumer problem with multiple producers and consumers. Have this code any potential deadlock or races?
//RandomDataProvider.cs namespace MyNamespace.Core { using System; /// <summary> /// Provides randomly generated data. /// </summary> public class RandomDataProvider { #region constuction /// <summary> /// Initializes new instance of the <see cref="RandomDataProvider"/> class. /// </summary> /// <param name="maxSleepInterval"></param> public RandomDataProvider(int maxSleepInterval) { _maxSleepInterval = maxSleepInterval; } #endregion // constuction #region implementation /// <summary> /// Gets the random sleeping interval. /// </summary> public int GetSleepInterval() { lock (_locker) { return _random.Next(_maxSleepInterval); } } /// <summary> /// Gets the random value. /// </summary> public int GetRandomValue() { lock (_locker) { return _random.Next(1, 100); } } #endregion // implementation #region representation private readonly Random _random = new Random(); private readonly object _locker = new object(); private readonly int _maxSleepInterval; #endregion // representation } } //Producer.cs namespace MyNamespace.Core { using System.Collections.Generic; using System.Threading; /// <summary> /// Reprecents <see cref="Producer"/> class. /// </summary> public sealed class Producer : RandomDataProvider { #region construction /// <summary> /// Initializes the new instance of <see cref="Producer"/> class. /// </summary> /// <param name="queue">Queue to which producers put items.</param> /// <param name="maxSleepInterval">The maximum sleep interval.</param> public Producer(Queue<int> queue, int maxSleepInterval) : base(maxSleepInterval) { _queue = queue; } #endregion // construction #region implementation /// <summary> /// Starts produce items and put to queue. /// </summary> /// <param name="token">Cancellation token.</param> public void StartProduce(CancellationToken token) { while (true) { // If canceled stop producing. if (token.IsCancellationRequested) { return; } Thread.Sleep(GetSleepInterval()); int item = GetRandomValue(); lock (_queue) { if (_isReachedMax) { // if producer reached max items, block and wait until count decreases 80. while (_queue.Count > 80) { Monitor.Wait(_queue); } // now we can release thread _isReachedMax = false; } _queue.Enqueue(item); if (_queue.Count == 1) { // Pulse comsumres which waits on empty queue Monitor.Pulse(_queue); } else if (_queue.Count >= 100) { // Set reached max count flag. _isReachedMax = true; } } } } #endregion // implementation #region representation private readonly Queue<int> _queue; private bool _isReachedMax; #endregion // representation } } //Consumer.cs namespace MyNamespace.Core { using System; using System.Collections.Generic; using System.IO; using System.Threading; /// <summary> /// Reprecents <see cref="Consumer"/> class. /// </summary> public sealed class Consumer : RandomDataProvider, IDisposable { /// <summary> /// Initializes the new instance of <see cref="Consumer"/> class. /// </summary> /// <param name="queue">The queue from which consumes items.</param> /// <param name="maxSleepInterval">The maximum sleep interval.</param> public Consumer(Queue<int> queue, int maxSleepInterval) :base(maxSleepInterval) { _queue = queue; } #region implementation #region Implementation of IDisposable /// <summary> /// Disposes the current instance. /// </summary> public void Dispose() { StreamWriter.Close(); } #endregion /// <summary> /// Starts consume items from queue. /// </summary> /// <param name="token">Cancellation token.</param> public void StartConsume(CancellationToken token) { while (true) { Thread.Sleep(GetSleepInterval()); int item; lock (_queue) { // If no item in queue after cancel, return. if (_queue.Count == 0 && token.IsCancellationRequested) { return; } // Wait when queue empty. while (_queue.Count == 0) { Monitor.Wait(_queue); } item = _queue.Dequeue(); // if count decreases and reaches 80, then inform produsers which wait. if (_queue.Count <= 80) { Monitor.PulseAll(_queue); } } WriteToStream(item); } } #endregion // implementation #region operations /// <summary> /// Writes items to the stream. /// </summary> /// <param name="item">Item to write.</param> private void WriteToStream(int item) { lock (StreamWriter) { StreamWriter.Write("{0},", item); } } #endregion // operations #region representation private readonly Queue<int> _queue; private static readonly StreamWriter StreamWriter = new StreamWriter(FILE_PATH); #endregion // representation #region constants private const string FILE_PATH = "data.txt"; #endregion // constants } } //Controller.cs namespace Root { using System; using System.Collections.Generic; using System.Threading; using System.Timers; using MyNamespace.Core; /// <summary> /// Provides functionality that controls <see cref="Producer"/> and <see cref="Consumer"/> concurency. /// </summary> public class Controller : IDisposable { #region construction /// <summary> /// Initializes the new instance of <see cref="Controller"/> class. /// </summary> /// <param name="queue">Shared Queue object.</param> public Controller(Queue<int> queue) { _queue = queue; _timer = new System.Timers.Timer(1000); _timer.Elapsed += ShowCount; } #endregion // construction #region implementation #region Implementation of IDisposable /// <summary> /// Disposes the current instance. /// </summary> public void Dispose() { foreach (var consumer in _consumers) { consumer.Dispose(); } } #endregion /// <summary> /// Creates consumer/producer objects and ran them each in seperate thread. /// </summary> public void Start() { _consumerThreads = new Thread[_consumerCount]; _producerThreads = new Thread[_producerCount]; _consumers = new Consumer[_consumerCount]; _producers = new Producer[_producerCount]; for (int i = 0; i < ProducerCount; ++i) { var producer = new Producer(_queue, _maxSleepInterval); _producers[i] = producer; _producerThreads[i] = new Thread(() => producer.StartProduce(_source.Token)); _producerThreads[i].Start(); } for (int i = 0; i < ConsumerCount; ++i) { var consumer = new Consumer(_queue, _maxSleepInterval); _consumers[i] = consumer; _consumerThreads[i] = new Thread(() => consumer.StartConsume(_source.Token)); _consumerThreads[i].Start(); } _timer.Start(); } /// <summary> /// Cancels producing/consuming. /// </summary> public void Cancel() { _source.Cancel(); _timer.Stop(); for (int i = 0; i < ConsumerCount; ++i) { _consumerThreads[i].Join(); } } #endregion // implementation #region properties /// <summary> /// Gets or sets consumers count. /// </summary> public int ConsumerCount { get { return _consumerCount; } set { _consumerCount = value; } } /// <summary> /// Gets or sets producers count. /// </summary> public int ProducerCount { get { return _producerCount; } set { _producerCount = value; } } /// <summary> /// Gets or sets maximum sleep interval. /// </summary> public int MaxSleepInterval { get { return _maxSleepInterval; } set { _maxSleepInterval = value; } } #endregion // properties #region operations /// <summary> /// Shows the queue size. /// </summary> /// <param name="sender">Sender object.</param> /// <param name="e">Event handler argumnets.</param> private void ShowCount(object sender, ElapsedEventArgs e) { int count; lock (_queue) { count = _queue.Count; } Console.WriteLine("Items in the queue: {0}", count); } #endregion // operations #region representation /// <summary> /// Shared between produsers and consumers queue object. /// </summary> private readonly Queue<int> _queue; /// <summary> /// Thread in which runs each consumer. /// </summary> private Thread[] _consumerThreads; /// <summary> /// Thread in which runs each producer. /// </summary> private Thread[] _producerThreads; /// <summary> /// Array of <see cref="Consumer"/> objects. /// </summary> private Consumer[] _consumers; /// <summary> /// Array of <see cref="Producer"/> objects. /// </summary> private Producer[] _producers; /// <summary> /// Timer object. /// </summary> private readonly System.Timers.Timer _timer; /// <summary> /// Provides cancellation token. /// </summary> private readonly CancellationTokenSource _source = new CancellationTokenSource(); private int _consumerCount = 10; private int _producerCount = 10; private int _maxSleepInterval = 100; #endregion // representation } } At the end
//Program.cs namespace Root { using System; using System.Collections.Generic; class Program { private static void Main(string[] args) { int prdCnt; int conCnt; Console.Write("Enter produsers count[1-10]:"); while (!int.TryParse(Console.ReadLine(), out prdCnt) || prdCnt < 1 || prdCnt > 10) { Console.WriteLine("Error"); Console.Write("Enter produsers count[1-10]:"); } Console.Write("Enter consumers count[1-10]:"); while (!int.TryParse(Console.ReadLine(), out conCnt) || conCnt < 1 || conCnt > 10) { Console.WriteLine("Error"); Console.Write("Enter consumers count[1-10]:"); } Queue<int> queue = new Queue<int>(); using (var controller = new Controller(queue)) { controller.ConsumerCount = conCnt; controller.ProducerCount = prdCnt; controller.Start(); Console.ReadLine(); controller.Cancel(); } } } }
System.Collections.Concurrentnot allowed. \$\endgroup\$CancellationTokenwould be allowed but notBlockingCollection... \$\endgroup\$