
Before I get to my main question, I'm assuming that std::mutex::unlock() stops touching *this as soon as it puts the mutex in a state where another thread might lock it - even though the call to unlock() might not have returned yet. In other words, the following C++ code is correct:

#include <atomic>
#include <cstddef>
#include <mutex>

struct RefCount {
    std::mutex m_;
    std::size_t c_{};

    // Always construct on heap
    static RefCount *make() { return new RefCount; }
    virtual ~RefCount() = default;

    void incref() {
        m_.lock();
        ++c_;
        m_.unlock();
    }

    void decref() {
        m_.lock();
        auto c = --c_;
        m_.unlock();
        if (!c) delete this;
    }

protected:
    RefCount() noexcept = default;
};

In particular, there should be no undefined behavior if two threads hold references and call decref() - the first thread functionally unlocks the mutex even though unlock() hasn't returned yet, then the second thread locks and unlocks the mutex and destroys the object (including the mutex), and only then does the first thread return from its call to unlock().

Unfortunately, examples like the above would seem to preclude straightforward futex-based mutexes implemented with std::atomic. The usual pattern is to have 3 states (unlocked, locked, and locked-with-waiters), which might look like the following:

#include <atomic>
#include <cstdint>

struct BadMutex {
    static constexpr uint8_t kUnlocked = 0;
    static constexpr uint8_t kLocked = 1;
    static constexpr uint8_t kLockedWantNotify = 2;
    std::atomic_uint8_t state_{kUnlocked};

    void lock() {
        for (;;) {
            auto s = state_.exchange(kLocked, std::memory_order_acquire);
            if (s != kUnlocked)
                s = state_.exchange(kLockedWantNotify, std::memory_order_acquire);
            if (s == kUnlocked) return;
            state_.wait(kLockedWantNotify);
        }
    }

    void unlock() {
        if (state_.exchange(kUnlocked, std::memory_order_release) == kLockedWantNotify)
            // UB if BadMutex destroyed here
            state_.notify_all();
    }
};

Unfortunately, if you plug BadMutex into RefCount above, you can get undefined behavior if the second-to-last decref() thread is preempted right before state_.notify_all() and BadMutex is destroyed, because invoking a method on a destroyed object is UB.

Note that using a raw FUTEX_WAKE system call would be fine, because it's no big deal to send a spurious wakeup or even call FUTEX_WAKE on unmapped memory (just get an EFAULT you can ignore). So, in practice, this UB should be no big deal, but of course history doesn't look kindly on intentional UB.
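For illustration, here is roughly what such a raw wake could look like on Linux. This is only a sketch; futex_wake_all is a hypothetical helper, and the EFAULT handling follows the reasoning above about unmapped memory:

#include <cerrno>
#include <climits>
#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>

// Hypothetical raw wake helper: unlike std::atomic::notify_all(), this
// takes a plain address, so there is no C++ object whose lifetime matters.
inline void futex_wake_all(void *addr) {
    long rc = syscall(SYS_futex, addr, FUTEX_WAKE_PRIVATE, INT_MAX);
    if (rc == -1 && errno == EFAULT) {
        // Memory already unmapped - a spurious or failed wake is harmless here.
    }
}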

So, my next question is, can I do anything about this? The best I've come up with is to spin during destruction, but it's offensive to have to spin when we have futexes that are specifically designed to avoid spinning. Concretely, my workaround looks something like this:

#include <atomic>
#include <cstdint>

struct UglyMutex {
    static constexpr uint8_t kUnlocked = 0;
    static constexpr uint8_t kLocked = 1;
    static constexpr uint8_t kLockedWantNotify = 2;
    std::atomic_uint8_t state_{kUnlocked};
    std::atomic_uint16_t unlocking_{0};

    ~UglyMutex() {
        while (unlocking_.load(std::memory_order_acquire))
            ;
    }

    void lock() {
        for (;;) {
            auto s = state_.exchange(kLocked, std::memory_order_acquire);
            if (s != kUnlocked)
                s = state_.exchange(kLockedWantNotify, std::memory_order_acquire);
            if (s == kUnlocked) return;
            state_.wait(kLockedWantNotify);
        }
    }

    void unlock() {
        unlocking_.fetch_add(1, std::memory_order_relaxed);
        if (state_.exchange(kUnlocked, std::memory_order_release) == kLockedWantNotify)
            state_.notify_all();
        unlocking_.fetch_sub(1, std::memory_order_release);
    }
};

Am I missing some other trick I could use to do this without spinning - maybe with std::atomic_ref?

Is there any hope that a future version of C++ might provide a safe way to notify on destroyed atomics?

Is there any benefit to the language making this UB, or is this basically a flaw in the language spec that atomics don't expose the full expressiveness of the underlying futexes on which they are implemented?

5 Comments

  • Just so you know: std::mutex locks cannot be migrated to other threads. The lock() and unlock() calls must happen in the same thread.
  • e.g., where objects containing a std::unique_lock are migrated across threads and so need to be unlocked in a different thread from the one locking them... that somehow feels more like a design smell to me. I do a lot of multithreading and I have never needed to do that. In my opinion, if something is UB in C++ and you need to "work around" it, then reconsider your design choices. (And yes, this is vague, because from your statement I cannot really guess at the design choices that led you to this code.)
  • ... to avoid use-after-free when deleting? The problem is not with the mutex or its implementation; the problem is the program logic. The destructor should not be called while another method is executing or may execute in the future. Invoking the destructor means you are done using the object. This rule is so important that synchronization inside a destructor does not make sense.
  • Paper P2616R2 seems to address this problem. Maybe something for C++29, if we're lucky :)
  • ... though for your use case you don't really need to roll your own solution - std::binary_semaphore is most likely what you're looking for. (std::binary_semaphore::acquire == BadMutex::lock, std::binary_semaphore::release == BadMutex::unlock) - "Unlike std::mutex, a counting_semaphore is not tied to threads of execution - acquiring a semaphore can occur on a different thread than releasing the semaphore, for example. [...]"

1 Answer


1. tl;dr

  • Yes, calling .notify_one() / .notify_all() on an already-destroyed std::atomic is UB.
  • This also applies to std::atomic_ref, because the object it points to must outlive the std::atomic_ref object.
  • There is a C++ Paper - P2616R4 - which would fix this problem.
    But unfortunately it hasn't been updated for a few years, so its future is unclear.
  • There are workarounds, depending on your use case:
    • using std::binary_semaphore
    • delegating .wait() and .notify_xxx() to a separate atomic variable that has a longer lifetime
    • manually calling the futex syscalls and bypassing the stdlib
    • ...

2. Long Answers

Q:

Before I get to my main question, I'm assuming that std::mutex::unlock() stops touching this as soon as it puts the mutex in a state where another thread might lock it [...]

A:

That's a valid assumption - the standard even mandates that this type of usage must be supported:

35.5.3.2.1 Class mutex [thread.mutex.class]

(2) [ Note: After a thread A has called unlock(), releasing a mutex, it is possible for another thread B to lock the same mutex, observe that it is no longer in use, unlock it, and destroy it, before thread A appears to have returned from its unlock call. Implementations are required to handle such scenarios correctly, as long as thread A doesn't access the mutex after the unlock call returns. These cases typically occur when a reference-counted object contains a mutex that is used to protect the reference count. — end note ]


Q:

Is it possible to build a mutex from C++20 std::atomic without spinning to avoid use-after-free when deleting?

A:

Yes, it is possible.
But not as straightforward as one might expect, due to the lifetime issues you already mentioned.
See 3. Potential Solutions for a list of ways you could work around this limitation.


Q:

Is there any hope that a future version of C++ might provide a safe way to notify on destroyed atomics?

A:

There is a paper addressing this specific issue, that could have been part of C++26:
P2616 - Making std::atomic notification/wait operations usable in more situations

Revisions:

Paper     Date         Target C++ Version
P2616R4   2023-02-15   C++26
P2616R3   2022-11-29   C++26
P2616R2   2022-11-16   C++26
P2616R1   2022-11-09   C++26
P2616R0   2022-07-05   C++26

The current state of this paper can be seen at cplusplus/papers Issue #1279 on github
(at the time of writing, the repo is private due to an ongoing committee meeting - archive.org version)

It has been stuck in needs-revision since May 23, 2023 - so it's unclear if (and when) it'll ever become part of the standard.


Q:

So, my next question is, can I do anything about this?
The best I've come up with is to spin during destruction, but it's offensive to have to spin when we have futexes that are specifically designed to avoid spinning.

A:

There's unfortunately no one-size-fits-all solution for this.

  • Spinning in the destructor is definitely an option, even though it's not pretty.
  • std::binary_semaphore might be an option - its acquire() and release() member functions handle the internal wait / notify themselves, so there should be no lifetime problems.
    (see for example 3.2. Using std::counting_semaphore / std::binary_semaphore)
  • Another option would be to delegate the wait / notify calls to another atomic variable that has a longer lifetime
    (see for example 3.4 Use a waiter pool)
  • see 3. Potential Solutions for all options

Q:

Am I missing some other trick I could use to do this without spinning - maybe with std::atomic_ref?

A:

Unfortunately, std::atomic_ref doesn't help in this case either.
One of its rules is that the object it refers to must outlive the atomic_ref object
(which would not be the case if you destroy the object).

31.7 Class template atomic_ref [atomics.ref.generic]
(3) The lifetime ([basic.life]) of an object referenced by *ptr shall exceed the lifetime of all atomic_­refs that reference the object. While any atomic_­ref instances exist that reference the *ptr object, all accesses to that object shall exclusively occur through those atomic_­ref instances. [...]
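To make the lifetime problem concrete, here is a hypothetical unlock() written with std::atomic_ref (reusing BadMutex's state constants; a sketch, not code from the question) - it just moves the UB around:

// state_ is a plain std::uint32_t member; kUnlocked / kLockedWantNotify
// are the same constants as in BadMutex above.
void unlock() {
    std::atomic_ref ref(state_);
    if (ref.exchange(kUnlocked, std::memory_order_release) == kLockedWantNotify)
        // By this point another thread may have locked, unlocked and
        // destroyed *this - then ref outlives state_, violating
        // [atomics.ref.generic] just like the direct notify_all() did.
        ref.notify_all();
}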


Q:

Is there any benefit to the language making this UB, or is this basically a flaw in the language spec that atomics don't expose the full expressiveness of the underlying futexes on which they are implemented?

A:

That is unfortunately hard to answer, given that there is no published rationale for the API that ended up in the standard.

My guess would be that this edge-case was simply missed.


3. Potential Solutions

This is a (small, incomplete) list of potential solutions / workarounds.
Depending on your specific use case, these might be helpful.

3.1. Spin in the destructor

Like in OP's example.
Not pretty, but it gets the job done.
Might even be an acceptable option assuming it's rather rare that two threads release the mutex in quick succession and the last release happens to free the object containing the mutex.

e.g.: godbolt

#include <atomic>
#include <cstdint>

class my_mutex {
private:
    using state_t = std::uint32_t;
    static constexpr state_t StateUnlocked = 0;
    static constexpr state_t StateLocked = 1;
    static constexpr state_t StateLockedWithWaiters = 2;
    static_assert(std::atomic<state_t>::is_always_lock_free);

    std::atomic<state_t> state = StateUnlocked;
    std::atomic<std::uint32_t> release_count = 0;

public:
    void lock() {
        state_t expected = StateUnlocked;
        if (state.compare_exchange_strong(
                expected, StateLocked,
                std::memory_order::acquire, std::memory_order::relaxed
            )) [[likely]] {
            return;
        }
        while (true) {
            if (expected != StateLockedWithWaiters) {
                expected = state.exchange(
                    StateLockedWithWaiters, std::memory_order::acquire
                );
                if (expected == StateUnlocked) {
                    return;
                }
            }
            state.wait(StateLockedWithWaiters);
            expected = state.load(std::memory_order_relaxed);
        }
    }

    bool try_lock() {
        state_t expected = StateUnlocked;
        return state.compare_exchange_strong(
            expected, StateLocked,
            std::memory_order::acquire, std::memory_order::relaxed
        );
    }

    void unlock() {
        release_count.fetch_add(1, std::memory_order::relaxed);
        state_t prev = state.exchange(
            StateUnlocked, std::memory_order::release
        );
        if (prev == StateLockedWithWaiters) [[unlikely]] {
            state.notify_one();
        }
        release_count.fetch_sub(1, std::memory_order::release);
    }

    ~my_mutex() {
        // acquire pairs with the release fetch_sub in unlock(), so the
        // notify_one() above happens-before the destruction
        while (release_count.load(std::memory_order::acquire)) {
            // TODO: optional arch specific relax instruction
            __builtin_ia32_pause();
        }
    }
};

Upsides:

  • Easy to implement

Downsides:

  • Requires spin-loop in destructor

3.2. Using std::counting_semaphore / std::binary_semaphore

It is relatively straightforward to wrap std::binary_semaphore into a custom mutex implementation that supports unlocking the mutex from a different thread than the one that locked it.

e.g.: godbolt

#include <chrono>
#include <semaphore>

class my_mutex {
private:
    std::binary_semaphore sem;

public:
    my_mutex() : sem(1) {}

    void lock() { sem.acquire(); }
    void unlock() { sem.release(); }
    bool try_lock() { return sem.try_acquire(); }

    template<class Rep, class Period>
    bool try_lock_for(std::chrono::duration<Rep, Period> const& timeout) {
        return sem.try_acquire_for(timeout);
    }

    template<class Clock, class Duration>
    bool try_lock_until(std::chrono::time_point<Clock, Duration> const& timeout) {
        return sem.try_acquire_until(timeout);
    }
};
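A quick usage sketch of the cross-thread property (assuming the my_mutex from this section):

#include <thread>

my_mutex m;

int main() {
    m.lock();                          // acquired on the main thread
    std::thread t([] { m.unlock(); }); // released on a different thread -
                                       // fine, because semaphores are not
                                       // tied to a thread of execution
    t.join();
}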

Upsides:

  • Works like a normal mutex, but ownership can be transferred between threads.

Downsides:

  • try_lock is built on try_acquire, which the standard allows to fail spuriously (std::mutex::try_lock is allowed to do the same, though)
3.3. Use std::atomic_ref and use the futex wait / wake syscalls

Another option would be to use std::atomic_ref for the atomic read & write operations, and manually handle the waking / sleeping part (by calling the syscalls directly).

e.g.: godbolt

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>

class my_mutex {
private:
    using state_t = std::uint32_t;
    static constexpr state_t StateUnlocked = 0;
    static constexpr state_t StateLocked = 1;
    static constexpr state_t StateLockedWithWaiters = 2;
    static_assert(std::atomic_ref<state_t>::is_always_lock_free);

    state_t state = StateUnlocked;

    void wait() {
        // TODO use WaitOnAddress for windows, ... other oses ...
        syscall(SYS_futex, &state, FUTEX_WAIT_PRIVATE, StateLockedWithWaiters, NULL);
    }

    void wake() {
        // TODO use WakeOnAddress for windows, ... other oses ...
        syscall(SYS_futex, &state, FUTEX_WAKE_PRIVATE, 1);
    }

public:
    void lock() {
        state_t expected = StateUnlocked;
        if (std::atomic_ref(state).compare_exchange_strong(
                expected, StateLocked,
                std::memory_order::acquire, std::memory_order::relaxed
            )) [[likely]] {
            return;
        }
        while (true) {
            if (expected != StateLockedWithWaiters) {
                expected = std::atomic_ref(state).exchange(
                    StateLockedWithWaiters, std::memory_order::acquire
                );
                if (expected == StateUnlocked) {
                    return;
                }
            }
            // TODO: maybe spin a little before waiting
            wait();
            expected = std::atomic_ref(state).load(std::memory_order::relaxed);
        }
    }

    bool try_lock() {
        state_t expected = StateUnlocked;
        return std::atomic_ref(state).compare_exchange_strong(
            expected, StateLocked,
            std::memory_order::acquire, std::memory_order::relaxed
        );
    }

    void unlock() {
        state_t prev = std::atomic_ref(state).exchange(
            StateUnlocked, std::memory_order::release
        );
        if (prev == StateLockedWithWaiters) [[unlikely]] {
            wake();
        }
    }
};

Upsides:

  • Works like a normal mutex, but ownership can be transferred between threads.
  • Gives you full control over the size and layout of your mutex class.

Downsides:

  • You need to handle wait() and wake() manually, by directly calling the syscalls.
    (to avoid UB with std::atomic_ref::wait when the object gets destroyed)
    => Needs to be implemented for each OS, no longer portable

3.4. Use a waiter pool

Another option would be to avoid waiting on the atomic variable directly by using another atomic variable (with a longer lifetime) solely for the wait / notify.

This is also how most standard libraries implement std::atomic::wait() for types that are not futex-sized.
(libstdc++, for example, has a pool of 16 futexes that it uses for waits on atomic variables that are not futex-sized; __wait_flags::__proxy_wait is the flag that determines whether the wait happens on the atomic value itself or on one of the 16 proxy futexes.)

e.g.: godbolt

#include <atomic>
#include <bit>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <new>
#include <type_traits>

class waiter {
private:
    alignas(std::hardware_destructive_interference_size)
    std::atomic<std::uint32_t> counter = 0;

public:
    void notify_all() {
        counter.fetch_add(1, std::memory_order::release);
        counter.notify_all();
    }

    template <class T>
    void wait(std::atomic<T> const& var, std::type_identity_t<T> const& oldval) {
        while (true) {
            auto counterval = counter.load(std::memory_order::acquire);
            if (var.load(std::memory_order::relaxed) != oldval) {
                return;
            }
            counter.wait(counterval);
        }
    }
};

template <std::size_t N = 256>
class waiter_pool {
private:
    static_assert(std::has_single_bit(N), "N should be a power of 2");
    waiter waiters[N];

    waiter& waiter_for_address(const void *ptr) {
        std::uintptr_t addr = reinterpret_cast<std::uintptr_t>(ptr);
        addr = std::hash<std::uintptr_t>{}(addr);
        return waiters[addr % N];
    }

public:
    template <class T>
    void notify_all(std::atomic<T> const& var) {
        waiter& w = waiter_for_address(static_cast<const void*>(&var));
        w.notify_all();
    }

    template <class T>
    void wait(std::atomic<T> const& var, std::type_identity_t<T> const& oldval) {
        waiter& w = waiter_for_address(static_cast<const void*>(&var));
        w.wait(var, oldval);
    }
};

waiter_pool pool;

class my_mutex {
private:
    using state_t = std::uint8_t;
    static constexpr state_t StateUnlocked = 0;
    static constexpr state_t StateLocked = 1;
    static constexpr state_t StateLockedWithWaiters = 2;
    static_assert(std::atomic<state_t>::is_always_lock_free);

    std::atomic<state_t> state = StateUnlocked;

public:
    void lock() {
        state_t expected = StateUnlocked;
        if (state.compare_exchange_strong(
                expected, StateLocked,
                std::memory_order::acquire, std::memory_order::relaxed
            )) [[likely]] {
            return;
        }
        while (true) {
            if (expected != StateLockedWithWaiters) {
                expected = state.exchange(
                    StateLockedWithWaiters, std::memory_order::acquire
                );
                if (expected == StateUnlocked) {
                    return;
                }
            }
            // TODO: maybe spin a little before waiting
            pool.wait(state, StateLockedWithWaiters);
            expected = state.load(std::memory_order_relaxed);
        }
    }

    bool try_lock() {
        state_t expected = StateUnlocked;
        return state.compare_exchange_strong(
            expected, StateLocked,
            std::memory_order::acquire, std::memory_order::relaxed
        );
    }

    void unlock() {
        state_t prev = state.exchange(
            StateUnlocked, std::memory_order::release
        );
        if (prev == StateLockedWithWaiters) [[unlikely]] {
            pool.notify_all(state);
        }
    }
};

Upsides:

  • Works like a normal mutex, but ownership can be transferred between threads.
  • my_mutex::state does not need to be 32 bits because the futex waiting is delegated to the pool. So instances of my_mutex can be much smaller.

Downsides:

  • Needs an extra global pool of atomic variables
  • If you have a lot of threads and / or a lot of contention on your mutexes then performance might be suboptimal because the pool always needs to wake all waiters.

3.5. (when using C++20 coroutines) async_mutex

If you're already using C++20 coroutines then using something like async_mutex from cppcoro might be an option.

Coroutines would also have the benefit of not blocking the thread while waiting for the mutex - instead the coroutine simply suspends until the mutex is available again.

See async_mutex.hpp / async_mutex.cpp for the implementation.
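Usage looks roughly like this (a sketch following cppcoro's README; cppcoro::task, async_mutex_lock and scoped_lock_async() come from that library):

#include <cppcoro/async_mutex.hpp>
#include <cppcoro/task.hpp>

cppcoro::async_mutex mutex;
int shared_value = 0;

cppcoro::task<> add_to_value(int x) {
    // Suspends the coroutine (instead of blocking the thread)
    // while the mutex is held by someone else.
    cppcoro::async_mutex_lock lock = co_await mutex.scoped_lock_async();
    shared_value += x;
}   // lock released when 'lock' goes out of scope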

Upsides:

  • Works directly with coroutines, so usage within a coro is straightforward.

Downsides:

  • Only usable from coroutines

3.5.1. Mixed coroutine & native mutex

It's also possible to create a mutex that can be used by both coroutines and native threads (by adapting the async_mutex design to also have a blocking lock method).

e.g.: godbolt

#include <atomic>
#include <coroutine>
#include <cstdint>
#include <mutex>
#include <semaphore>
#include <type_traits>
#include <utility>

class my_mutex {
private:
    struct wait_list_entry {
        wait_list_entry* next = nullptr;
        virtual void wake() = 0;
    };

    struct native_waiter final : wait_list_entry {
        native_waiter() : sema(0) {}
        void wait() { sema.acquire(); }
        void wake() override final { sema.release(); }
        std::binary_semaphore sema;
    };

    struct awaitable_default_resumer {
        void operator()(std::coroutine_handle<void> h) { h.resume(); }
    };

    template<class Resumer>
    class awaitable_base : protected wait_list_entry {
    protected:
        my_mutex& mtx;
        Resumer resumer;
        std::coroutine_handle<void> coro;

    public:
        template<class R>
        awaitable_base(my_mutex& m, R&& r)
            : mtx(m), resumer(std::forward<R>(r)), coro(nullptr) {}

        bool await_ready() { return mtx.try_lock(); }

        bool await_suspend(std::coroutine_handle<void> h) {
            coro = h;
            if (mtx.lock_or_register_waiter(this)) {
                coro = nullptr;
                return false;
            }
            return true;
        }

    protected:
        void wake() override final {
            resumer(std::exchange(coro, nullptr));
        }
    };

    template<class Resumer>
    class awaitable_raw final : public awaitable_base<Resumer> {
    public:
        using awaitable_base<Resumer>::awaitable_base;

        void await_resume() const noexcept {}

        template<class NewResumer>
        [[nodiscard]] awaitable_raw<std::remove_cvref_t<NewResumer>>
        resume_with(NewResumer&& nr) {
            return {this->mtx, std::forward<NewResumer>(nr)};
        }
    };

    template<class Resumer>
    class awaitable_unique final : public awaitable_base<Resumer> {
    public:
        using awaitable_base<Resumer>::awaitable_base;

        [[nodiscard]] std::unique_lock<my_mutex> await_resume() {
            return {this->mtx, std::adopt_lock};
        }

        template<class NewResumer>
        [[nodiscard]] awaitable_unique<std::remove_cvref_t<NewResumer>>
        resume_with(NewResumer&& nr) {
            return {this->mtx, std::forward<NewResumer>(nr)};
        }
    };

private:
    static constexpr std::uintptr_t state_unlocked = 0;
    static constexpr std::uintptr_t state_locked_no_waiters = 1;

    // contains one of three values:
    //   state_unlocked            == mutex is currently unlocked
    //   state_locked_no_waiters   == mutex is locked, no pending waiters
    //   pointer to wait_list_entry == head of singly linked list of new waiters in LIFO order
    std::atomic<std::uintptr_t> state = state_unlocked;

    // waiter queue, will be populated by unlock()
    wait_list_entry* waiter_queue = nullptr;

public:
    my_mutex() = default;
    my_mutex(my_mutex const&) = delete;
    my_mutex(my_mutex&&) = delete;
    my_mutex& operator=(my_mutex const&) = delete;
    my_mutex& operator=(my_mutex&&) = delete;

    bool try_lock() {
        std::uintptr_t prev_state = state_unlocked;
        return state.compare_exchange_strong(
            prev_state, state_locked_no_waiters,
            std::memory_order::acquire, std::memory_order::relaxed
        );
    }

    void lock() {
        if (try_lock()) [[likely]] {
            return;
        }
        native_waiter waiter;
        if (lock_or_register_waiter(&waiter)) {
            return;
        }
        // enqueued waiter, wait for our turn
        waiter.wait();
    }

    void unlock() {
        if (waiter_queue == nullptr) [[likely]] {
            std::uintptr_t prev_state = state_locked_no_waiters;
            if (state.compare_exchange_strong(
                    prev_state, state_unlocked,
                    std::memory_order::release, std::memory_order::relaxed
                )) [[likely]] {
                return;
            }
            move_waiters_to_waiter_queue();
        }
        wait_list_entry* entry = waiter_queue;
        waiter_queue = entry->next;
        entry->wake();
    }

    [[nodiscard]] awaitable_raw<awaitable_default_resumer> operator co_await() {
        return lock_async();
    }

    [[nodiscard]] awaitable_raw<awaitable_default_resumer> lock_async() {
        return awaitable_raw<awaitable_default_resumer>{*this, awaitable_default_resumer{}};
    }

    [[nodiscard]] awaitable_unique<awaitable_default_resumer> lock_async_unique() {
        return awaitable_unique<awaitable_default_resumer>{*this, awaitable_default_resumer{}};
    }

private:
    // Either locks the mutex synchronously (returns true)
    // or adds the waiter to the wait list (returns false).
    // The waiter will not be called if the mutex was locked synchronously.
    bool lock_or_register_waiter(wait_list_entry* waiter) {
        std::uintptr_t prev_state = state.load(std::memory_order::relaxed);
        while (true) {
            if (prev_state == state_unlocked) {
                if (state.compare_exchange_weak(
                        prev_state, state_locked_no_waiters,
                        std::memory_order::acquire, std::memory_order::relaxed
                    )) {
                    return true;
                }
            } else {
                waiter->next = nullptr;
                if (prev_state != state_locked_no_waiters) {
                    waiter->next = reinterpret_cast<wait_list_entry*>(prev_state);
                }
                std::uintptr_t next_state = reinterpret_cast<std::uintptr_t>(waiter);
                if (state.compare_exchange_weak(
                        prev_state, next_state,
                        std::memory_order::release, std::memory_order::relaxed
                    )) {
                    return false;
                }
            }
        }
    }

    void move_waiters_to_waiter_queue() {
        std::uintptr_t prev_state = state.exchange(
            state_locked_no_waiters, std::memory_order::acquire
        );
        // reverse list of waiters so that they are served in FIFO order
        // if the mutex does not need to be fair then this loop could be removed
        wait_list_entry* current_entry = reinterpret_cast<wait_list_entry*>(prev_state);
        wait_list_entry* prev_entry = nullptr;
        while (current_entry != nullptr) {
            wait_list_entry* next = current_entry->next;
            current_entry->next = prev_entry;
            prev_entry = current_entry;
            current_entry = next;
        }
        waiter_queue = prev_entry;
    }
};

That's kind of the best of both worlds. Normal threads can still do mutex.lock() and mutex.unlock() (or use std::lock_guard / std::unique_lock / etc.), and coroutines can use co_await mutex.lock_async() and mutex.unlock() (or auto lock = co_await mutex.lock_async_unique() for a std::unique_lock), as sketched below.
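For illustration, a minimal usage sketch (fire_and_forget is a hypothetical throwaway coroutine type added here for the example; it is not part of the mutex):

#include <coroutine>
#include <mutex>

// Hypothetical minimal coroutine type, just to show the co_await usage.
struct fire_and_forget {
    struct promise_type {
        fire_and_forget get_return_object() { return {}; }
        std::suspend_never initial_suspend() noexcept { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() {}
    };
};

my_mutex mtx;
int shared_value = 0;

fire_and_forget coro_user() {
    // Suspends instead of blocking if the mutex is contended.
    auto lock = co_await mtx.lock_async_unique();
    ++shared_value;
}   // the std::unique_lock unlocks on scope exit

void thread_user() {
    std::lock_guard guard(mtx); // blocks via the binary_semaphore path
    ++shared_value;
}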

Upsides:

  • Works with coroutines and native threads
  • Is a fair mutex

Downsides:

  • Care must be taken that coroutines resume on the correct threads
    (Because coroutines waiting on the mutex can resume on any other thread that called a lock method on it)
    (.resume_with() can be used to control how coroutines are resumed)
  • A bit more overhead because we need to maintain a linked list of waiters

3.6. Ensure the atomic does not get destroyed during the unlock() sequence

Another option would be to give your mutex a way to increment the refcount of its containing object (or give it a shared_ptr to the object if it's managed by one, etc.).

That way your unlock() could look like this:

class my_mutex {
    some_type* outer;
    /* ... */
    void unlock() {
        if (outer) outer->incref();
        if (state_.exchange(kUnlocked, std::memory_order_release) == kLockedWantNotify)
            state_.notify_all();
        if (outer) outer->decref();
    }
    /* ... */
};

It's not pretty, but at least it gets rid of the spin loop in the destructor.

The extreme version of this would be to dynamically allocate your mutex state - that way you can keep it alive during the unlock() sequence.

e.g.: godbolt

#include <atomic>
#include <cstdint>
#include <memory>

class my_mutex {
private:
    using state_t = std::uint32_t;
    static constexpr state_t StateUnlocked = 0;
    static constexpr state_t StateLocked = 1;
    static constexpr state_t StateLockedWithWaiters = 2;

    std::shared_ptr<std::atomic<state_t>> state;

public:
    my_mutex() : state(std::make_shared<std::atomic<state_t>>(StateUnlocked)) {}

    void lock() {
        state_t expected = StateUnlocked;
        if (state->compare_exchange_strong(
                expected, StateLocked,
                std::memory_order::acquire, std::memory_order::relaxed
            )) [[likely]] {
            return;
        }
        while (true) {
            if (expected != StateLockedWithWaiters) {
                expected = state->exchange(
                    StateLockedWithWaiters, std::memory_order::acquire
                );
                if (expected == StateUnlocked) {
                    return;
                }
            }
            state->wait(StateLockedWithWaiters);
            expected = state->load(std::memory_order_relaxed);
        }
    }

    bool try_lock() {
        state_t expected = StateUnlocked;
        return state->compare_exchange_strong(
            expected, StateLocked,
            std::memory_order::acquire, std::memory_order::relaxed
        );
    }

    void unlock() {
        // keep the state alive until the notify has completed, even if
        // the my_mutex object is destroyed concurrently
        std::shared_ptr ref = state;
        state_t prev = ref->exchange(
            StateUnlocked, std::memory_order::release
        );
        if (prev == StateLockedWithWaiters) [[unlikely]] {
            ref->notify_one();
        }
    }
};

Upsides:

  • Gets rid of the spin loop in the destructor

Downsides:

  • Mutex either requires a reference to its containing object (to bump the reference count) or it needs to heap-allocate its state.

4 Comments

Thanks for the detailed answer. One thing I'll point out is that in 3.4, 16 futexes is woefully underprovisioning things on modern machines - you generally want something quadratic in the number of cores - so I don't understand why the number is so low. I do have coroutine-specific mutexes, but they don't work when coroutine code and non-coroutine code need to share a lock. In 3.3, what would happen if you made state_t a std::atomic_uint32_t (maybe with a static_assert that it's the same size as a uint32_t)? Do you need an atomic_ref to make the futex system call work?
For comparison, llvm's libc++ uses a table with 256 entries, so it's a bit more generous in terms of futexes. My guess for the low number would be memory consumption - 16 entries * 64 bytes per cache line == 1 kB of memory, and that for each process that loads libstdc++ (so basically almost all of them) - with libc++ it's 16 kB per process; so it's somewhat of a tradeoff between space wasted and wait efficiency.
For 3.3: Given that std::atomic needs to be standard-layout, you can do even better - you can check that it actually contains a std::uint32_t and no other members using std::is_layout_compatible, e.g.: struct expected_atomic_layout { std::uint32_t val; }; static_assert(std::is_layout_compatible_v<std::atomic<std::uint32_t>, expected_atomic_layout>);. To obtain a pointer to the std::uint32_t contained in the atomic, reinterpret_cast can be used (the first member of a standard-layout type is pointer-interconvertible with the type itself).
Godbolt Example. It'll probably result in the same (or very similar) code in comparison to std::atomic_ref. It might also be worth pointing out that in my tests 3.3 had the worst performance; but that's most likely because it doesn't spin a few times before the futex syscall (helps a lot if the code between lock() and unlock() is short) and doesn't track contention (helps to minimize expensive futex syscalls).
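Spelled out, the check from the comment above might look like this (a sketch; expected_atomic_layout and futex_word are illustrative names, not a standard API - the static_assert will fail at compile time on any implementation where the layout assumption doesn't hold):

#include <atomic>
#include <cstdint>
#include <type_traits>

// Verify that std::atomic<std::uint32_t> is layout-compatible with a
// struct holding a single std::uint32_t, so its address can be passed
// straight to the futex syscall.
struct expected_atomic_layout { std::uint32_t val; };
static_assert(std::is_layout_compatible_v<std::atomic<std::uint32_t>,
                                          expected_atomic_layout>);

inline std::uint32_t* futex_word(std::atomic<std::uint32_t>& a) {
    // The first member of a standard-layout type is
    // pointer-interconvertible with the object itself.
    return reinterpret_cast<std::uint32_t*>(&a);
}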
