1. tl;dr
- Yes, calling .notify_one() / .notify_all() on a destroyed std::atomic is UB.
- This also applies to std::atomic_ref, because the object it points to must outlive the std::atomic_ref object.
- There is a C++ paper, P2616R4, which would fix this problem. Unfortunately it hasn't been updated for a few years, so its future is unclear.
- There are workarounds, depending on your use case:
  - using std::binary_semaphore
  - delegating .wait() and .notify_xxx() to a separate atomic variable that has a longer lifetime
  - manually calling the futex syscalls and bypassing the stdlib
  - ...
2. Long Answers
Q:
Before I get to my main question, I'm assuming that std::mutex::unlock() stops touching this as soon as it puts the mutex in a state where another thread might lock it [...]
A:
That's a valid assumption - the standard even mandates that this type of usage must be supported:
35.5.3.2.1 Class mutex [thread.mutex.class]
(2) [ Note: After a thread A has called unlock(), releasing a mutex, it is possible for another thread B to lock the same mutex, observe that it is no longer in use, unlock it, and destroy it, before thread A appears to have returned from its unlock call. Implementations are required to handle such scenarios correctly, as long as thread A doesn't access the mutex after the unlock call returns. These cases typically occur when a reference-counted object contains a mutex that is used to protect the reference count. — end note ]
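To make the scenario from the note concrete, here is a minimal sketch of a reference-counted object whose count is protected by its own std::mutex. The names shared_thing, drop_ref, destroyed_objects and run_refcount_demo are made up for this illustration; only the std::mutex guarantee itself comes from the standard.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <thread>

// Hypothetical reference-counted object; the mutex protects the count.
struct shared_thing {
    std::mutex mtx;
    int refcount = 2;  // two owners in this demo
};

std::atomic<int> destroyed_objects{0};  // demo-only bookkeeping

// Drops one reference and deletes the object when the count reaches zero.
void drop_ref(shared_thing* p) {
    p->mtx.lock();
    assert(p->refcount > 0);
    bool last = (--p->refcount == 0);
    p->mtx.unlock();
    // From here on, p may only be touched by the thread that saw `last`.
    if (last) {
        // The other thread may not have returned from its unlock() yet;
        // the standard requires std::mutex to cope with that.
        delete p;
        destroyed_objects.fetch_add(1, std::memory_order_relaxed);
    }
}

int run_refcount_demo() {
    destroyed_objects = 0;
    auto* p = new shared_thing;
    std::thread a(drop_ref, p);
    std::thread b(drop_ref, p);
    a.join();
    b.join();
    return destroyed_objects.load();
}
```

Exactly one of the two threads observes the count dropping to zero and deletes the object, even if the other thread is still inside its unlock() call at that moment.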
Q:
Is it possible to build a mutex from C++20 std::atomic without spinning to avoid use-after-free when deleting?
A:
Yes, it is possible.
But it's not as straightforward as one might expect, due to the lifetime issues you already mentioned.
See 3. Potential Solutions for a list of ways you could work around this limitation.
Q:
Is there any hope that a future version of C++ might provide a safe way to notify on destroyed atomics?
A:
There is a paper addressing this specific issue, that could have been part of C++26:
P2616 - Making std::atomic notification/wait operations usable in more situations
Revisions:
The current state of this paper can be seen at cplusplus/papers Issue #1279 on github
(currently the repo is private due to the current committee meeting - archive.org version)
It has been stuck in the needs-revision state since May 23, 2023, so it's unclear if (and when) it'll ever become part of the standard.
Q:
So, my next question is, can I do anything about this?
The best I've come up with is to spin during destruction, but it's offensive to have to spin when we have futexes that are specifically designed to avoid spinning.
A:
There's unfortunately no one-size-fits-all solution for this.
- Spinning in the destructor is definitely an option, even though it's not pretty.
- std::binary_semaphore might be an option: its acquire() and release() member functions do the wait / notify atomically, so there should be no lifetime problems.
  (see for example 3.2. Using std::counting_semaphore / std::binary_semaphore)
- Another option would be to delegate the wait / notify calls to another atomic variable that has a longer lifetime.
  (see for example 3.4 Use a waiter pool)
- See 3. Potential Solutions for all options.
Q:
Am I missing some other trick I could use to do this without spinning - maybe with std::atomic_ref?
A:
std::atomic_ref doesn't help in this case either unfortunately.
One of its rules is that the object it points to must outlive the atomic_ref object.
(which would not be the case if you destruct the object)
31.7 Class template atomic_ref [atomics.ref.generic]
(3) The lifetime ([basic.life]) of an object referenced by *ptr shall exceed the lifetime of all atomic_refs that reference the object. While any atomic_ref instances exist that reference the *ptr object, all accesses to that object shall exclusively occur through those atomic_ref instances. [...]
Q:
Is there any benefit to the language making this UB, or is this basically a flaw in the language spec that atomics don't expose the full expressiveness of the underlying futexes on which they are implemented?
A:
That is unfortunately hard to answer given that there is no rationale for the API that's in the standard.
My guess would be that this edge-case was simply missed.
3. Potential Solutions
This is a (small, incomplete) list of potential solutions / workarounds.
Depending on your specific use case, some of them might be helpful.
3.1. Spin in the destructor
Like in OP's example.
Not pretty, but it gets the job done.
Might even be an acceptable option assuming it's rather rare that two threads release the mutex in quick succession and the last release happens to free the object containing the mutex.
e.g.: godbolt
    #include <atomic>
    #include <cstdint>

    class my_mutex {
    private:
        using state_t = std::uint32_t;
        static constexpr state_t StateUnlocked = 0;
        static constexpr state_t StateLocked = 1;
        static constexpr state_t StateLockedWithWaiters = 2;

        static_assert(std::atomic<state_t>::is_always_lock_free);

        std::atomic<state_t> state = StateUnlocked;
        // number of threads currently inside unlock()
        std::atomic<std::uint32_t> release_count = 0;

    public:
        void lock() {
            state_t expected = StateUnlocked;
            if (state.compare_exchange_strong(
                    expected, StateLocked,
                    std::memory_order::acquire,
                    std::memory_order::relaxed
                )) [[likely]] {
                return;
            }

            while (true) {
                if (expected != StateLockedWithWaiters) {
                    expected = state.exchange(
                        StateLockedWithWaiters,
                        std::memory_order::acquire
                    );
                    if (expected == StateUnlocked) {
                        return;
                    }
                }

                state.wait(StateLockedWithWaiters);
                expected = state.load(std::memory_order_relaxed);
            }
        }

        bool try_lock() {
            state_t expected = StateUnlocked;
            return state.compare_exchange_strong(
                expected, StateLocked,
                std::memory_order::acquire,
                std::memory_order::relaxed
            );
        }

        void unlock() {
            release_count.fetch_add(1, std::memory_order::relaxed);

            state_t prev = state.exchange(
                StateUnlocked,
                std::memory_order::release
            );
            if (prev == StateLockedWithWaiters) [[unlikely]] {
                state.notify_one();
            }

            release_count.fetch_sub(1, std::memory_order::release);
        }

        ~my_mutex() {
            // acquire pairs with the release decrement at the end of unlock()
            while (release_count.load(std::memory_order::acquire)) {
                // TODO: optional arch specific relax instruction
                __builtin_ia32_pause();  // x86 only (GCC / Clang builtin)
            }
        }
    };
Upsides:
Downsides:
- Requires spin-loop in destructor
3.2. Using std::counting_semaphore / std::binary_semaphore
It is relatively straightforward to wrap std::binary_semaphore into a custom mutex implementation that supports unlocking the mutex from a different thread than the one that locked it.
e.g.: godbolt
    #include <chrono>
    #include <semaphore>

    class my_mutex {
    private:
        std::binary_semaphore sem;

    public:
        my_mutex() : sem(1) {}

        void lock() { sem.acquire(); }
        void unlock() { sem.release(); }
        bool try_lock() { return sem.try_acquire(); }

        template<class Rep, class Period>
        bool try_lock_for(std::chrono::duration<Rep, Period> const& timeout) {
            return sem.try_acquire_for(timeout);
        }

        template<class Clock, class Duration>
        bool try_lock_until(std::chrono::time_point<Clock, Duration> const& timeout) {
            return sem.try_acquire_until(timeout);
        }
    };
Upsides:
- Works like a normal mutex, but ownership can be transferred between threads.
Downsides:
3.3. Use std::atomic_ref and use the futex wait / wake syscalls
Another option would be to use std::atomic_ref for the atomic read & write operations, and manually handle the waking / sleeping part (by calling the syscalls directly).
e.g.: godbolt
    #include <atomic>
    #include <cstdint>
    #include <linux/futex.h>  // FUTEX_WAIT_PRIVATE, FUTEX_WAKE_PRIVATE
    #include <sys/syscall.h>  // SYS_futex
    #include <unistd.h>       // syscall

    class my_mutex {
    private:
        using state_t = std::uint32_t;
        static constexpr state_t StateUnlocked = 0;
        static constexpr state_t StateLocked = 1;
        static constexpr state_t StateLockedWithWaiters = 2;

        static_assert(std::atomic_ref<state_t>::is_always_lock_free);

        state_t state = StateUnlocked;

        void wait() {
            // TODO use WaitOnAddress for windows, ... other oses ...
            syscall(
                SYS_futex, &state, FUTEX_WAIT_PRIVATE,
                StateLockedWithWaiters, nullptr
            );
        }

        void wake() {
            // TODO use WakeByAddressSingle for windows, ... other oses ...
            syscall(SYS_futex, &state, FUTEX_WAKE_PRIVATE, 1);
        }

    public:
        void lock() {
            state_t expected = StateUnlocked;
            if (std::atomic_ref(state).compare_exchange_strong(
                    expected, StateLocked,
                    std::memory_order::acquire,
                    std::memory_order::relaxed
                )) [[likely]] {
                return;
            }

            while (true) {
                if (expected != StateLockedWithWaiters) {
                    expected = std::atomic_ref(state).exchange(
                        StateLockedWithWaiters,
                        std::memory_order::acquire
                    );
                    if (expected == StateUnlocked) {
                        return;
                    }
                }

                // TODO: maybe spin a little before waiting
                wait();
                expected = std::atomic_ref(state).load(
                    std::memory_order::relaxed
                );
            }
        }

        bool try_lock() {
            state_t expected = StateUnlocked;
            return std::atomic_ref(state).compare_exchange_strong(
                expected, StateLocked,
                std::memory_order::acquire,
                std::memory_order::relaxed
            );
        }

        void unlock() {
            state_t prev = std::atomic_ref(state).exchange(
                StateUnlocked,
                std::memory_order::release
            );
            if (prev == StateLockedWithWaiters) [[unlikely]] {
                wake();
            }
        }
    };
Upsides:
- Works like a normal mutex, but ownership can be transferred between threads.
- Gives you full control over the size and layout of your mutex class.
Downsides:
- You need to handle wait() and wake() manually, by directly calling the syscalls.
  (to avoid UB with std::atomic_ref::wait when the object gets destroyed)
  => Needs to be implemented for each OS, no longer portable
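As a rough, Linux-only illustration of what "handling it manually" involves, here is a sketch of two thin wrappers around the futex syscall together with the return values one would typically check. The wrapper names futex_wait / futex_wake and the demo function are assumptions for this example; other operating systems need their own equivalents.

```cpp
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>

// Sleeps while *addr == expected. Returns 0 when woken up,
// or -1 with errno set (EAGAIN if *addr != expected, EINTR on a signal).
long futex_wait(std::uint32_t* addr, std::uint32_t expected) {
    return syscall(SYS_futex, addr, FUTEX_WAIT_PRIVATE, expected, nullptr);
}

// Wakes up to `count` waiters on addr and returns how many were woken.
// The kernel only uses the address as a key here, it does not read *addr.
long futex_wake(std::uint32_t* addr, int count) {
    assert(count > 0);
    return syscall(SYS_futex, addr, FUTEX_WAKE_PRIVATE, count);
}

// Demo: a wait with a mismatching expected value returns immediately,
// and a wake without any waiters wakes nobody.
bool futex_smoke_test() {
    std::uint32_t word = 1;
    errno = 0;
    bool wait_refused = (futex_wait(&word, 0) == -1) && (errno == EAGAIN);
    bool nobody_woken = (futex_wake(&word, 1) == 0);
    return wait_refused && nobody_woken;
}
```

Because the wake call only passes the address to the kernel, it does not access the std::atomic (or the plain integer) through the C++ object model, which is what sidesteps the lifetime rule of the standard library API.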
3.4 Use a waiter pool
Another option would be to avoid waiting on the atomic variable directly by using another atomic variable (with a longer lifetime) solely for the wait / notify.
This is also how most standard libraries implement std::atomic::wait() for types that are not futex-sized.
(libstdc++, for example, has a pool of 16 futexes that it uses for waits on atomic variables that are not futex-sized. The __wait_flags::__proxy_wait flag controls whether the wait happens on the atomic value itself or on one of the 16 proxy futexes.)
e.g.: godbolt
    class waiter {
    private:
        alignas(std::hardware_destructive_interference_size)
        std::atomic<std::uint32_t> counter = 0;

    public:
        void notify_all() {
            counter.fetch_add(1, std::memory_order::release);
            counter.notify_all();
        }

        template <class T>
        void wait(
            std::atomic<T> const& var,
            std::type_identity_t<T> const& oldval
        ) {
            while (true) {
                auto counterval = counter.load(std::memory_order::acquire);
                if (var.load(std::memory_order::relaxed) != oldval) {
                    return;
                }

                counter.wait(counterval);
            }
        }
    };

    template <std::size_t N = 256>
    class waiter_pool {
    private:
        static_assert(std::has_single_bit(N), "N should be a power of 2");

        waiter waiters[N];

        waiter& waiter_for_address(const void *ptr) {
            std::uintptr_t addr = reinterpret_cast<std::uintptr_t>(ptr);
            addr = std::hash<std::uintptr_t>{}(addr);
            return waiters[addr % N];
        }

    public:
        template <class T>
        void notify_all(std::atomic<T> const& var) {
            waiter& w = waiter_for_address(static_cast<const void*>(&var));
            w.notify_all();
        }

        template <class T>
        void wait(
            std::atomic<T> const& var,
            std::type_identity_t<T> const& oldval
        ) {
            waiter& w = waiter_for_address(static_cast<const void*>(&var));
            w.wait(var, oldval);
        }
    };

    waiter_pool pool;

    class my_mutex {
    private:
        using state_t = std::uint8_t;
        static constexpr state_t StateUnlocked = 0;
        static constexpr state_t StateLocked = 1;
        static constexpr state_t StateLockedWithWaiters = 2;

        static_assert(std::atomic<state_t>::is_always_lock_free);

        std::atomic<state_t> state = StateUnlocked;

    public:
        void lock() {
            state_t expected = StateUnlocked;
            if (state.compare_exchange_strong(
                    expected, StateLocked,
                    std::memory_order::acquire,
                    std::memory_order::relaxed
                )) [[likely]] {
                return;
            }

            while (true) {
                if (expected != StateLockedWithWaiters) {
                    expected = state.exchange(
                        StateLockedWithWaiters,
                        std::memory_order::acquire
                    );
                    if (expected == StateUnlocked) {
                        return;
                    }
                }

                // TODO: maybe spin a little before waiting
                pool.wait(state, StateLockedWithWaiters);
                expected = state.load(std::memory_order_relaxed);
            }
        }

        bool try_lock() {
            state_t expected = StateUnlocked;
            return state.compare_exchange_strong(
                expected, StateLocked,
                std::memory_order::acquire,
                std::memory_order::relaxed
            );
        }

        void unlock() {
            state_t prev = state.exchange(
                StateUnlocked,
                std::memory_order::release
            );
            if (prev == StateLockedWithWaiters) [[unlikely]] {
                pool.notify_all(state);
            }
        }
    };
Upsides:
- Works like a normal mutex, but ownership can be transferred between threads.
- my_mutex::state does not need to be 32 bits because the futex waiting is delegated to the pool. So instances of my_mutex can be much smaller.
Downsides:
- Needs an extra global pool of atomic variables
- If you have a lot of threads and / or a lot of contention on your mutexes then performance might be suboptimal because the pool always needs to wake all waiters.
3.5 (when using C++20 coroutines) async_mutex
If you're already using C++20 coroutines then using something like async_mutex from cppcoro might be an option.
Coroutines also would have the benefit of not blocking the thread while they're waiting for the mutex - instead the coroutine simply suspends until the mutex is available again.
See async_mutex.hpp / async_mutex.cpp for the implementation.
Upsides:
- Works directly with coroutines, so usage within a coro is straightforward.
Downsides:
- Only usable from coroutines
3.5.1 Mixed coroutine & native mutex
It's also possible to create a mutex that can be used by both coroutines and native threads (by adapting the async_mutex design to also have a blocking lock method).
e.g.: godbolt
    class my_mutex {
    private:
        struct wait_list_entry {
            wait_list_entry* next = nullptr;
            virtual void wake() = 0;
        };

        struct native_waiter final : wait_list_entry {
            native_waiter() : sema(0) {}
            void wait() { sema.acquire(); }
            void wake() override final { sema.release(); }

            std::binary_semaphore sema;
        };

        struct awaitable_default_resumer {
            void operator()(std::coroutine_handle<void> h) { h.resume(); }
        };

        template<class Resumer>
        class awaitable_base : protected wait_list_entry {
        protected:
            my_mutex& mtx;
            Resumer resumer;
            std::coroutine_handle<void> coro;

        public:
            template<class R>
            awaitable_base(my_mutex& m, R&& r)
                : mtx(m), resumer(std::forward<R>(r)), coro(nullptr) {}

            bool await_ready() { return mtx.try_lock(); }

            bool await_suspend(std::coroutine_handle<void> h) {
                coro = h;
                if(mtx.lock_or_register_waiter(this)) {
                    coro = nullptr;
                    return false;
                }
                return true;
            }

        protected:
            void wake() override final {
                resumer(std::exchange(coro, nullptr));
            }
        };

        template<class Resumer>
        class awaitable_raw final : public awaitable_base<Resumer> {
        public:
            using awaitable_base<Resumer>::awaitable_base;

            void await_resume() const noexcept {}

            template<class NewResumer>
            [[nodiscard]] awaitable_raw<std::remove_cvref_t<NewResumer>>
            resume_with(NewResumer&& nr) {
                return {this->mtx, std::forward<NewResumer>(nr)};
            }
        };

        template<class Resumer>
        class awaitable_unique final : public awaitable_base<Resumer> {
        public:
            using awaitable_base<Resumer>::awaitable_base;

            [[nodiscard]] std::unique_lock<my_mutex> await_resume() {
                return {this->mtx, std::adopt_lock};
            }

            template<class NewResumer>
            [[nodiscard]] awaitable_unique<std::remove_cvref_t<NewResumer>>
            resume_with(NewResumer&& nr) {
                return {this->mtx, std::forward<NewResumer>(nr)};
            }
        };

    private:
        static constexpr std::uintptr_t state_unlocked = 0;
        static constexpr std::uintptr_t state_locked_no_waiters = 1;

        // contains one of three values:
        //   state_unlocked == mutex is currently unlocked
        //   state_locked_no_waiters == mutex is locked, no pending waiters
        //   pointer to wait_list_entry == head of singly linked list
        //                                 of new waiters in LIFO order
        std::atomic<std::uintptr_t> state = state_unlocked;

        // waiter queue, will be populated by unlock()
        wait_list_entry* waiter_queue = nullptr;

    public:
        my_mutex() = default;
        my_mutex(my_mutex const&) = delete;
        my_mutex(my_mutex&&) = delete;
        my_mutex& operator=(my_mutex const&) = delete;
        my_mutex& operator=(my_mutex&&) = delete;

        bool try_lock() {
            std::uintptr_t prev_state = state_unlocked;
            return state.compare_exchange_strong(
                prev_state, state_locked_no_waiters,
                std::memory_order::acquire,
                std::memory_order::relaxed
            );
        }

        void lock() {
            if(try_lock()) [[likely]] {
                return;
            }

            native_waiter waiter;
            if(lock_or_register_waiter(&waiter)) {
                return;
            }

            // enqueued waiter, wait for our turn
            waiter.wait();
        }

        void unlock() {
            if(waiter_queue == nullptr) [[likely]] {
                std::uintptr_t prev_state = state_locked_no_waiters;
                if(state.compare_exchange_strong(
                    prev_state, state_unlocked,
                    std::memory_order::release,
                    std::memory_order::relaxed
                )) [[likely]] {
                    return;
                }

                move_waiters_to_waiter_queue();
            }

            wait_list_entry* entry = waiter_queue;
            waiter_queue = entry->next;
            entry->wake();
        }

        [[nodiscard]] awaitable_raw<awaitable_default_resumer> operator co_await() {
            return lock_async();
        }

        [[nodiscard]] awaitable_raw<awaitable_default_resumer> lock_async() {
            return awaitable_raw<awaitable_default_resumer>{
                *this, awaitable_default_resumer{}
            };
        }

        [[nodiscard]] awaitable_unique<awaitable_default_resumer> lock_async_unique() {
            return awaitable_unique<awaitable_default_resumer>{
                *this, awaitable_default_resumer{}
            };
        }

    private:
        // Either locks the mutex synchronously (returns true)
        // or adds the waiter to the wait list (returns false)
        // The waiter will not be called if the mutex was locked synchronously.
        bool lock_or_register_waiter(wait_list_entry* waiter) {
            std::uintptr_t prev_state = state.load(std::memory_order::relaxed);
            while(true) {
                if(prev_state == state_unlocked) {
                    if(state.compare_exchange_weak(
                        prev_state, state_locked_no_waiters,
                        std::memory_order::acquire,
                        std::memory_order::relaxed
                    )) {
                        return true;
                    }
                } else {
                    waiter->next = nullptr;
                    if(prev_state != state_locked_no_waiters) {
                        waiter->next = reinterpret_cast<wait_list_entry*>(prev_state);
                    }
                    std::uintptr_t next_state = reinterpret_cast<std::uintptr_t>(
                        waiter
                    );
                    if(state.compare_exchange_weak(
                        prev_state, next_state,
                        std::memory_order::release,
                        std::memory_order::relaxed
                    )) {
                        return false;
                    }
                }
            }
        }

        void move_waiters_to_waiter_queue() {
            std::uintptr_t prev_state = state.exchange(
                state_locked_no_waiters,
                std::memory_order::acquire
            );

            // reverse list of waiters so that they are served in FIFO order
            // if the mutex does not need to be fair then this loop could be removed
            wait_list_entry* current_entry =
                reinterpret_cast<wait_list_entry*>(prev_state);
            wait_list_entry* prev_entry = nullptr;
            while(current_entry != nullptr) {
                wait_list_entry* next = current_entry->next;
                current_entry->next = prev_entry;
                prev_entry = current_entry;
                current_entry = next;
            }

            waiter_queue = prev_entry;
        }
    };
That's kind of the best of both worlds. Normal threads can still do mutex.lock() and mutex.unlock() (or use std::lock_guard / std::unique_lock / etc...), and coroutines can use co_await mutex.lock_async() and mutex.unlock() (or auto lock = co_await mutex.lock_async_unique() for a std::unique_lock)
Upsides:
- Works with coroutines and native threads
- Is a fair mutex
Downsides:
- Care must be taken that coroutines resume on the correct threads.
  (A coroutine waiting on the mutex is resumed by whichever thread calls unlock(), which can be any other thread that uses the mutex.)
  (.resume_with() can be used to control how coroutines are resumed)
- A bit more overhead because we need to maintain a linked list of waiters
3.6 Ensure the atomic does not get destructed during the unlock() sequence
Another option would be to give your mutex the option to increment the refcount of its containing object (or give it a shared_ptr to it if it's managed by one, etc...)
That way your unlock() could look like this:
    class my_mutex {
        some_type* outer;
        /* ... */

        void unlock() {
            if(outer) outer->incref();

            if (state_.exchange(kUnlocked, std::memory_order_release) == kLockedWantNotify)
                state_.notify_all();

            if(outer) outer->decref();
        }

        /* ... */
    };
It's not pretty, but at least it gets rid of the spinlock in the destructor.
The extreme option of this would be to dynamically allocate your mutex state - that way you can keep it alive during the unlock() sequence.
e.g.: godbolt
    class my_mutex {
    private:
        using state_t = std::uint32_t;
        static constexpr state_t StateUnlocked = 0;
        static constexpr state_t StateLocked = 1;
        static constexpr state_t StateLockedWithWaiters = 2;

        std::shared_ptr<std::atomic<state_t>> state;

    public:
        my_mutex()
            : state(std::make_shared<std::atomic<state_t>>(StateUnlocked)) { }

        void lock() {
            state_t expected = StateUnlocked;
            if (state->compare_exchange_strong(
                    expected, StateLocked,
                    std::memory_order::acquire,
                    std::memory_order::relaxed
                )) [[likely]] {
                return;
            }

            while (true) {
                if (expected != StateLockedWithWaiters) {
                    expected = state->exchange(
                        StateLockedWithWaiters,
                        std::memory_order::acquire
                    );
                    if (expected == StateUnlocked) {
                        return;
                    }
                }

                state->wait(StateLockedWithWaiters);
                expected = state->load(std::memory_order_relaxed);
            }
        }

        bool try_lock() {
            state_t expected = StateUnlocked;
            return state->compare_exchange_strong(
                expected, StateLocked,
                std::memory_order::acquire,
                std::memory_order::relaxed
            );
        }

        void unlock() {
            std::shared_ptr ref = state;
            state_t prev = ref->exchange(
                StateUnlocked,
                std::memory_order::release
            );
            if (prev == StateLockedWithWaiters) [[unlikely]] {
                ref->notify_one();
            }
        }
    };
Upsides:
- Gets rid of the spinlock in the destructor
Downsides:
- Mutex either requires a reference to its containing object (to bump the reference count) or it needs to heap-allocate its state.
- std::mutex locks cannot be migrated to other threads. The lock() and unlock() calls must happen in the same thread.
- "... to avoid use-after-free when deleting?" Problem is not with a mutex or its implementation, problem is program logic. Destructor should not be called when other method is processing or may be processed in future. Invoking destructor means you are done using object. This is so important rule that synchronization inside a destructor do not make sense.
- std::binary_semaphore is most likely what you're looking for. (std::binary_semaphore::acquire == BadMutex::lock, std::binary_semaphore::release == BadMutex::unlock) - "Unlike std::mutex a counting_semaphore is not tied to threads of execution - acquiring a semaphore can occur on a different thread than releasing the semaphore, for example. [...]"