
I'm trying to implement a lock-free multiple-producer, multiple-consumer queue in C++11. I'm doing this as a learning exercise, so I'm well aware that I could just use an existing open-source implementation, but I'd really like to find out why my code doesn't work. The data is stored in a ring buffer; apparently this makes it a "bounded MPMC queue".

I've modelled it pretty closely on what I've read of the Disruptor. The thing I've noticed is that it works absolutely fine with a single consumer and single/multiple producers; it's just multiple consumers that seem to break it.

Here's the queue:

    template <typename T>
    class Queue : public IQueue<T> {
    public:
        explicit Queue( int capacity );
        ~Queue();
        bool try_push( T value );
        bool try_pop( T& value );
    private:
        typedef struct {
            bool readable;
            T value;
        } Item;

        std::atomic<int> m_head;
        std::atomic<int> m_tail;
        int m_capacity;
        Item* m_items;
    };

    template <typename T>
    Queue<T>::Queue( int capacity )
        : m_head( 0 ), m_tail( 0 ), m_capacity( capacity ), m_items( new Item[capacity] ) {
        for( int i = 0; i < capacity; ++i ) {
            m_items[i].readable = false;
        }
    }

    template <typename T>
    Queue<T>::~Queue() {
        delete[] m_items;
    }

    template <typename T>
    bool Queue<T>::try_push( T value ) {
        while( true ) {
            // See that there's room
            int tail = m_tail.load(std::memory_order_acquire);
            int new_tail = ( tail + 1 );
            int head = m_head.load(std::memory_order_acquire);
            if( ( new_tail - head ) >= m_capacity ) {
                return false;
            }
            if( m_tail.compare_exchange_weak( tail, new_tail, std::memory_order_acq_rel ) ) {
                // In try_pop, m_head is incremented before the reading of the value has completed,
                // so though we've acquired this slot, a consumer thread may be in the middle of reading
                tail %= m_capacity;
                std::atomic_thread_fence( std::memory_order_acquire );
                while( m_items[tail].readable ) {
                }
                m_items[tail].value = value;
                std::atomic_thread_fence( std::memory_order_release );
                m_items[tail].readable = true;
                return true;
            }
        }
    }

    template <typename T>
    bool Queue<T>::try_pop( T& value ) {
        while( true ) {
            int head = m_head.load(std::memory_order_acquire);
            int tail = m_tail.load(std::memory_order_acquire);
            if( head == tail ) {
                return false;
            }
            int new_head = ( head + 1 );
            if( m_head.compare_exchange_weak( head, new_head, std::memory_order_acq_rel ) ) {
                head %= m_capacity;
                std::atomic_thread_fence( std::memory_order_acquire );
                while( !m_items[head].readable ) {
                }
                value = m_items[head].value;
                std::atomic_thread_fence( std::memory_order_release );
                m_items[head].readable = false;
                return true;
            }
        }
    }

And here's the test I'm using:

    void Test( std::string name, Queue<int>& queue ) {
        const int NUM_PRODUCERS = 64;
        const int NUM_CONSUMERS = 2;
        const int NUM_ITERATIONS = 512;

        bool table[NUM_PRODUCERS*NUM_ITERATIONS];
        memset(table, 0, NUM_PRODUCERS*NUM_ITERATIONS*sizeof(bool));

        std::vector<std::thread> threads(NUM_PRODUCERS+NUM_CONSUMERS);

        std::chrono::system_clock::time_point start, end;
        start = std::chrono::system_clock::now();

        std::atomic<int> pop_count (NUM_PRODUCERS * NUM_ITERATIONS);
        std::atomic<int> push_count (0);

        for( int thread_id = 0; thread_id < NUM_PRODUCERS; ++thread_id ) {
            threads[thread_id] = std::thread([&queue,thread_id,&push_count]() {
                int base = thread_id * NUM_ITERATIONS;
                for( int i = 0; i < NUM_ITERATIONS; ++i ) {
                    while( !queue.try_push( base + i ) ){};
                    push_count.fetch_add(1);
                }
            });
        }

        for( int thread_id = 0; thread_id < ( NUM_CONSUMERS ); ++thread_id ) {
            threads[thread_id+NUM_PRODUCERS] = std::thread([&]() {
                int v;
                while( pop_count.load() > 0 ) {
                    if( queue.try_pop( v ) ) {
                        if( table[v] ) {
                            std::cout << v << " already set" << std::endl;
                        }
                        table[v] = true;
                        pop_count.fetch_sub(1);
                    }
                }
            });
        }

        for( int i = 0; i < ( NUM_PRODUCERS + NUM_CONSUMERS ); ++i ) {
            threads[i].join();
        }

        end = std::chrono::system_clock::now();
        std::chrono::duration<double> duration = end - start;
        std::cout << name << " " << duration.count() << std::endl;

        std::atomic_thread_fence( std::memory_order_acq_rel );

        bool result = true;
        for( int i = 0; i < NUM_PRODUCERS * NUM_ITERATIONS; ++i ) {
            if( !table[i] ) {
                std::cout << "failed at " << i << std::endl;
                result = false;
            }
        }
        std::cout << name << " " << ( result? "success" : "fail" ) << std::endl;
    }

Any nudging in the right direction would be greatly appreciated. I'm pretty new to memory fences rather than just using a mutex for everything, so I'm probably just fundamentally misunderstanding something.

Cheers J

  • You should add to your description that you're building a bounded MPMC queue. That's a pretty important aspect.
  • I'd never heard that term before, thanks =)
  • I don't like the asymmetry in the thread fence acquire/release. Are you sure that is correct?
  • @LumpN which asymmetry do you mean?
  • To quote Yakk: "The proper way to approach a lock free data structure is to write a semi formal proof that your design works in pseudo code. You shouldn't be asking 'is this lock free code thread safe', but rather 'does my proof that this lock free code is thread safe have any errors?'" These things are notoriously difficult to get right. I suggest looking at an existing implementation to get some ideas of how difficult it actually is (caveats etc.) and then start proving.

4 Answers


I'd take a look at moodycamel's implementation.

It is a fast, general-purpose lock-free queue for C++, written entirely in C++11. The documentation seems rather good, along with a few performance tests.

Among other interesting things (they're worth a read anyway), it's all contained in a single header and available under the simplified BSD license. Just drop it into your project and enjoy!
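A minimal usage sketch, assuming the single header (concurrentqueue.h) from that repository is on your include path:

    #include "concurrentqueue.h"
    #include <cassert>

    int main() {
        moodycamel::ConcurrentQueue<int> q; // any number of producers and consumers

        q.enqueue( 25 ); // may allocate; the default queue grows as needed

        int item;
        bool found = q.try_dequeue( item ); // returns false if the queue appeared empty
        assert( !found || item == 25 );
    }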


1 Comment

The second header file is for the blocking version.

The simplest approach uses a circular buffer. That is, it's like an array of 256 elements, and you use a uint8_t as the index so it wraps around to the beginning when it overflows.

The simplest primitive you can build on is the case of a single producer thread and a single consumer thread.

The buffer has two heads:

  • Write head: It points to the element that will be written next.
  • Read head: It points to the element that will be read next.

Operation of the producer:

  1. If write head + 1 == read head, the buffer is full; return a buffer-full error.
  2. Write content to the element.
  3. Insert memory barrier to sync CPU cores.
  4. Move the write head forward.

In the buffer-full case there is still one slot left, but we reserve it to distinguish the full case from the empty case.

Operation of the consumer:

  1. If read head == write head, the buffer is empty; return a buffer-empty error.
  2. Read content of the element.
  3. Insert memory barrier to sync CPU cores.
  4. Move the read head forward.

The producer owns the write head and the consumer owns the read head, so there is no concurrency on either. Also, each head is updated only after its operation has completed: this ensures the producer leaves behind fully written elements, and the consumer leaves behind fully consumed, empty cells.
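Here is a minimal C++11 sketch of this scheme. The class name and the acquire/release choices are mine; note that a release store on a head also acts as the memory barrier, so steps 3 and 4 collapse into one operation:

    #include <atomic>
    #include <cstdint>

    // Single-producer / single-consumer ring buffer of 256 slots.
    // One slot is always left unused so "full" and "empty" are distinguishable.
    template <typename T>
    class SpscQueue {
    public:
        bool try_push( const T& value ) {
            uint8_t w = m_write.load( std::memory_order_relaxed ); // producer owns m_write
            if( uint8_t( w + 1 ) == m_read.load( std::memory_order_acquire ) ) {
                return false; // 1. buffer full
            }
            m_items[w] = value; // 2. write the element
            // 3+4. release = barrier + move the write head forward
            m_write.store( uint8_t( w + 1 ), std::memory_order_release );
            return true;
        }

        bool try_pop( T& value ) {
            uint8_t r = m_read.load( std::memory_order_relaxed ); // consumer owns m_read
            if( r == m_write.load( std::memory_order_acquire ) ) {
                return false; // 1. buffer empty
            }
            value = m_items[r]; // 2. read the element
            // 3+4. release = barrier + move the read head forward
            m_read.store( uint8_t( r + 1 ), std::memory_order_release );
            return true;
        }

    private:
        T m_items[256]; // uint8_t indices wrap around automatically
        std::atomic<uint8_t> m_write{ 0 };
        std::atomic<uint8_t> m_read{ 0 };
    };

The acquire load of the other thread's head pairs with that thread's release store, so the element contents are always synchronized before a slot is overwritten or read.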

Create two of these pipes, one in each direction, whenever you fork off a new thread, and you have bidirectional communication with your threads.

Given that we are talking about lock-freedom, none of the threads are ever blocked; when there is nothing to do, the threads spin empty. You may want to detect this and add some sleep when it happens, as sketched below.
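For example, a consumer loop might look like this (a sketch, assuming a queue of ints with the try_pop interface from the sketch above; the spin threshold and sleep duration are arbitrary):

    #include <atomic>
    #include <chrono>
    #include <thread>

    // Drain the queue until told to stop, sleeping briefly after a burst
    // of failed pops so an idle consumer doesn't burn a whole core.
    template <typename Queue>
    void consume( Queue& queue, std::atomic<bool>& stop ) {
        int value;
        int failed_pops = 0;
        while( !stop.load( std::memory_order_relaxed ) ) {
            if( queue.try_pop( value ) ) {
                failed_pops = 0;
                // ... process value ...
            } else if( ++failed_pops > 1000 ) {
                failed_pops = 0;
                std::this_thread::sleep_for( std::chrono::microseconds( 100 ) );
            }
        }
    }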

1 Comment

Indeed, creating a pair of SPSC queues between each pair of threads that need to communicate is the easiest approach, and often more performant than a single MPMC queue under high contention. There are reasons you'd need multi-consumer at least, such as job stealing. This was my writeup of this whole rabbit hole, if you're at all interested: codersblock.org/blog/2016/6/02/ditching-the-mutex

How about this lock-free queue?

It is a memory-ordering lock-free queue, but you need to preset the number of concurrent threads when initializing the queue.

For example:

    int i = 0; /* per-producer counter */
    int max_concurrent_thread = 16;
    lfqueue_t my_queue;
    lfqueue_init(&my_queue, max_concurrent_thread);

    /** Wrap this scope in other threads **/
    int* int_data = (int*) malloc(sizeof(int));
    assert(int_data != NULL);
    *int_data = i++;
    /* Enqueue */
    while (lfqueue_enq(&my_queue, int_data) == -1) {
        printf("ENQ Full ?\n");
    }

    /** Wrap this scope in other threads **/
    /* Dequeue */
    while ((int_data = lfqueue_deq(&my_queue)) == NULL) {
        printf("DEQ EMPTY ..\n");
    }
    // printf("%d\n", *int_data);
    free(int_data);
    /** End **/

    lfqueue_destroy(&my_queue);



On another similar question, I presented a solution to this problem. I believe it is the smallest found so far.

I will not paste the same answer here, but the repository has a fully functional C++ implementation of the lock-free queue you desire.

EDIT: Thanks to a code review from @PeterCordes, I found a bug in the solution when using 64-bit templates, but now it's working perfectly.

This is the output I receive when running the tests:

    Creating 4 producers & 4 consumers
    to flow 10.000.000 items trough the queue.

    Produced: 10.743.668.245.000.000
    Consumed: 5.554.289.678.184.004
    Produced: 10.743.668.245.000.000
    Consumed: 15.217.833.969.059.643
    Produced: 10.743.668.245.000.000
    Consumed: 7.380.542.769.600.801
    Produced: 10.743.668.245.000.000
    Consumed: 14.822.006.563.155.552

    Checksum: 0 (it must be zero)

15 Comments

Much like my implementation here (codersblock.org/blog/2016/6/02/ditching-the-mutex), your solution isn't truly lock-free. If you suspend producer threads between acquiring their slot and writing the item, or consumer threads between reading their item and clearing their slot, they will lock up the data structure.
I think it's technically not safe to have concurrent writers to a std::vector, and even if you switch to a std::array or T[], then if T can't be written atomically then you have a data race there. If T can be written atomically, you could still get a torn read (data race) if the items are not naturally aligned. Other than that it looks ok, I think because you're using seq_cst memory ordering you've saved yourself some headaches with observing the correct side effects. I'd recommend testing it with Relacy though when you think it's safe.
@Joe: Most lockless queues aren't technically lock-free; but in practice perform well. Lock-free Progress Guarantees. You can use atomics to let one thread "claim" a slot in a std::vector. But you're right, this queue doesn't do that, which is a bug. It doesn't distinguish between claimed and finished-writing states, so you could have a writer racing with a reader. There's no atomic operation after buffer[t & mask] = item in github.com/bittnkr/uniq/blob/master/cpp/uniq.h. Testing with sizeof(T)=128 or something should reveal tearing.
@PeterCordes indeed, I agree, most aren't, and true lock-free-ness isn't necessarily better in a real world use-case. Further to the std::vector - I know in reality you're certainly just writing to an array, but I think the standard says you shouldn't have concurrent writers to a std::vector. Anywho, I've personally come a loooong way since I wrote this post, and don't advocate using a multi producer multi consumer queue (though it's a fun exercise). These days I find the best performance in single producer single consumer queues between thread pairs.
@Joe: yup. But unfortunately this queue does appear to have a race on individual elements between writers and readers. (And vice versa, with pop doing buffer[t] = 0; which could step on the data from a push.) And the lack of synchronization would let pop read a claimed but not-yet-written entry, if it goes even earlier than what you pointed out about tearing.
