Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_ASYNC_MUTEX_HPP
11 : #define BOOST_CAPY_ASYNC_MUTEX_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/intrusive.hpp>
15 : #include <boost/capy/concept/executor.hpp>
16 : #include <boost/capy/error.hpp>
17 : #include <boost/capy/ex/io_env.hpp>
18 : #include <boost/capy/io_result.hpp>
19 :
20 : #include <stop_token>
21 :
22 : #include <atomic>
23 : #include <coroutine>
24 : #include <new>
25 : #include <utility>
26 :
27 : /* async_mutex implementation notes
28 : ================================
29 :
30 : Waiters form a doubly-linked intrusive list (fair FIFO). lock_awaiter
31 : inherits intrusive_list<lock_awaiter>::node; the list is owned by
32 : async_mutex::waiters_.
33 :
34 : Cancellation via stop_token
35 : ---------------------------
36 : A std::stop_callback is registered in await_suspend. Two actors can
37 : race to resume the suspended coroutine: unlock() and the stop callback.
38 : An atomic bool `claimed_` resolves the race -- whoever does
39 : claimed_.exchange(true) and reads false wins. The loser does nothing.
40 :
41 : The stop callback calls ex_.dispatch(h_). If dispatch runs inline
42 : (same thread), the stop_callback is destroyed from within its own
43 : operator() via await_resume. This is safe: cancel_fn touches no
44 : members after dispatch returns (same pattern as delete-this).
45 :
46 : unlock() pops waiters from the front. If the popped waiter was
47 : already claimed by the stop callback, unlock() skips it and tries
48 : the next. await_resume removes the (still-linked) canceled waiter
49 : via waiters_.remove(this).
50 :
51 : The stop_callback lives in a union to suppress automatic
52 : construction/destruction. Placement new in await_suspend, explicit
53 : destructor call in await_resume and ~lock_awaiter.
54 :
55 : Member ordering constraint
56 : --------------------------
57 : The union containing stop_cb_ must be declared AFTER the members
58 : the callback accesses (h_, ex_, claimed_, canceled_). If the
59 : stop_cb_ destructor blocks waiting for a concurrent callback, those
60 : members must still be alive (C++ destroys in reverse declaration
61 : order).
62 :
63 : active_ flag
64 : ------------
65 : Tracks both list membership and stop_cb_ lifetime (they are always
66 : set and cleared together). Used by the destructor to clean up if the
67 : coroutine is destroyed while suspended (e.g. execution_context
68 : shutdown).
69 :
70 : Cancellation scope
71 : ------------------
72 : Cancellation only takes effect while the coroutine is suspended in
73 : the wait queue. If the mutex is unlocked, await_ready acquires it
74 : immediately without checking the stop token. This is intentional:
75 : the fast path has no token access and no overhead.
76 :
77 : Threading assumptions
78 : ---------------------
79 : - All list mutations happen on the executor thread (await_suspend,
80 : await_resume, unlock, ~lock_awaiter).
81 : - The stop callback may fire from any thread, but only touches
82 : claimed_ (atomic) and then calls dispatch. It never touches the
83 : list.
84 : - ~lock_awaiter must be called from the executor thread. This is
85 : guaranteed during normal shutdown but NOT if the coroutine frame
86 : is destroyed from another thread while a stop callback could
87 : fire (precondition violation, same as cppcoro/folly).
88 : */
89 :
90 : namespace boost {
91 : namespace capy {
92 :
93 : /** An asynchronous mutex for coroutines.
94 :
95 : This mutex provides mutual exclusion for coroutines without blocking.
96 : When a coroutine attempts to acquire a locked mutex, it suspends and
97 : is added to an intrusive wait queue. When the holder unlocks, the next
98 : waiter is resumed with the lock held.
99 :
100 : @par Cancellation
101 :
102 : When a coroutine is suspended waiting for the mutex and its stop
103 : token is triggered, the waiter completes with `error::canceled`
104 : instead of acquiring the lock.
105 :
106 : Cancellation only applies while the coroutine is suspended in the
107 : wait queue. If the mutex is unlocked when `lock()` is called, the
108 : lock is acquired immediately even if the stop token is already
109 : signaled.
110 :
111 : @par Zero Allocation
112 :
113 : No heap allocation occurs for lock operations.
114 :
115 : @par Thread Safety
116 :
117 : The mutex operations are designed for single-threaded use on one
118 : executor. The stop callback may fire from any thread.
119 :
120 : @par Example
121 : @code
122 : async_mutex cm;
123 :
124 : task<> protected_operation() {
125 : auto [ec] = co_await cm.lock();
126 : if(ec)
127 : co_return;
128 : // ... critical section ...
129 : cm.unlock();
130 : }
131 :
132 : // Or with RAII:
133 : task<> protected_operation() {
134 : auto [ec, guard] = co_await cm.scoped_lock();
135 : if(ec)
136 : co_return;
137 : // ... critical section ...
138 : // unlocks automatically
139 : }
140 : @endcode
141 : */
142 : class async_mutex
143 : {
144 : public:
145 : class lock_awaiter;
146 : class lock_guard;
147 : class lock_guard_awaiter;
148 :
149 : private:
150 : bool locked_ = false;
151 : detail::intrusive_list<lock_awaiter> waiters_;
152 :
153 : public:
154 : /** Awaiter returned by lock().
155 : */
156 : class lock_awaiter
157 : : public detail::intrusive_list<lock_awaiter>::node
158 : {
159 : friend class async_mutex;
160 :
161 : async_mutex* m_;
162 : std::coroutine_handle<> h_;
163 : executor_ref ex_;
164 :
165 : // These members must be declared before stop_cb_
166 : // (see comment on the union below).
167 : std::atomic<bool> claimed_{false};
168 : bool canceled_ = false;
169 : bool active_ = false;
170 :
171 : struct cancel_fn
172 : {
173 : lock_awaiter* self_;
174 :
175 5 : void operator()() const noexcept
176 : {
177 5 : if(!self_->claimed_.exchange(
178 : true, std::memory_order_acq_rel))
179 : {
180 5 : self_->canceled_ = true;
181 5 : self_->ex_.dispatch(self_->h_);
182 : }
183 5 : }
184 : };
185 :
186 : using stop_cb_t =
187 : std::stop_callback<cancel_fn>;
188 :
189 : // Aligned storage for stop_cb_t. Declared last:
190 : // its destructor may block while the callback
191 : // accesses the members above.
192 : #ifdef _MSC_VER
193 : # pragma warning(push)
194 : # pragma warning(disable: 4324) // padded due to alignas
195 : #endif
196 : alignas(stop_cb_t)
197 : unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
198 : #ifdef _MSC_VER
199 : # pragma warning(pop)
200 : #endif
201 :
202 15 : stop_cb_t& stop_cb_() noexcept
203 : {
204 : return *reinterpret_cast<stop_cb_t*>(
205 15 : stop_cb_buf_);
206 : }
207 :
208 : public:
209 62 : ~lock_awaiter()
210 : {
211 62 : if(active_)
212 : {
213 3 : stop_cb_().~stop_cb_t();
214 3 : m_->waiters_.remove(this);
215 : }
216 62 : }
217 :
218 31 : explicit lock_awaiter(async_mutex* m) noexcept
219 31 : : m_(m)
220 : {
221 31 : }
222 :
223 31 : lock_awaiter(lock_awaiter&& o) noexcept
224 31 : : m_(o.m_)
225 31 : , h_(o.h_)
226 31 : , ex_(o.ex_)
227 31 : , claimed_(o.claimed_.load(
228 : std::memory_order_relaxed))
229 31 : , canceled_(o.canceled_)
230 31 : , active_(std::exchange(o.active_, false))
231 : {
232 31 : }
233 :
234 : lock_awaiter(lock_awaiter const&) = delete;
235 : lock_awaiter& operator=(lock_awaiter const&) = delete;
236 : lock_awaiter& operator=(lock_awaiter&&) = delete;
237 :
238 31 : bool await_ready() const noexcept
239 : {
240 31 : if(!m_->locked_)
241 : {
242 14 : m_->locked_ = true;
243 14 : return true;
244 : }
245 17 : return false;
246 : }
247 :
248 : /** IoAwaitable protocol overload. */
249 : std::coroutine_handle<>
250 17 : await_suspend(
251 : std::coroutine_handle<> h,
252 : io_env const* env) noexcept
253 : {
254 17 : if(env->stop_token.stop_requested())
255 : {
256 2 : canceled_ = true;
257 2 : return h;
258 : }
259 15 : h_ = h;
260 15 : ex_ = env->executor;
261 15 : m_->waiters_.push_back(this);
262 45 : ::new(stop_cb_buf_) stop_cb_t(
263 15 : env->stop_token, cancel_fn{this});
264 15 : active_ = true;
265 15 : return std::noop_coroutine();
266 : }
267 :
268 28 : io_result<> await_resume() noexcept
269 : {
270 28 : if(active_)
271 : {
272 12 : stop_cb_().~stop_cb_t();
273 12 : if(canceled_)
274 : {
275 5 : m_->waiters_.remove(this);
276 5 : active_ = false;
277 5 : return {make_error_code(
278 5 : error::canceled)};
279 : }
280 7 : active_ = false;
281 : }
282 23 : if(canceled_)
283 2 : return {make_error_code(
284 2 : error::canceled)};
285 21 : return {{}};
286 : }
287 : };
288 :
289 : /** RAII lock guard for async_mutex.
290 :
291 : Automatically unlocks the mutex when destroyed.
292 : */
293 : class [[nodiscard]] lock_guard
294 : {
295 : async_mutex* m_;
296 :
297 : public:
298 5 : ~lock_guard()
299 : {
300 5 : if(m_)
301 2 : m_->unlock();
302 5 : }
303 :
304 2 : lock_guard() noexcept
305 2 : : m_(nullptr)
306 : {
307 2 : }
308 :
309 2 : explicit lock_guard(async_mutex* m) noexcept
310 2 : : m_(m)
311 : {
312 2 : }
313 :
314 1 : lock_guard(lock_guard&& o) noexcept
315 1 : : m_(std::exchange(o.m_, nullptr))
316 : {
317 1 : }
318 :
319 : lock_guard& operator=(lock_guard&& o) noexcept
320 : {
321 : if(this != &o)
322 : {
323 : if(m_)
324 : m_->unlock();
325 : m_ = std::exchange(o.m_, nullptr);
326 : }
327 : return *this;
328 : }
329 :
330 : lock_guard(lock_guard const&) = delete;
331 : lock_guard& operator=(lock_guard const&) = delete;
332 : };
333 :
334 : /** Awaiter returned by scoped_lock() that returns a lock_guard on resume.
335 : */
336 : class lock_guard_awaiter
337 : {
338 : async_mutex* m_;
339 : lock_awaiter inner_;
340 :
341 : public:
342 4 : explicit lock_guard_awaiter(async_mutex* m) noexcept
343 4 : : m_(m)
344 4 : , inner_(m)
345 : {
346 4 : }
347 :
348 4 : bool await_ready() const noexcept
349 : {
350 4 : return inner_.await_ready();
351 : }
352 :
353 : /** IoAwaitable protocol overload. */
354 : std::coroutine_handle<>
355 2 : await_suspend(
356 : std::coroutine_handle<> h,
357 : io_env const* env) noexcept
358 : {
359 2 : return inner_.await_suspend(h, env);
360 : }
361 :
362 4 : io_result<lock_guard> await_resume() noexcept
363 : {
364 4 : auto r = inner_.await_resume();
365 4 : if(r.ec)
366 2 : return {r.ec, {}};
367 2 : return {{}, lock_guard(m_)};
368 4 : }
369 : };
370 :
371 : async_mutex() = default;
372 :
373 : // Non-copyable, non-movable
374 : async_mutex(async_mutex const&) = delete;
375 : async_mutex& operator=(async_mutex const&) = delete;
376 :
377 : /** Returns an awaiter that acquires the mutex.
378 :
379 : @return An awaitable yielding `(error_code)`.
380 : */
381 27 : lock_awaiter lock() noexcept
382 : {
383 27 : return lock_awaiter{this};
384 : }
385 :
386 : /** Returns an awaiter that acquires the mutex with RAII.
387 :
388 : @return An awaitable yielding `(error_code,lock_guard)`.
389 : */
390 4 : lock_guard_awaiter scoped_lock() noexcept
391 : {
392 4 : return lock_guard_awaiter(this);
393 : }
394 :
395 : /** Releases the mutex.
396 :
397 : If waiters are queued, the next eligible waiter is
398 : resumed with the lock held. Canceled waiters are
399 : skipped. If no eligible waiter remains, the mutex
400 : becomes unlocked.
401 : */
402 21 : void unlock() noexcept
403 : {
404 : for(;;)
405 : {
406 21 : auto* waiter = waiters_.pop_front();
407 21 : if(!waiter)
408 : {
409 14 : locked_ = false;
410 14 : return;
411 : }
412 7 : if(!waiter->claimed_.exchange(
413 : true, std::memory_order_acq_rel))
414 : {
415 7 : waiter->ex_.dispatch(waiter->h_);
416 7 : return;
417 : }
418 0 : }
419 : }
420 :
421 : /** Returns true if the mutex is currently locked.
422 : */
423 22 : bool is_locked() const noexcept
424 : {
425 22 : return locked_;
426 : }
427 : };
428 :
429 : } // namespace capy
430 : } // namespace boost
431 :
432 : #endif
|