-1

Hi I am a student intern with little to no C# experience who got put into a situation to take over a windows service that uses TaskCompletionSource and BlockingCollection to implement multithreading. I have never done C#. I am trying to optimize how the service handles its task which is to crunch log files.

My question is, using BlockingCollection to create a thread queue which executes WorkiItem, how can you get the count of active threads in the queue? Meaning how many items that have been invoked by the EnqueueTask() command are still in running state? I don't want the count of the queue backlog which is what _taskQ.Count returns. I want count of active threads. I want to keep the thread count at four and only Enqueue an Item once a previous item is done. I don't want a blacklog of items in my queue.

public class ProducerConsumerQueue { public CancellationTokenSource Token { get; set; } private BlockingCollection<WorkItem> _taskQ; public ProducerConsumerQueue(int workerCount) { _taskQ = new BlockingCollection<WorkItem>(); for(int i = 0; i <workerCount; i++) { Task.Factory.StartNew(Consume); } } public Task EnqueueTask(Action action, CancellationToken? cancelToken) { var tcs = new TaskCompletionSource<object>(); _taskQ.Add(new WorkItem(tcs, action, cancelToken)); return tcs.Task; } public void Consume() { foreach (WorkItem workItem in _taskQ.GetConsumingEnumerable()) { if (workItem.CancelToken.HasValue && workItem.CancelToken.Value.IsCancellationRequested) { workItem.TaskSource.SetCanceled(); } else { try { workItem.Action(); workItem.TaskSource.SetResult(null); } catch (OperationCanceledException ex) { if (ex.CancellationToken == workItem.CancelToken) { workItem.TaskSource.SetCanceled(); } else { workItem.TaskSource.SetException(ex); } } catch (Exception ex) { workItem.TaskSource.SetException(ex); } } } } } 

This ProducerCOnsumer queue is called at service start and its queue is reloaded at each service polling interval. I want to limit how many of these threads are spawned by setting it to the limit of files that are in a thread safe db table. So if thread count is 4, the number of files in the db table will be 4. The queue shouldn't spawn additional threads or enqueue files into it's queue until 1 file is done. To do this, I figured a simple solution would be to count the number of active threads (which would mean active amount of files crunching and don't add any new files until the thread goes down by 1:

protected override void OnStart(string[] args) { ProducerConsumerQueue = new ProducerConsumerQueue(Constants.THREAD_COUNT); InitializeLogging(); PollOnServiceStart(); _timer.Elapsed += OnElapsedTime; _timer.Enabled = true; _timer.Interval = _interval; } public void OnElapsedTime(object source, ElapsedEventArgs args) { try { //InitializeLogging(); Poll(); } catch (Exception ex) { Logger.Error(ex.Message.ToString()); } } public void PollOnServiceStart() { foreach (var handler in handlers) { ProducerConsumerQueue.EnqueueTask(handler.Execute, CancellationTokenSource.Token); } } 
23
  • 1
    While you say you "want to get the active number of threads", it's clear from what you wrote, and especially in your recent edit, that what you really want is to limit the number of active threads. See XY Problem. Commented Feb 5, 2021 at 23:31
  • 1
    Your post reads: "I want count of active threads". Ignoring for a moment the question of why you should care about that, how is the "count of active threads", not the same as the workerCount value that is passed to the ProducerConsumerQueue constructor? And in spite of your protestations to the contrary, your question still includes statements like "I want to limit how many of these threads are spawned" which clearly indicate that you do want to limit the number of active threads, and that the duplicates are in fact what you need to read. Commented Feb 6, 2021 at 0:35
  • 1
    When you say that you want the count of active threads in the queue, you mean the number of the Actions that have been invoked by the workItem.Action(); command, and are still in running state? Commented Feb 6, 2021 at 0:42
  • 2
    I will use that number and set it in an if condition where if the count of active actions is less than thread count constant, then add an amount of file that is equal to the difference between the numbner of active actions and thread count So you are manually trying to limit the number of threads? Don't do that. Use Parallel.ForEach instead. Set MaxDegreesOfParallelism to the number of threads you want to limit to. Commented Feb 6, 2021 at 1:06
  • 1
    You are frustrated, I get that. But I suggested you look at that over an hour ago. So you can see how we might be a little frustrated also. If nothing else it looks like I have some reading to do as well. Yes. Note also that that suggestion is directly covered in the duplicate that you claim isn't relevant. Commented Feb 6, 2021 at 1:11

1 Answer 1

2

If you want a thread-safe counter to check the number of active tasks you can use an int _counter field and Interlocked.Increment(ref _counter)/Interlocked.Decrement(ref _counter).

Remember to increment as the first line in a try block decrement in a finally block so that you don't lose any calls to either if an exception is raised.

https://learn.microsoft.com/en-us/dotnet/api/system.threading.interlocked.increment?view=net-5.0

https://learn.microsoft.com/en-us/dotnet/api/system.threading.interlocked.decrement?view=net-5.0

Sign up to request clarification or add additional context in comments.

10 Comments

Oh God I don't know anything about this. I'll have to read into ti. Why would I increment and decrement?
Increment when you start processing a task, decrement when you finish. The count at any point in time will be the number of active tasks. That's what you ask for in the question.
This will go in this class yeah? A new property for the class?
A field of the class, yes.
I figured it out! Thanks for your help. I am now testing it to see how it works. Your idea if it works after thorough testing makes my life much easier as I dont have to rewrite the threading logic!
|

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.