Problem is Producer thread doesn't wait, I expect Producer thread to wait for Consumer thread until Consumer finishes processing 4000 items, once it finishes, it will notify Producer thread and then Producer thread can proceed with next batch.
What I am doing wrong here?
Another problem, there are not always 4000 items in a batch. esp. if you take last batch. In that case, Produce thread should notify Consumer thread to finish processing of whatever remaining items in <deque>
#include <windows.h> #include <string> #include <iostream> #include <deque> #include <chrono> #include <thread> #include <boost/scoped_ptr.hpp> #include <boost/thread/thread.hpp> #include <boost/thread/mutex.hpp> using namespace std; using namespace boost; boost::mutex ResponseMutex; boost::condition_variable qFull, qEmpty; class A { private: boost::thread* myProducerThread; boost::thread* myConsumerThread; public: A() { myProducerThread = nullptr; myConsumerThread = nullptr; } void RunThreads(); void RunProducer(); void RunConsumer(); void ProducerWait(); void ConsumerWait(); struct Record { char response[128]; Record(const char* response) { memset(this->response,0,sizeof(this->response)); strcpy(this->response, response); } ~Record() { } Record& operator= (const Record& cmd) { if(this == &cmd) // Same object? { return *this; } memset(this->response,0,sizeof(this->response)); strcpy(this->response, cmd.response); return *this; } }; typedef deque<Record> RecordsQueue; }; A::RecordsQueue Records; void A::RunThreads() { myProducerThread = new boost::thread(boost::bind(&A::RunProducer, this)); HANDLE threadHandle1 = myProducerThread->native_handle(); SetThreadPriority(threadHandle1, THREAD_PRIORITY_NORMAL); myConsumerThread = new boost::thread(boost::bind(&A::RunConsumer, this)); HANDLE threadHandle2 = myConsumerThread->native_handle(); SetThreadPriority(threadHandle2, THREAD_PRIORITY_NORMAL); myProducerThread->join(); myConsumerThread->join(); } void A::ProducerWait() { boost::mutex::scoped_lock lock(ResponseMutex); while(!Records.empty()) { qEmpty.wait(lock); } } void A::ConsumerWait() { boost::mutex::scoped_lock lock(ResponseMutex); while(Records.size() <= 4000) { qFull.wait(lock); } } void A::RunProducer() { int i = 0; while(true) { ProducerWait(); vector<string> responses; responses.push_back(to_string(i)); cout<< "Added: " << to_string(i) << endl; i++; qFull.notify_one(); } } void A::RunConsumer() { while(true) { ConsumerWait(); Record res = Records.front(); cout<< "Processed: " << res.response << endl; Records.pop_front(); qEmpty.notify_one(); } } int main() { A a; a.RunThreads(); }
bindto create a thread, and why are you dynamically allocating your threads? If you don't create them withnewyou won't forget to delete them.ConsumerWait()then access theRecordsmember without a lock.