
The problem is that the Producer thread doesn't wait. I expect the Producer thread to wait until the Consumer thread finishes processing 4000 items; once it finishes, the Consumer notifies the Producer thread, which can then proceed with the next batch.

What am I doing wrong here?

Another problem: there are not always 4000 items in a batch, especially in the last batch. In that case, the Producer thread should notify the Consumer thread to finish processing whatever items remain in the deque.

    #include <windows.h>
    #include <string>
    #include <iostream>
    #include <deque>
    #include <chrono>
    #include <thread>
    #include <boost/scoped_ptr.hpp>
    #include <boost/thread/thread.hpp>
    #include <boost/thread/mutex.hpp>

    using namespace std;
    using namespace boost;

    boost::mutex ResponseMutex;
    boost::condition_variable qFull, qEmpty;

    class A
    {
    private:
        boost::thread* myProducerThread;
        boost::thread* myConsumerThread;

    public:
        A()
        {
            myProducerThread = nullptr;
            myConsumerThread = nullptr;
        }

        void RunThreads();
        void RunProducer();
        void RunConsumer();
        void ProducerWait();
        void ConsumerWait();

        struct Record
        {
            char response[128];

            Record(const char* response)
            {
                memset(this->response, 0, sizeof(this->response));
                strcpy(this->response, response);
            }

            ~Record()
            {
            }

            Record& operator= (const Record& cmd)
            {
                if(this == &cmd) // Same object?
                {
                    return *this;
                }
                memset(this->response, 0, sizeof(this->response));
                strcpy(this->response, cmd.response);
                return *this;
            }
        };

        typedef deque<Record> RecordsQueue;
    };

    A::RecordsQueue Records;

    void A::RunThreads()
    {
        myProducerThread = new boost::thread(boost::bind(&A::RunProducer, this));
        HANDLE threadHandle1 = myProducerThread->native_handle();
        SetThreadPriority(threadHandle1, THREAD_PRIORITY_NORMAL);

        myConsumerThread = new boost::thread(boost::bind(&A::RunConsumer, this));
        HANDLE threadHandle2 = myConsumerThread->native_handle();
        SetThreadPriority(threadHandle2, THREAD_PRIORITY_NORMAL);

        myProducerThread->join();
        myConsumerThread->join();
    }

    void A::ProducerWait()
    {
        boost::mutex::scoped_lock lock(ResponseMutex);
        while(!Records.empty())
        {
            qEmpty.wait(lock);
        }
    }

    void A::ConsumerWait()
    {
        boost::mutex::scoped_lock lock(ResponseMutex);
        while(Records.size() <= 4000)
        {
            qFull.wait(lock);
        }
    }

    void A::RunProducer()
    {
        int i = 0;

        while(true)
        {
            ProducerWait();

            vector<string> responses;
            responses.push_back(to_string(i));
            cout << "Added: " << to_string(i) << endl;
            i++;

            qFull.notify_one();
        }
    }

    void A::RunConsumer()
    {
        while(true)
        {
            ConsumerWait();

            Record res = Records.front();
            cout << "Processed: " << res.response << endl;
            Records.pop_front();

            qEmpty.notify_one();
        }
    }

    int main()
    {
        A a;
        a.RunThreads();
    }
  • You don't need to use bind to create a thread, and why are you dynamically allocating your threads? If you don't create them with new you won't forget to delete them. Commented Dec 9, 2014 at 11:25
  • Your code is not thread-safe: you only lock the mutex for the scope of ConsumerWait(), then access the Records member without a lock. Commented Dec 9, 2014 at 11:30
  • Good call. I didn't mention it, though my sample code fixed that too :) Commented Dec 9, 2014 at 11:30
  • Not to mention strcpy. I'm not sure I want this OP touching threading code / except for learning (maybe)? Commented Dec 9, 2014 at 11:32

1 Answer

    while (true) {
        std::vector<std::string> responses;
        responses.push_back(std::to_string(i));
        qFull.notify_one();
    }

You don't share the responses. How will the consumer ever see the responses vector? It will be empty at the start of each loop iteration.

Also

  • don't gratuitously use platform dependent code
  • don't gratuitously use dynamic allocation (you're leaking resources)
  • don't mix global variables with class local responsibilities

Next up: your conditions are such that after posting 1 work item (Record) you will wait until the consumer has consumed 4000. I don't see how that works.

Why don't you just accumulate 4000 items on the producer side and then hand them to the consumer? That way, you could actually start benefitting from threading. Think about this: you had two threads: one is waiting for 4000 jobs to be created, the other is waiting for the first to completely empty the queue. What you have here is glorified sequential code with a lot of noise and unnecessary lock contention.
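The batching idea above can be sketched roughly as follows. This is only an illustration under some assumptions: it uses the standard library (`std::thread`, `std::mutex`, `std::condition_variable`) rather than Boost, and the names `BatchChannel` and `run_batches` are hypothetical, not taken from the question. The producer fills a local vector without any locking, hands it over once it reaches the batch size, and flushes whatever is left as a final, smaller batch before closing the channel.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical helper (not from the question): a queue of whole batches.
class BatchChannel {
public:
    // Producer side: hand over one complete (or final, partial) batch.
    void push(std::vector<std::string> batch) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            batches_.push_back(std::move(batch));
        }
        ready_.notify_one();
    }

    // Producer side: signal that no further batches will follow.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        ready_.notify_all();
    }

    // Consumer side: blocks until a batch is available. Returns false once
    // the channel has been closed and every queued batch has been taken.
    bool pop(std::vector<std::string>& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate form of wait() also covers spurious wake-ups.
        ready_.wait(lock, [this] { return !batches_.empty() || closed_; });
        if (batches_.empty()) return false;
        out = std::move(batches_.front());
        batches_.pop_front();
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<std::vector<std::string>> batches_;
    bool closed_ = false;
};

// Produces `total` items in batches of `batch_size` and returns the sizes of
// the batches the consumer received, in order.
std::vector<std::size_t> run_batches(std::size_t total, std::size_t batch_size) {
    assert(batch_size > 0);
    BatchChannel channel;
    std::vector<std::size_t> seen;  // written only by the consumer thread

    std::thread consumer([&] {
        std::vector<std::string> batch;
        while (channel.pop(batch)) {
            seen.push_back(batch.size());  // "processing" happens outside the lock
        }
    });

    std::vector<std::string> batch;
    for (std::size_t i = 0; i < total; ++i) {
        batch.push_back(std::to_string(i));  // no lock needed: batch is local
        if (batch.size() == batch_size) {
            channel.push(std::move(batch));
            batch.clear();  // reset the moved-from vector to a known empty state
        }
    }
    if (!batch.empty()) channel.push(std::move(batch));  // last, partial batch
    channel.close();
    consumer.join();  // after join it is safe to read `seen` here
    return seen;
}
```

Calling `run_batches(10, 4)` should hand the consumer batches of 4, 4, and 2 items. Note that in this sketch the producer never blocks, so the queue of pending batches is unbounded; if the producer must wait for the consumer, a second condition variable with a size limit (as in the bonus code) could be added.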

Bonus

Using my crystal ball, here's a version that fixes most of the above (including the lack of locking, accounting for spurious wake-ups, etc.).

You still need to fix the unsafe use of strcpy!

    #include <cstring> // memset, strcpy
    #include <string>
    #include <iostream>
    #include <deque>
    #include <chrono>
    #include <thread>
    #include <boost/scoped_ptr.hpp>
    #include <boost/thread/thread.hpp>
    #include <boost/thread/mutex.hpp>
    #include <boost/thread/condition_variable.hpp>

    static constexpr auto MIN_QUEUE_WORK = 10;
    static constexpr auto MAX_QUEUE_WORK = 40; // 4000;

    class A {
      private:
        boost::thread myProducerThread;
        boost::thread myConsumerThread;

        boost::mutex mutex_;
        boost::condition_variable pushed_, popped_;

        struct Record {
            char response[128];

            Record(const char *response) {
                memset(this->response, 0, sizeof(this->response));
                strcpy(this->response, response); // still unsafe: no bounds check
            }

            ~Record() {}

            Record &operator=(const Record &cmd) {
                if (this == &cmd) // Same object?
                {
                    return *this;
                }

                memset(this->response, 0, sizeof(this->response));
                strcpy(this->response, cmd.response);
                return *this;
            }
        };

        typedef std::deque<Record> RecordsQueue;
        RecordsQueue queue_;

      public:
        void RunThreads();
        void RunProducer();
        void RunConsumer();
    };

    void A::RunThreads() {
        myProducerThread = boost::thread(&A::RunProducer, this);
        myConsumerThread = boost::thread(&A::RunConsumer, this);

        myProducerThread.join();
        myConsumerThread.join();
    }

    void A::RunProducer() {
        int i = 0;

        while (i < 1000) {
            boost::mutex::scoped_lock lock(mutex_);
            // Block while the queue is full; the predicate guards against spurious wake-ups.
            popped_.wait(lock, [this] { return queue_.size() < MAX_QUEUE_WORK; });

            queue_.push_back(Record { std::to_string(i).c_str() });
            std::cout << "Added: " << std::to_string(i) << " size: " << queue_.size() << std::endl;
            i++;

            pushed_.notify_one();
        }
    }

    void A::RunConsumer() {
        // Note: this loop never exits. Once the producer stops, the consumer
        // blocks in wait() with up to MIN_QUEUE_WORK items still queued.
        while (true) {
            boost::mutex::scoped_lock lock(mutex_);
            // Block until more than MIN_QUEUE_WORK items are queued.
            pushed_.wait(lock, [this] { return queue_.size() > MIN_QUEUE_WORK; });

            Record res = queue_.front();
            std::cout << "Processed: " << res.response << std::endl;
            queue_.pop_front();

            popped_.notify_one();
        }
    }

    int main() {
        A a;
        a.RunThreads();
    }
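As for the strcpy warning, one possible fix is a bounded copy. The sketch below is an assumption about how `Record` could be rewritten, not part of the answer's code: it keeps the fixed 128-byte buffer but uses `std::snprintf`, which truncates over-long input and always null-terminates.

```cpp
#include <cstdio>
#include <string>

// Same layout as the Record in the post, but with a bounded copy.
struct Record {
    char response[128] = {};  // zero-initialised, replacing the explicit memset

    explicit Record(const char* text) {
        // snprintf writes at most sizeof(response) bytes, including the
        // terminating '\0', so longer input is truncated instead of overflowing.
        std::snprintf(response, sizeof(response), "%s", text);
    }

    // No user-defined destructor or operator= is needed here: the
    // compiler-generated versions copy the array member-wise.
};
```

If the fixed-size buffer is not actually required, storing a `std::string` member instead would avoid the length limit altogether.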

3 Comments

My bad, I actually modified an existing sample. Let me correct the source. I will edit the post.
@user3924882 Oh. Right :( You know that being a bit more careful makes it more likely you get help in the future, I hope? Here was my idea of what you seem to be looking for: Live On Coliru. Have a nice day
Great! I always like your answers, your posts. Great learning experience. Thanks.
