First, let me introduce you to my problem.

My code looks like this:

    #include <iostream>
    #include <thread>
    #include <mutex>
    #include <condition_variable>

    std::mutex mtx;
    std::mutex cvMtx;
    std::mutex mtx2;
    bool ready{false};
    std::condition_variable cv;
    int threadsFinishedCurrentLevel{0};

    void tfunc() {
        for (int i = 0; i < 5; i++) {
            // do something
            for (int j = 0; j < 10000; j++) {
                std::cout << j << std::endl;
            }
            // this is i-th level
            mtx2.lock();
            threadsFinishedCurrentLevel++;
            if (threadsFinishedCurrentLevel == 2) {
                // this is last thread in current level
                threadsFinishedCurrentLevel = 0;
                cvMtx.unlock();
            }
            mtx2.unlock();
            {
                // wait for notify
                std::unique_lock<std::mutex> lck(mtx);
                while (!ready) cv.wait(lck);
            }
        }
    }

    int main() {
        cvMtx.lock(); // init
        std::thread t1(tfunc);
        std::thread t2(tfunc);
        for (int i = 0; i < 5; i++) {
            cvMtx.lock();
            {
                std::unique_lock<std::mutex> lck(mtx);
                ready = true;
                cv.notify_all();
            }
        }
        t1.join();
        t2.join();
        return 0;
    }

I have 2 threads. My computation consists of levels (for this example, let's say we have 5 levels). Within one level, the computation can be divided among threads, and each thread calculates part of the problem. Before I step to the next (higher) level, the lower level must be finished first.

So my idea is this: when the last thread on the current level is done, it unlocks the main thread, which can then notify all of the threads to continue to the next level. But this notify has to be called more than once, because there are plenty of these levels. Can this condition_variable be restarted or something? Or do I need one condition_variable per level? So, for example, when I have 1000 levels, do I need to dynamically allocate 1000 condition_variables?

  • It would be better to post a fully working example (with a valid someCondition and so on) for people who want to try to reproduce the problem. The error may already show up while you prepare such an example. Commented Feb 10, 2016 at 15:28
  • Normally, the software development process includes writing tests that cover every part of the code, so this problem should be covered by a test too. Commented Feb 10, 2016 at 15:33
  • I am sorry, is it better now? Commented Feb 10, 2016 at 15:34
  • Read en.wikipedia.org/wiki/Test-driven_development, steps 1-5. Your code does not compile. Did you try to compile and run it before posting? What is needed to solve the problem is either working code whose log output demonstrates the error (first), or test code that checks whether the error exists (second). One of the two is needed. Commented Feb 10, 2016 at 15:56

2 Answers

1

Is it just me, or are you trying to block the main thread with a mutex (as your way of notifying it when all threads are done)? That's not the task of a mutex. That's where a condition variable should be used.

    // New condition_variable, to notify main thread when child is done with level
    std::condition_variable cv2;

    // When a child is done, it will update this counter
    int counter = 0; // This is already protected by cvMtx, otherwise it could be atomic.

    // This is to sync cout
    std::mutex cout_mutex;

    void tfunc() {
        for (int i = 0; i < 5; i++) {
            {
                std::lock_guard<std::mutex> l(cout_mutex);
                std::cout << "Level " << i + 1 << " " << std::this_thread::get_id() << std::endl;
            }
            {
                std::lock_guard<std::mutex> l(cvMtx);
                counter++; // update counter
            }
            cv2.notify_all(); // notify main thread we are done.
            {
                // wait for notify
                std::unique_lock<std::mutex> lck(mtx);
                cv.wait(lck);
                // Note that I've removed the "ready" flag here
                // That's because you would need multiple ready flags to make that work
            }
        }
    }

    int main() {
        std::thread t1(tfunc);
        std::thread t2(tfunc);
        for (int i = 0; i < 5; i++) {
            {
                std::unique_lock<std::mutex> lck(cvMtx);
                // Wait takes a predicate which you can take advantage of
                cv2.wait(lck, [] { return (counter == 2); });
                counter = 0;
                // This thread will get notified multiple times
                // But it will only wake up when counter matches 2
                // Which equals how many threads we've created.
            }
            // Sleeping a bit to know the code is working
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
            // Wake up all threads and continue to next level.
            std::unique_lock<std::mutex> lck(mtx);
            cv.notify_all();
        }
        t1.join();
        t2.join();
        return 0;
    }

3 Comments

This actually looks great! I think it can work. Right now I'm trying to make it work according to your example. I'll be in touch.
Yep, your solution is fantastic. Thanks a lot :)
I found a deadlock. Sometimes tfunc() notifies the main thread, which then calls cv.notify_all() before tfunc() gets to cv.wait(lck). The worker then waits for a notification that will never come. How can this be fixed? I also don't quite understand how this can work: when tfunc() locks the mutex mtx with unique_lock and then waits for the notification, how can the main thread notify it, given that the main thread locks the same mutex mtx before it calls cv.notify_all()?
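
One possible way around the lost wakeup described in this comment is to wait on shared state instead of on the notification itself. The sketch below is an assumption-laden illustration, not the answerer's code: `LevelGate`, `run_levels`, `finished` and `released` are hypothetical names. It also touches on the mutex question: `wait()` unlocks the mutex for as long as the thread is blocked, so the main thread can lock the same mutex and notify.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper (not part of the answer above): all shared state lives
// under one mutex, and both sides wait on state, never on a bare notify.
struct LevelGate {
    std::mutex m;
    std::condition_variable workers_cv;  // main -> workers: a level was released
    std::condition_variable main_cv;     // workers -> main: a worker finished
    std::size_t finished = 0;            // workers done with the current level
    std::size_t released = 0;            // levels released by main so far
};

// Runs nthreads workers through nlevels levels. Returns how many times a
// worker entered a level that main had not released yet (expected: 0).
std::size_t run_levels(std::size_t nthreads, std::size_t nlevels) {
    LevelGate gate;
    std::size_t violations = 0;  // only touched while holding gate.m

    auto worker = [&] {
        for (std::size_t level = 0; level < nlevels; ++level) {
            // ... the real per-level work would run here, without the lock ...
            std::unique_lock<std::mutex> lk(gate.m);
            if (gate.released != level) ++violations;
            ++gate.finished;
            gate.main_cv.notify_one();
            // wait() unlocks gate.m while blocked, so main can lock it and
            // notify. If main already released this level, the predicate is
            // true and wait() returns at once, so no wakeup can be lost.
            gate.workers_cv.wait(lk, [&] { return gate.released > level; });
        }
    };

    std::vector<std::thread> threads;
    for (std::size_t t = 0; t < nthreads; ++t) threads.emplace_back(worker);

    for (std::size_t level = 0; level < nlevels; ++level) {
        std::unique_lock<std::mutex> lk(gate.m);
        gate.main_cv.wait(lk, [&] { return gate.finished == nthreads; });
        gate.finished = 0;
        ++gate.released;  // the state change the workers are waiting for
        gate.workers_cv.notify_all();
    }

    for (auto& t : threads) t.join();
    assert(gate.released == nlevels);
    return violations;
}
```

Calling `run_levels(2, 5)` would mirror the two threads and five levels from the question. The same pair of condition variables is reused for every level, so under these assumptions there is no need for one condition_variable per level.
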
0

The synchronization can be done with a single counter: threads increment the counter under a lock and wait for it to reach a multiple of the number of concurrent threads. This greatly simplifies the logic. I've made this change, grouped the shared variables into a class, and provided member functions to access them. To avoid false sharing, I've ensured that read-only variables are kept separate from those the threads read and write, and I've also separated the read-write variables by usage. The use of global variables is discouraged; see the C++ Core Guidelines for this and other good advice.

The simplified code follows; you can see it live on ideone. Note: it looks like there isn't true concurrency on ideone, so you'll have to run this in a multi-core environment to actually test hardware concurrency.

    // http://stackoverflow.com/questions/35318942/stdcondition-variable-calling-notify-all-more-than-once
    #include <iostream>
    #include <cstddef>
    #include <functional>
    #include <thread>
    #include <mutex>
    #include <vector>
    #include <condition_variable>

    static constexpr size_t CACHE_LINE_SIZE = 64;
    static constexpr size_t NTHREADS = 2;
    static constexpr size_t NLEVELS = 5;
    static constexpr size_t NITERATIONS = 100;

    class Synchronize
    {
        // read/write while threads are busy working
        alignas(CACHE_LINE_SIZE) std::mutex mtx_std_cout;

        // read/write while threads are synchronizing at level
        alignas(CACHE_LINE_SIZE) std::mutex cvMtx;
        std::condition_variable cv;
        size_t threadsFinished{0};

        // read-only parameters
        alignas(CACHE_LINE_SIZE) const size_t n_threads;
        const size_t n_levels;

    public:
        // class Synchronize owns unique resources:
        // - must be explicitly constructed
        // - disallow default ctor,
        // - disallow copy/move ctor and
        // - disallow copy/move assignment
        Synchronize( Synchronize const& ) = delete;
        Synchronize & operator=( Synchronize const& ) = delete;

        explicit Synchronize( size_t nthreads, size_t nlevels )
            : n_threads{nthreads}, n_levels{nlevels}
        {}

        size_t nlevels() const { return n_levels; }

        std::mutex & std_cout_mutex() { return mtx_std_cout; }

        // Blocks until every thread has finished the given level.
        void level_done_wait_all( size_t level )
        {
            std::unique_lock<std::mutex> lk(cvMtx);
            threadsFinished++;
            cv.wait(lk, [&]{ return threadsFinished >= n_threads * (level+1); });
            cv.notify_all();
        }
    };

    void tfunc( Synchronize & sync )
    {
        for (size_t i = 0; i < sync.nlevels(); i++)
        {
            // do something
            for (size_t j = 0; j < NITERATIONS; j++) {
                std::unique_lock<std::mutex> lck(sync.std_cout_mutex());
                if (j == 0) std::cout << '\n';
                std::cout << ' ' << i << ',' << j;
            }
            sync.level_done_wait_all(i);
        }
    }

    int main()
    {
        Synchronize sync{ NTHREADS, NLEVELS };

        // Threads are stored by value, so no manual new/delete is needed.
        std::vector<std::thread> threads;
        threads.reserve(NTHREADS);
        for (size_t t = 0; t < NTHREADS; t++)
            threads.emplace_back(tfunc, std::ref(sync));
        for (auto& t : threads)
            t.join();

        std::cout << std::endl;
        return 0;
    }
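
The cache-line separation this answer mentions can be checked in isolation. The following is only a rough sketch under stated assumptions: the 64-byte line size is assumed rather than queried from the hardware, and `PaddedState`, its members and `members_on_separate_lines` are hypothetical stand-ins for the layout of `Synchronize`, with plain counters in place of the mutexes so that `offsetof` is well-defined.

```cpp
#include <cassert>
#include <cstddef>

// Assumed cache line size; 64 bytes is common on x86-64 but not guaranteed.
constexpr std::size_t kCacheLine = 64;

// Hypothetical stand-in for the Synchronize layout: one group of data per
// cache line, grouped by how the threads use it.
struct PaddedState {
    alignas(kCacheLine) std::size_t busy_counter;  // written while working
    alignas(kCacheLine) std::size_t sync_counter;  // written at the barrier
    alignas(kCacheLine) std::size_t n_threads;     // read-only after setup
};

// The strictest member alignment becomes the alignment of the whole struct.
static_assert(alignof(PaddedState) == kCacheLine,
              "struct should be aligned to the assumed cache line size");

// Returns true when consecutive members start at least one line apart.
bool members_on_separate_lines() {
    const std::size_t busy = offsetof(PaddedState, busy_counter);
    const std::size_t sync = offsetof(PaddedState, sync_counter);
    const std::size_t ro   = offsetof(PaddedState, n_threads);
    return (sync - busy) >= kCacheLine && (ro - sync) >= kCacheLine;
}
```

Under these assumptions, a write to `sync_counter` by one thread should not invalidate the line that holds `n_threads` for the others. The price is padding: the three counters occupy three full lines instead of fitting into one.
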
