4

I've written a small lightweight push/pop queue based on a vector (figured it should be fast) like this:

    template <typename T>
    class MyVectorQueue {
    public:
        MyVectorQueue(size_t sz) : my_queue(sz), start(0), end(0) {}

        void push(const T& val) {
            size_t idx = atomic_fetch_add(&end, 1UL);
            if (idx == my_queue.size())
                throw std::runtime_error("Limit reached, initialize to larger vector");
            my_queue[idx] = val;
        }

        const T& pop() {
            if (start == end)
                return end__;
            return my_queue[start.fetch_add(1UL)];
        }

        size_t empty() { return end == start; }

        const T& end() { return end__; }

    private:
        T end__;
        std::atomic<size_t> start, end;
        std::vector<T> my_queue;
    };

The size of the vector is known in advance, and I am wondering why this is not thread safe. In what circumstances does this mess up my structure?

2
  • 1
    The code you have written is fine. The more interesting part is what you do when the vector is filled up. Commented Jan 15, 2014 at 14:24
  • CODE IS BROKEN and there is no simple way to do it right. As the book said[1], lock-free programming should be left to the experts, not me. [1] The C++ Programming Language, 4th ed. Commented Jan 20, 2014 at 6:32

3 Answers

7

Your start and end are atomic variables, but calling std::vector::operator[] is not an atomic operation, which makes the code not thread-safe.


Suppose you have 10 threads and the size of the vector is 5. Now, suppose all of them are executing, say push.

Now suppose all 10 threads pass the check if (end==my_queue.size()), which evaluates to false for each of them because end has not reached the limit yet, meaning the vector is not full.

Then it's possible that all of them increment end and, at the same time, call std::vector::operator[]. At least 5 of the threads will try to access elements "outside" the vector.
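
The scenario above (10 threads, 5 slots) can be sketched as follows. This is only an illustration, assuming C++17 for std::optional; claim_slot and count_successful_claims are hypothetical names that do not appear in the question. The point is that the bound is checked against the value returned by fetch_add, which is private to the calling thread, rather than against the shared counter.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <optional>
#include <thread>
#include <vector>

// Hypothetical helper: reserve one slot index, or nothing if the buffer is full.
// The check uses the ticket returned by fetch_add, not the shared counter,
// so no other thread can change the value between the check and its use.
std::optional<std::size_t> claim_slot(std::atomic<std::size_t>& end,
                                      std::size_t capacity) {
    const std::size_t idx = end.fetch_add(1);
    if (idx >= capacity) {
        return std::nullopt;  // full: this thread must not touch the vector
    }
    return idx;
}

// Runs `threads` concurrent claims against `capacity` slots and returns
// how many of them were granted a slot.
std::size_t count_successful_claims(std::size_t threads, std::size_t capacity) {
    assert(threads > 0);
    std::atomic<std::size_t> end{0};
    std::atomic<std::size_t> granted{0};
    std::vector<std::thread> pool;
    for (std::size_t i = 0; i < threads; ++i) {
        pool.emplace_back([&] {
            if (claim_slot(end, capacity)) {
                granted.fetch_add(1);
            }
        });
    }
    for (auto& t : pool) t.join();
    return granted.load();
}
```

With 10 threads and a capacity of 5, exactly 5 claims succeed regardless of scheduling. Note that in this sketch end keeps growing past the capacity on failed claims, so any code that compares start with end has to account for that.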


13 Comments

Since the indexes with which operator[] are called are unique per all threads, why should it matter?
Well, they are not that unique. Are these the only operations over the vector? I guess not. Somewhere something reads these values, right? Also, resizing the vector may cause reallocations of the internal memory of the vector, while some thread is reading from it. And many other things. Depends on the usage.
What do you mean not that unique? I'm strictly referring to these two functions and the size of the vector is set in the constructor. Helgrind bugs me in the push function.
@LucianMLI - see my edit (good question, by the way; you have +1 from me).
I see. So how about if I get size_t idx=atomic_fetch_add(&end,1UL) and check that value before modifying the vector?
1

You're using operator[] to push items, but this won't grow the vector to make room for the item. You'll therefore get undefined behaviour (and probably an access violation) when trying to add an item at an index that does not exist.

Also, although you're using atomic operations on start and end, the vector itself isn't atomic. So, for example, multiple threads could call push, atomically alter end, and then all call operator[], which isn't thread safe. Instead you should think about using a mutex and a std::deque:

    std::mutex mutex;
    std::deque<T> my_queue;

    void push(const T& val) {
        std::lock_guard<std::mutex> guard(mutex);
        // ...code to check if full
        my_queue.push_back(val);
    }

    // Returns by value: a reference to the local `item` would dangle.
    T pop() {
        std::lock_guard<std::mutex> guard(mutex);
        // code to check if empty and that start index does not pass end index
        T item = my_queue.front();
        my_queue.pop_front();
        return item;
    }
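
For reference, a self-contained version of the mutex-and-deque approach might look like the sketch below. It assumes C++17 for std::optional; the class name LockedQueue, the capacity limit, and the choice to report "full" and "empty" through return values are all assumptions made for illustration, not part of the answer above.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical class name: a bounded FIFO in which every access takes the lock.
template <typename T>
class LockedQueue {
public:
    explicit LockedQueue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }

    // Returns false instead of throwing when the queue is full.
    bool push(const T& val) {
        std::lock_guard<std::mutex> guard(mutex_);
        if (items_.size() >= capacity_) return false;
        items_.push_back(val);
        return true;
    }

    // Returns the oldest element by value, or std::nullopt when empty.
    std::optional<T> pop() {
        std::lock_guard<std::mutex> guard(mutex_);
        if (items_.empty()) return std::nullopt;
        T item = std::move(items_.front());
        items_.pop_front();
        return item;
    }

    bool empty() const {
        std::lock_guard<std::mutex> guard(mutex_);
        return items_.empty();
    }

private:
    const std::size_t capacity_;
    mutable std::mutex mutex_;  // mutable so that empty() can lock in a const method
    std::deque<T> items_;
};
```

Because the full/empty check and the container modification happen under the same lock, no other thread can slip in between them. The trade-off is that all producers and consumers serialize on one mutex.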

1 Comment

The lock_guard in push and pop should be before the //..code to check if full/empty code.
1

Although this code looks dangerously wrong on many accounts at first glance, it actually only contains a single problem. Otherwise it is perfectly fine, within its limitations.

You are creating a vector initialized to some particular size, and you do not allow more elements being pushed than this given size. That is somewhat "odd behavior" but if this is what is desired, then there is no problem.

Calling vector::operator[] looks very troublesome for thread-safety because it is not an atomic operation, but it is really not a problem as such. All that vector::operator[] does is return a reference to the element corresponding to the index that you provide. It doesn't do any bounds-checking or reallocation or any other complicated stuff that might break in presence of concurrency (in all likelihood, it boils down to something like a single LEA instruction).
You are using fetch_add in every case, which is the right mindset to guarantee that indices are unique among threads. If indices are unique, no two threads will ever access the same element, so it does not matter whether that access is atomic. Even if several threads atomically increment a counter at the same time, they will all get different results (i.e. no increments are "lost" on the way). At least that's the theory, and it holds as far as push is concerned, too.
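
A small experiment can make the "unique indices" argument concrete. The sketch below is an illustration only, and every_slot_written_once is a hypothetical name. Several threads draw indices with fetch_add and perform a plain, non-atomic write to the element they were given. The vector is sized up front and never resized.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Each of `threads` workers claims `per_thread` indices with fetch_add and
// increments the element at each claimed index. Returns true if every element
// was written exactly once, i.e. no index was handed out twice or skipped.
bool every_slot_written_once(std::size_t threads, std::size_t per_thread) {
    assert(threads > 0 && per_thread > 0);
    std::vector<int> slots(threads * per_thread, 0);  // sized up front, never resized
    std::atomic<std::size_t> next{0};

    std::vector<std::thread> pool;
    for (std::size_t t = 0; t < threads; ++t) {
        pool.emplace_back([&] {
            for (std::size_t i = 0; i < per_thread; ++i) {
                const std::size_t idx = next.fetch_add(1);
                ++slots[idx];  // non-atomic write to an element no other thread owns
            }
        });
    }
    for (auto& th : pool) th.join();  // join synchronizes, so the reads below are safe

    if (next.load() != slots.size()) return false;
    for (int v : slots) {
        if (v != 1) return false;
    }
    return true;
}
```

This relies on the rule that distinct elements of a standard container (other than std::vector<bool>) may be modified concurrently by different threads, as long as the container itself is not resized.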

The one real problem lies in the pop function where (unlike in push) you do not atomically fetch_add the index (or more correctly, both indices) into local variables prior to comparing them and to calling operator[].
This is a problem since if(start==end) is not atomic as such, and it is not atomic in conjunction with calling operator[] a few lines later. You must fetch both values to be compared in one atomic operation, or you have no way of asserting that the comparison is meaningful in any way. Otherwise:

  • Another thread (or another two or three threads) can increment either start or end (or both) while you compare them, so the result of your start==end check can already be stale by the time you act on it.
  • Another thread may increment start after that check, but before the fetch_add inside the call to operator[] is seen, causing you to index out-of-bounds and invoking undefined behavior.

The non-intuitive thing to note here is that although you do "the right thing" by using atomic operations, the program will still not behave correctly, because it is not enough for each individual operation to be atomic: the check and the access that depends on it must be atomic as a unit.
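
One way to turn the emptiness check and the index claim into a single step is a compare-and-swap loop. The following is only a sketch under a strong assumption: every push has finished writing before any pop starts, so it does not cover readers racing with writers. try_pop and drain_sum are hypothetical names; start, end, and data stand in for the question's members. It assumes C++17 for std::optional.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <numeric>
#include <optional>
#include <thread>
#include <vector>

// Hypothetical free function: claims index `s` only if `start` still equals
// the value that was compared against `end`, so check and claim cannot be split.
template <typename T>
std::optional<T> try_pop(std::atomic<std::size_t>& start,
                         const std::atomic<std::size_t>& end,
                         const std::vector<T>& data) {
    std::size_t s = start.load();
    for (;;) {
        const std::size_t limit = std::min(end.load(), data.size());
        if (s >= limit) return std::nullopt;  // observed empty
        // On failure, compare_exchange_weak reloads `s` and the loop re-checks.
        if (start.compare_exchange_weak(s, s + 1)) return data[s];
    }
}

// Fills a vector with 0..n-1, drains it from `threads` workers, and returns
// the sum of everything popped. ASSUMPTION: all "pushes" are done beforehand.
long long drain_sum(std::size_t n, std::size_t threads) {
    assert(threads > 0);
    std::vector<long long> data(n);
    std::iota(data.begin(), data.end(), 0LL);
    std::atomic<std::size_t> start{0};
    std::atomic<std::size_t> end{n};  // as if n pushes had already completed
    std::atomic<long long> total{0};

    std::vector<std::thread> pool;
    for (std::size_t t = 0; t < threads; ++t) {
        pool.emplace_back([&] {
            long long local = 0;
            while (auto v = try_pop(start, end, data)) local += *v;
            total.fetch_add(local);
        });
    }
    for (auto& th : pool) th.join();
    return total.load();
}
```

If an index were ever handed out twice or skipped, the sum returned by drain_sum would differ from n(n-1)/2. Because start and end only ever grow in this design, an index that was observed to be below end stays valid until it is claimed.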

6 Comments

The OP edited the question "a bit", which was a significant change. push had the same issue as pop. The OP edited the implementation during our conversation in the comments below my answer. But my observations were exactly the same. +1 from me.
@KirilKirov: Ah OK, I had not noticed that there had been an edit, only saw the current version. Will have a look at the previous one too :-)
@KirilKirov: You're right, the original code has a very similar issue in push as well, and the problem is exactly as you describe in your answer.
Thanks. The edit is very misleading, I asked the OP not to do such edits in the future, because it really changes the whole question and makes some answers wrong/unhelpful/etc.
Even if you fetch_added the indices into local variables correctly, that wouldn't actually resolve the problem. Imagine we have one item in the queue. Assume we want to pop it and correctly get both start and end and increment start while doing so (all atomically), but before we can read the item, another thread starts pushing a new element onto the now empty queue. We have one thread reading and another writing to the same memory location without any synchronization -> undefined behavior
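
One possible way to keep a reader from touching a slot while a writer is still storing into it is a per-slot "published" flag. The sketch below is an assumption-laden illustration, not a vetted lock-free queue: the class name PublishedSlotQueue is hypothetical, T must be default-constructible, the capacity is fixed, and slots are never reused (no wrap-around). It assumes C++17 for std::optional.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <optional>
#include <stdexcept>
#include <vector>

// Hypothetical class name. Each slot is written at most once and read at most once.
template <typename T>
class PublishedSlotQueue {
public:
    explicit PublishedSlotQueue(std::size_t capacity)
        : data_(capacity), ready_(capacity), start_(0), end_(0), capacity_(capacity) {
        assert(capacity > 0);
        for (auto& flag : ready_) flag.store(false, std::memory_order_relaxed);
    }

    void push(const T& val) {
        const std::size_t idx = end_.fetch_add(1);  // unique ticket per caller
        if (idx >= capacity_) throw std::runtime_error("queue is full");
        data_[idx] = val;                                    // write the payload first
        ready_[idx].store(true, std::memory_order_release);  // then publish it
    }

    // Returns the next element, or std::nullopt if none has been published yet.
    std::optional<T> pop() {
        std::size_t s = start_.load();
        for (;;) {
            if (s >= capacity_) return std::nullopt;
            // Not published: either never pushed, or the writer is mid-store.
            if (!ready_[s].load(std::memory_order_acquire)) return std::nullopt;
            // Claim slot s only if no other consumer took it in the meantime.
            if (start_.compare_exchange_weak(s, s + 1)) return data_[s];
        }
    }

private:
    std::vector<T> data_;
    std::vector<std::atomic<bool>> ready_;
    std::atomic<std::size_t> start_;
    std::atomic<std::size_t> end_;
    const std::size_t capacity_;
};
```

The release store on the flag paired with the acquire load in pop is what makes the payload write visible before the element is read. A side effect of this design is that pop may report "empty" while a slow writer is still filling the front slot, even if later slots are already published.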