Hi I am a student intern with little to no C# experience who got put into a situation to take over a windows service that uses TaskCompletionSource and BlockingCollection to implement multithreading. I have never done C#. I am trying to optimize how the service handles its task which is to crunch log files.
My question is, using BlockingCollection to create a thread queue which executes WorkiItem, how can you get the count of active threads in the queue? Meaning how many items that have been invoked by the EnqueueTask() command are still in running state? I don't want the count of the queue backlog which is what _taskQ.Count returns. I want count of active threads. I want to keep the thread count at four and only Enqueue an Item once a previous item is done. I don't want a blacklog of items in my queue.
public class ProducerConsumerQueue { public CancellationTokenSource Token { get; set; } private BlockingCollection<WorkItem> _taskQ; public ProducerConsumerQueue(int workerCount) { _taskQ = new BlockingCollection<WorkItem>(); for(int i = 0; i <workerCount; i++) { Task.Factory.StartNew(Consume); } } public Task EnqueueTask(Action action, CancellationToken? cancelToken) { var tcs = new TaskCompletionSource<object>(); _taskQ.Add(new WorkItem(tcs, action, cancelToken)); return tcs.Task; } public void Consume() { foreach (WorkItem workItem in _taskQ.GetConsumingEnumerable()) { if (workItem.CancelToken.HasValue && workItem.CancelToken.Value.IsCancellationRequested) { workItem.TaskSource.SetCanceled(); } else { try { workItem.Action(); workItem.TaskSource.SetResult(null); } catch (OperationCanceledException ex) { if (ex.CancellationToken == workItem.CancelToken) { workItem.TaskSource.SetCanceled(); } else { workItem.TaskSource.SetException(ex); } } catch (Exception ex) { workItem.TaskSource.SetException(ex); } } } } } This ProducerCOnsumer queue is called at service start and its queue is reloaded at each service polling interval. I want to limit how many of these threads are spawned by setting it to the limit of files that are in a thread safe db table. So if thread count is 4, the number of files in the db table will be 4. The queue shouldn't spawn additional threads or enqueue files into it's queue until 1 file is done. To do this, I figured a simple solution would be to count the number of active threads (which would mean active amount of files crunching and don't add any new files until the thread goes down by 1:
protected override void OnStart(string[] args) { ProducerConsumerQueue = new ProducerConsumerQueue(Constants.THREAD_COUNT); InitializeLogging(); PollOnServiceStart(); _timer.Elapsed += OnElapsedTime; _timer.Enabled = true; _timer.Interval = _interval; } public void OnElapsedTime(object source, ElapsedEventArgs args) { try { //InitializeLogging(); Poll(); } catch (Exception ex) { Logger.Error(ex.Message.ToString()); } } public void PollOnServiceStart() { foreach (var handler in handlers) { ProducerConsumerQueue.EnqueueTask(handler.Execute, CancellationTokenSource.Token); } }
workerCountvalue that is passed to theProducerConsumerQueueconstructor? And in spite of your protestations to the contrary, your question still includes statements like "I want to limit how many of these threads are spawned" which clearly indicate that you do want to limit the number of active threads, and that the duplicates are in fact what you need to read.Actions that have been invoked by theworkItem.Action();command, and are still in running state?I will use that number and set it in an if condition where if the count of active actions is less than thread count constant, then add an amount of file that is equal to the difference between the numbner of active actions and thread countSo you are manually trying to limit the number of threads? Don't do that. UseParallel.ForEachinstead. SetMaxDegreesOfParallelismto the number of threads you want to limit to.If nothing else it looks like I have some reading to do as well.Yes. Note also that that suggestion is directly covered in the duplicate that you claim isn't relevant.