
With the new standard C++17, I wonder if there is a good way to start a process with a fixed number of threads that run until a batch of jobs is finished.

Can you tell me how I can achieve the desired functionality of this code:

std::vector<std::future<std::string>> futureStore;
const int batchSize = 1000;
const int maxNumParallelThreads = 10;
int threadsTerminated = 0;

while (threadsTerminated < batchSize)
{
    const int& threadsRunning = futureStore.size();
    while (threadsRunning < maxNumParallelThreads)
    {
        futureStore.emplace_back(std::async(someFunction));
    }
    for (std::future<std::string>& readyFuture : std::when_any(futureStore.begin(), futureStore.end()))
    {
        auto retVal = readyFuture.get();
        // (possibly do something with the ret val)
        threadsTerminated++;
    }
}

I read that there used to be a proposed std::when_any function, but it was a feature that did not make it into the standard.

Is there any support for this functionality (not necessarily for std::future-s) in the current standard libraries? Is there a way to implement it easily, or do I have to resort to something like this?
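In the absence of std::when_any, one common workaround is to poll the futures with a zero timeout. A minimal sketch, where waitForAny is a hypothetical helper (not a standard function) and the futures are assumed to have been launched with std::launch::async:

```cpp
#include <chrono>
#include <future>
#include <string>
#include <thread>
#include <vector>

// Hypothetical helper: returns the index of the first ready future.
// Polls with a zero timeout, since std::when_any is not in the standard.
// Assumes the futures were created with std::launch::async (a deferred
// future would never report ready here and the loop would not terminate).
std::size_t waitForAny(std::vector<std::future<std::string>>& futures) {
    for (;;) {
        for (std::size_t i = 0; i < futures.size(); ++i) {
            if (futures[i].wait_for(std::chrono::milliseconds(0)) ==
                std::future_status::ready)
                return i;
        }
        // Back off briefly to avoid a pure busy spin.
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
}
```

The caller would then get() the future at the returned index, replace it with a fresh task, and repeat until the batch is done.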

  • Why do you want a fixed number of threads? Can't you just use std::async and rely on it? Commented Jun 13, 2017 at 9:19

2 Answers


This does not seem to me to be the ideal approach:

  1. All your main thread does is wait for the other threads to finish, polling the results of your futures. You are almost wasting this thread somehow...

  2. I don't know to what extent std::async re-uses the threads' infrastructure in any suitable way, so you risk creating entirely new threads each time... (Apart from that, you might not create any threads at all, see here, if you do not specify std::launch::async explicitly.)
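The launch-policy caveat from point 2 can be made concrete with a small sketch (someFunction here is a stand-in for the question's task, not code from the answer):

```cpp
#include <future>
#include <string>

std::string someFunction() { return "done"; } // stand-in for the real task

int run() {
    // Without an explicit policy, the implementation may pick
    // std::launch::deferred and run the task lazily, on the calling
    // thread, only when get() is invoked.
    auto lazy = std::async(someFunction);

    // std::launch::async guarantees the task runs as if on a new thread.
    auto eager = std::async(std::launch::async, someFunction);

    return lazy.get() == eager.get(); // both yield "done" here
}
```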

I personally would prefer another approach:

  1. Create all the threads you want to use at once.
  2. Let each thread run a loop, repeatedly calling someFunction(), until the desired number of tasks has been reached.

The implementation might look similar to this example:

#include <chrono>
#include <cstdio>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

const int BatchSize = 20;
int tasksStarted = 0;
std::mutex mutex;
std::vector<std::string> results;

std::string someFunction()
{
    puts("worker started"); fflush(stdout);
    std::this_thread::sleep_for(std::chrono::seconds(2));
    puts("worker done"); fflush(stdout);
    return "";
}

void runner()
{
    {
        std::lock_guard<std::mutex> lk(mutex);
        if (tasksStarted >= BatchSize)
            return;
        ++tasksStarted;
    }
    for (;;)
    {
        std::string s = someFunction();
        {
            std::lock_guard<std::mutex> lk(mutex);
            results.push_back(s);
            if (tasksStarted >= BatchSize)
                break;
            ++tasksStarted;
        }
    }
}

int main(int argc, char* argv[])
{
    const int MaxNumParallelThreads = 4;
    std::thread threads[MaxNumParallelThreads - 1]; // main thread is one, too!
    for (int i = 0; i < MaxNumParallelThreads - 1; ++i)
    {
        threads[i] = std::thread(&runner);
    }
    runner();
    for (int i = 0; i < MaxNumParallelThreads - 1; ++i)
    {
        threads[i].join();
    }
    // use results...
    return 0;
}

This way, you do not repeatedly create new threads; the threads you have simply continue until all tasks are done.

If these tasks are not all alike as in the example above, you might create a base class Task with a pure virtual function (e.g. "execute" or "operator()") and create subclasses with the required implementation (holding any necessary data).

You could then place the instances into a std::vector or std::list (well, we won't iterate, so a list might be appropriate here...) as pointers (otherwise, you get object slicing!) and let each thread remove one of the tasks when it has finished its previous one (do not forget to protect against race conditions!) and execute it. As soon as no tasks are left, return...
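A minimal sketch of that base-class approach might look like this (all names are illustrative, not from the answer; the mutex-protected vector stands in for the shared task list described above):

```cpp
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

struct Task {                        // abstract base for heterogeneous tasks
    virtual ~Task() = default;
    virtual void execute() = 0;      // each subclass provides its own work
};

std::mutex resultMutex;
std::vector<int> results;            // example output shared by all workers

struct RecordTask : Task {           // example subclass holding its own data
    int id;
    explicit RecordTask(int i) : id(i) {}
    void execute() override {
        std::lock_guard<std::mutex> lk(resultMutex);
        results.push_back(id);       // "work" is just recording the id here
    }
};

std::mutex taskMutex;
std::vector<std::unique_ptr<Task>> tasks;  // pointers avoid object slicing

void worker() {
    for (;;) {
        std::unique_ptr<Task> task;
        {   // protect the shared task list against races
            std::lock_guard<std::mutex> lk(taskMutex);
            if (tasks.empty()) return;       // no tasks left: thread exits
            task = std::move(tasks.back());
            tasks.pop_back();
        }
        task->execute();                     // run outside the lock
    }
}
```

Each worker thread runs worker() until the task list is drained; only the pop from the list happens under the lock, so tasks execute concurrently.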


11 Comments

The default behaviour of std::async is std::launch::async | std::launch::deferred, and it will most likely use some sort of thread pool. It is true that there is no guarantee that new threads will be spawned, but that is intentional, because thread creation might not be possible. In this case, the code will still execute correctly instead of throwing an exception. After all, I think the std::async solution is way more elegant.
Perhaps a matter of taste. In this specific case, I don't consider it more elegant for one specific reason: there is no condition variable available that can be signalled from multiple threads to wake up a single one (as discussed here)... So you need to fall back to polling in the main thread...
Wouldn't this wait for the first launched thread instead of any that has finished? I have threads with very unpredictable execution times, and this would wait for the slowest of them. Polling the threads is even more effective than doing this.
@AdamHunyadi What exactly do you mean by 'this'? Joining? That will only occur when no tasks are left any more and makes sure that we do not proceed until all tasks are finished. Or the condition variable I mentioned? Well, problem is: there isn't any suitable for our use case! We have one allowing a single thread waking up multiple others, but none allowing any arbitrary thread waking up a specific one... (at least not in standard c++).
@Aconcagua Say I have a job for the threads that either takes 10 ms to compute, or times out after about 1000 milliseconds and returns an object indicating that no answer was found. With even a single such thread, if I join the threads in a fixed order (i.e. not in the order they finish), I lose most of the performance boost of having the threads whenever one thread in the thread store takes long to execute.

If you don't care about the exact number of threads, the simplest solution would be:

std::vector<std::future<std::string>> futureStore(batchSize);
std::generate(futureStore.begin(), futureStore.end(),
              [](){ return std::async(someTask); });
for (auto& future : futureStore)
{
    std::string value = future.get();
    doWork(value);
}

From my experience, std::async will reuse the threads after a certain number of threads has been spawned. It will not spawn 1000 threads. Also, you will not gain much of a performance boost (if any) when using a thread pool. I did measurements in the past, and the overall runtime was nearly identical.

The only reason I use thread pools now is to avoid the delay of creating threads in the computation loop. If you have timing constraints, you may miss deadlines when using std::async for the first time, since it will create the threads on the first calls.
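One possible way to mitigate that first-call delay without a pool is to warm std::async up before the timing-critical section; whether this actually helps is implementation-dependent, and warmUp is a hypothetical helper, not part of the answer:

```cpp
#include <future>
#include <vector>

// Hypothetical warm-up: issue a batch of trivial tasks once at startup so
// that any thread infrastructure the implementation keeps around is created
// before the real, deadline-sensitive work begins. Whether the threads are
// actually reused afterwards is up to the implementation.
void warmUp(int n) {
    std::vector<std::future<void>> warm;
    for (int i = 0; i < n; ++i)
        warm.emplace_back(std::async(std::launch::async, []{}));
    for (auto& f : warm)
        f.get();   // wait for all warm-up tasks to finish
}
```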

There is a good thread pool library for these applications. Have a look here: https://github.com/vit-vit/ctpl

#include <ctpl.h>

const unsigned int numberOfThreads = 10;
const unsigned int batchSize = 1000;

ctpl::thread_pool pool(numberOfThreads); // ten threads in the pool
std::vector<std::future<std::string>> futureStore(batchSize);
std::generate(futureStore.begin(), futureStore.end(),
              [](){ return pool.push(someTask); });
// note: ctpl passes the thread id as the first argument,
// so someTask must accept an int id parameter
for (auto& future : futureStore)
{
    std::string value = future.get();
    doWork(value);
}

6 Comments

Yes, as the title of the question says, I want to maximize the number of threads. I am working on a supercomputer with many-many cores, but I'm not the only user, so I should have a limit on the threads I spawn.
If you want to maximize the number of threads for performance reasons, then you don't need to care about it too much, since it will not become faster if you have more threads than hardware threads, and std::async will spawn threads up to this point. If you want more threads for other reasons, then ctpl is the way to go. I will add an example for you.
I don't want to utilize all the cores, since I am not the only user of the supercomputer I'm working on. I need to share the resources with other users.
On most supercomputers, you have a job submission system that will restrict the amount of resources you can use. If you want to limit your cores, then you will do this in the submission of your job, and your program will be forced to only use that many cores.
@NicolBolas you are right. I should have used std::generate instead. I will fix the error as soon as possible.