
I have to program a multiple producer-consumer system in C++, but I'm lost trying to put together each part of the model (threads with their correct buffers). The basic functioning of the model is: I have an initial thread that executes a function. The returned results need to be put in an undetermined number of buffers, because each element that the function processes is different and needs to be treated in its own thread. Then, with the data stored in the buffers, another n threads need to get the data from these buffers to run another function, and its results need to be put in some buffers again.

At the moment I have got this buffer structure created:

    #include <atomic>
    #include <memory>
    #include <utility>

    template <typename T>
    class atomic_buffer {
    public:
        atomic_buffer(int n);
        int bufSize() const noexcept;
        bool bufEmpty() const noexcept;
        bool full() const noexcept;
        ~atomic_buffer() = default;

        void put(const T & x, bool last) noexcept;
        std::pair<bool,T> get() noexcept;

    private:
        int next_pos(int p) const noexcept;

    private:
        struct item {
            bool last;
            T value;
        };
        const int size_;
        std::unique_ptr<item[]> buf_;
        alignas(64) std::atomic<int> nextRd_ {0};
        alignas(64) std::atomic<int> nextWrt_ {0};
    };

I've also created a vector structure that stores a collection of buffers, in order to satisfy the need for an undetermined number of threads.

    std::vector<std::unique_ptr<locked_buffer<std::pair<int, std::vector<std::vector<unsigned char>>>>>> v1;
    for (int i = 0; i < n; i++) {
        v1.push_back(std::unique_ptr<locked_buffer<std::pair<int, std::vector<std::vector<unsigned char>>>>>(
            new locked_buffer<std::pair<int, std::vector<std::vector<unsigned char>>>>(aux)));
    }

Edit:

Flowchart of the model

2 Comments
  • I don't understand your problem. Can you add a drawing of your data flow? Commented Dec 14, 2016 at 18:41
  • Without knowing more context, this looks like there is one producer thread that creates tasks. These tasks are executed independently in parallel. All the results are consumed by one thread. Looks to me like you don't even need any synchronization on the buffers. Commented Dec 14, 2016 at 19:03

1 Answer


Without knowing more context, this looks like an application for a standard thread pool. You have different tasks that are enqueued to a synchronized queue (like the buffer class you have there). Each worker thread of the thread pool polls this queue and processes one task each time (by executing a run() method for example). They write the results back into another synchronized queue.

Each worker thread has its own thread-local pair of input and output buffers. These don't need synchronization because they are only accessed from within the owning thread itself.

enter image description here
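As a rough illustration of the synchronized queue mentioned above, here is a minimal sketch built on `std::mutex` and `std::condition_variable`. The names `SyncQueue`, `close()` and `sum_through_queue()` are made up for this example and are not the asker's `buffer` class; it also assumes C++17 for `std::optional`.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical blocking queue; any number of threads may push and pop.
template <typename T>
class SyncQueue {
public:
    // Add an element and wake one waiting consumer.
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            assert(!closed_);  // pushing after close() is a programming error
            items_.push(std::move(value));
        }
        ready_.notify_one();
    }

    // Signal that no more elements will arrive and wake all consumers.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        ready_.notify_all();
    }

    // Block until an element is available.
    // Returns std::nullopt once the queue is closed and fully drained.
    std::optional<T> pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return closed_ || !items_.empty(); });
        if (items_.empty()) return std::nullopt;
        T value = std::move(items_.front());
        items_.pop();
        return std::optional<T>(std::move(value));
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<T> items_;
    bool closed_ = false;
};

// Example: one producer thread feeds one consumer through the queue.
int sum_through_queue(int n) {
    SyncQueue<int> queue;
    std::thread producer([&] {
        for (int i = 1; i <= n; ++i) queue.push(i);
        queue.close();
    });
    int sum = 0;
    while (auto item = queue.pop()) sum += *item;
    producer.join();
    return sum;
}
```

The `close()` call is one possible way to tell consumers that the stream has ended; the `last` flag in the asker's `put()` serves a similar purpose.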

Edit: Actually, I think this can be simplified a lot: just use a thread pool and one synchronized queue. The worker threads can enqueue new tasks directly into the queue. Each of your threads in the drawing would correspond to one type of task and implement a common Task interface. You don't need multiple buffers. You can use polymorphism and put everything in one buffer.

Edit 2 - Explanation of thread pools:
A thread pool is just a concept. Forget about the pooling aspect and use a fixed number of threads. The main idea is: instead of having several threads that each have a specific function, have N threads that can process any kind of task, where N is the number of cores of the CPU.
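To make the "N threads" part concrete, here is a small sketch of how the thread count could be chosen and the threads started with the standard library. The function names `worker_count()` and `start_and_join_workers()` are invented for this example, and the lambda is only a stand-in for a real worker loop.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Number of worker threads to start: one per hardware thread.
unsigned worker_count() {
    // hardware_concurrency() is only a hint and may return 0.
    return std::max(1u, std::thread::hardware_concurrency());
}

// Starts worker_count() threads, lets each one report in, and joins them.
// Returns how many threads actually ran.
int start_and_join_workers() {
    const unsigned n = worker_count();
    std::atomic<int> ran{0};
    std::vector<std::thread> workers;
    workers.reserve(n);
    for (unsigned i = 0; i < n; ++i)
        workers.emplace_back([&ran] { ++ran; });  // stand-in for the real worker loop
    assert(workers.size() == n);
    for (auto& w : workers) w.join();
    return ran.load();
}
```

Note that `std::thread::hardware_concurrency()` reports hardware threads, which can be more than the number of physical cores, so treat the result as a starting point rather than a fixed rule.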

You can transform this

enter image description here

into

enter image description here

The worker thread does something like the following. Note that this is simplified, but you should get the idea.

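    void Thread::run(buffer<Task*>& queue)
    {
        while (true) {
            // Wait until there is work; waitUntilQueueHasElement() is a placeholder.
            while (queue.isEmpty())
                waitUntilQueueHasElement();

            // Another worker may have taken the element first, so check for null.
            Task* task = queue.get();
            if (task)
                task->execute();
        }
    }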

And your tasks implement a common interface so you can put Task* pointers into a single queue:

    struct Task {
        virtual ~Task() = default;   // tasks are destroyed through a Task*
        virtual void execute() = 0;
    };

    struct Task1 : public Task {
        void execute() override { A(); B1(); C(); }
    };
    ...
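Putting the pieces together, a minimal sketch of a fixed-size pool with a single task queue could look like the following. It assumes C++14 (`std::make_unique`); `ThreadPool`, `SquareTask`, `AddTask` and `sum_of_squares()` are hypothetical names for this example only, and the two task types stand in for whatever your stages actually do. The first stage enqueues the second stage directly, as described in the first edit.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

struct Task {
    virtual ~Task() = default;
    virtual void execute() = 0;
};

// Hypothetical fixed-size pool: every worker can run any kind of Task.
class ThreadPool {
public:
    explicit ThreadPool(unsigned n) {
        for (unsigned i = 0; i < n; ++i)
            workers_.emplace_back([this] { work(); });
    }
    // Lets the workers drain the queue, then joins them.
    ~ThreadPool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        ready_.notify_all();
        for (auto& w : workers_) w.join();
    }
    void submit(std::unique_ptr<Task> task) {
        assert(task != nullptr);
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        ready_.notify_one();
    }

private:
    void work() {
        for (;;) {
            std::unique_ptr<Task> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                ready_.wait(lock, [this] { return stopping_ || !tasks_.empty(); });
                if (tasks_.empty()) return;  // stopping and nothing left to do
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task->execute();  // run outside the lock so tasks can call submit()
        }
    }
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<std::unique_ptr<Task>> tasks_;
    bool stopping_ = false;
    std::vector<std::thread> workers_;
};

// Second stage: adds a value to a shared total.
struct AddTask : Task {
    AddTask(int v, std::atomic<int>& t) : value_(v), total_(t) {}
    void execute() override { total_ += value_; }
    int value_;
    std::atomic<int>& total_;
};

// First stage: squares a number and enqueues the second stage.
struct SquareTask : Task {
    SquareTask(int v, ThreadPool& p, std::atomic<int>& t)
        : value_(v), pool_(p), total_(t) {}
    void execute() override {
        pool_.submit(std::make_unique<AddTask>(value_ * value_, total_));
    }
    int value_;
    ThreadPool& pool_;
    std::atomic<int>& total_;
};

int sum_of_squares(int n) {
    std::atomic<int> total{0};
    {
        ThreadPool pool(4);
        for (int i = 1; i <= n; ++i)
            pool.submit(std::make_unique<SquareTask>(i, pool, total));
    }  // the destructor returns only after queued and follow-up tasks have run
    return total.load();
}
```

Storing `std::unique_ptr<Task>` instead of raw `Task*` is a design choice of this sketch: the queue owns each task, so nothing has to be deleted by hand.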

Also, do yourself a favour and use typedefs ;)

`std::vector<std::unique_ptr<locked_buffer<std::pair<int, std::vector<std::vector<unsigned char>>>>>> v1;` 

becomes

    typedef std::vector<std::vector<unsigned char>> vector2D_uchar;
    typedef std::pair<int, vector2D_uchar> int_vec_pair;
    typedef std::unique_ptr<locked_buffer<int_vec_pair>> locked_buffer_ptr;

    std::vector<locked_buffer_ptr> v1;

2 Comments

Could you please give me an example of a thread pool declaration? Thanks for your clear explanation, but the concept of thread pooling is still a bit messy in my head.
@giorgioW Edited. However, I don't know what your ultimate goal is so take this with a grain of salt.
