libs/capy/include/boost/capy/ex/async_mutex.hpp

//
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//

#ifndef BOOST_CAPY_ASYNC_MUTEX_HPP
#define BOOST_CAPY_ASYNC_MUTEX_HPP

#include <boost/capy/detail/config.hpp>
#include <boost/capy/detail/intrusive.hpp>
#include <boost/capy/concept/executor.hpp>
#include <boost/capy/error.hpp>
#include <boost/capy/ex/io_env.hpp>
#include <boost/capy/io_result.hpp>

#include <atomic>
#include <coroutine>
#include <new>
#include <stop_token>
#include <utility>

/* async_mutex implementation notes
   ================================

   Waiters form a doubly-linked intrusive list (fair FIFO). lock_awaiter
   inherits intrusive_list<lock_awaiter>::node; the list is owned by
   async_mutex::waiters_.

   Cancellation via stop_token
   ---------------------------
   A std::stop_callback is registered in await_suspend. Two actors can
   race to resume the suspended coroutine: unlock() and the stop callback.
   An atomic bool `claimed_` resolves the race -- whoever does
   claimed_.exchange(true) and reads false wins. The loser does nothing.
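
   A minimal sketch of the claim protocol, using the member names
   from lock_awaiter below. Both unlock() and the stop callback
   execute this, and exactly one of them reads false:

       // Winner (reads false) gains the sole right to resume;
       // the loser returns without touching the coroutine.
       if(! claimed_.exchange(true, std::memory_order_acq_rel))
           ex_.dispatch(h_);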

   The stop callback calls ex_.dispatch(h_). If dispatch runs inline
   (same thread), the stop_callback is destroyed from within its own
   operator() via await_resume. This is safe: cancel_fn touches no
   members after dispatch returns (same pattern as delete-this).

   unlock() pops waiters from the front. If the popped waiter was
   already claimed by the stop callback, unlock() skips it and tries
   the next. await_resume removes the (still-linked) canceled waiter
   via waiters_.remove(this).

   The stop_callback lives in aligned raw storage (stop_cb_buf_) to
   suppress automatic construction/destruction. Placement new in
   await_suspend, explicit destructor call in await_resume and
   ~lock_awaiter.

   Member ordering constraint
   --------------------------
   The raw storage for the stop_callback (stop_cb_buf_) must be
   declared AFTER the members the callback accesses (h_, ex_,
   claimed_, canceled_). If the stop_callback destructor blocks
   waiting for a concurrent callback, those members must still be
   alive (C++ destroys members in reverse declaration order).
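
   For illustration, the required declaration order (matching the
   actual members of lock_awaiter below):

       std::coroutine_handle<> h_;    // read by the callback
       executor_ref ex_;              // read by the callback
       std::atomic<bool> claimed_;    // exchanged by the callback
       bool canceled_ = false;        // written by the callback
       ...
       alignas(stop_cb_t)
       unsigned char stop_cb_buf_[sizeof(stop_cb_t)];  // declared last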

   active_ flag
   ------------
   Tracks both list membership and the stop_callback's lifetime (they
   are always set and cleared together). Used by the destructor to
   clean up if the coroutine is destroyed while suspended (e.g.
   execution_context shutdown).

   Cancellation scope
   ------------------
   Cancellation only takes effect while the coroutine is suspended in
   the wait queue. If the mutex is unlocked, await_ready acquires it
   immediately without checking the stop token. This is intentional:
   the fast path has no token access and no overhead.

   Threading assumptions
   ---------------------
   - All list mutations happen on the executor thread (await_suspend,
     await_resume, unlock, ~lock_awaiter).
   - The stop callback may fire from any thread, but only touches
     claimed_ (atomic) and then calls dispatch. It never touches the
     list.
   - ~lock_awaiter must be called from the executor thread. This is
     guaranteed during normal shutdown but NOT if the coroutine frame
     is destroyed from another thread while a stop callback could
     fire (precondition violation, same as cppcoro/folly).
*/

namespace boost {
namespace capy {

/** An asynchronous mutex for coroutines.

    This mutex provides mutual exclusion for coroutines without blocking.
    When a coroutine attempts to acquire a locked mutex, it suspends and
    is added to an intrusive wait queue. When the holder unlocks, the next
    waiter is resumed with the lock held.

    @par Cancellation

    When a coroutine is suspended waiting for the mutex and its stop
    token is triggered, the waiter completes with `error::canceled`
    instead of acquiring the lock.

    Cancellation only applies while the coroutine is suspended in the
    wait queue. If the mutex is unlocked when `lock()` is called, the
    lock is acquired immediately even if the stop token is already
    signaled.
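
    For illustration, a waiter canceled while queued observes the
    error instead of the lock (sketch only; how the stop token is
    bound to the coroutine depends on the surrounding task type):

    @code
    task<> waiter(async_mutex& cm)
    {
        auto [ec] = co_await cm.lock(); // suspends if cm is held
        if(ec == error::canceled)
            co_return;                  // stop token fired while queued
        // ... critical section ...
        cm.unlock();
    }
    @endcode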

    @par Zero Allocation

    No heap allocation occurs for lock operations.

    @par Thread Safety

    The mutex operations are designed for single-threaded use on one
    executor. The stop callback may fire from any thread.

    @par Example
    @code
    async_mutex cm;

    task<> protected_operation() {
        auto [ec] = co_await cm.lock();
        if(ec)
            co_return;
        // ... critical section ...
        cm.unlock();
    }

    // Or with RAII:
    task<> protected_operation() {
        auto [ec, guard] = co_await cm.scoped_lock();
        if(ec)
            co_return;
        // ... critical section ...
        // unlocks automatically
    }
    @endcode
*/
class async_mutex
{
public:
    class lock_awaiter;
    class lock_guard;
    class lock_guard_awaiter;

private:
    bool locked_ = false;
    detail::intrusive_list<lock_awaiter> waiters_;

public:
    /** Awaiter returned by lock().
    */
    class lock_awaiter
        : public detail::intrusive_list<lock_awaiter>::node
    {
        friend class async_mutex;

        async_mutex* m_;
        std::coroutine_handle<> h_;
        executor_ref ex_;

        // These members must be declared before stop_cb_buf_
        // (see comment on the raw storage below).
        std::atomic<bool> claimed_{false};
        bool canceled_ = false;
        bool active_ = false;

        struct cancel_fn
        {
            lock_awaiter* self_;

            void operator()() const noexcept
            {
                if(!self_->claimed_.exchange(
                    true, std::memory_order_acq_rel))
                {
                    self_->canceled_ = true;
                    self_->ex_.dispatch(self_->h_);
                }
            }
        };

        using stop_cb_t =
            std::stop_callback<cancel_fn>;

        // Aligned storage for stop_cb_t. Declared last:
        // its destructor may block while the callback
        // accesses the members above.
#ifdef _MSC_VER
# pragma warning(push)
# pragma warning(disable: 4324) // padded due to alignas
#endif
        alignas(stop_cb_t)
        unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
#ifdef _MSC_VER
# pragma warning(pop)
#endif

        stop_cb_t& stop_cb_() noexcept
        {
            return *std::launder(
                reinterpret_cast<stop_cb_t*>(stop_cb_buf_));
        }

    public:
        ~lock_awaiter()
        {
            if(active_)
            {
                stop_cb_().~stop_cb_t();
                m_->waiters_.remove(this);
            }
        }

        explicit lock_awaiter(async_mutex* m) noexcept
            : m_(m)
        {
        }

        lock_awaiter(lock_awaiter&& o) noexcept
            : m_(o.m_)
            , h_(o.h_)
            , ex_(o.ex_)
            , claimed_(o.claimed_.load(
                std::memory_order_relaxed))
            , canceled_(o.canceled_)
            , active_(std::exchange(o.active_, false))
        {
        }

        lock_awaiter(lock_awaiter const&) = delete;
        lock_awaiter& operator=(lock_awaiter const&) = delete;
        lock_awaiter& operator=(lock_awaiter&&) = delete;

        bool await_ready() const noexcept
        {
            if(!m_->locked_)
            {
                m_->locked_ = true;
                return true;
            }
            return false;
        }

        /** IoAwaitable protocol overload. */
        std::coroutine_handle<>
        await_suspend(
            std::coroutine_handle<> h,
            io_env const* env) noexcept
        {
            if(env->stop_token.stop_requested())
            {
                canceled_ = true;
                return h;
            }
            h_ = h;
            ex_ = env->executor;
            m_->waiters_.push_back(this);
            ::new(stop_cb_buf_) stop_cb_t(
                env->stop_token, cancel_fn{this});
            active_ = true;
            return std::noop_coroutine();
        }

        io_result<> await_resume() noexcept
        {
            if(active_)
            {
                stop_cb_().~stop_cb_t();
                if(canceled_)
                {
                    m_->waiters_.remove(this);
                    active_ = false;
                    return {make_error_code(
                        error::canceled)};
                }
                active_ = false;
            }
            if(canceled_)
                return {make_error_code(
                    error::canceled)};
            return {{}};
        }
    };

    /** RAII lock guard for async_mutex.

        Automatically unlocks the mutex when destroyed.
    */
    class [[nodiscard]] lock_guard
    {
        async_mutex* m_;

    public:
        ~lock_guard()
        {
            if(m_)
                m_->unlock();
        }

        lock_guard() noexcept
            : m_(nullptr)
        {
        }

        explicit lock_guard(async_mutex* m) noexcept
            : m_(m)
        {
        }

        lock_guard(lock_guard&& o) noexcept
            : m_(std::exchange(o.m_, nullptr))
        {
        }

        lock_guard& operator=(lock_guard&& o) noexcept
        {
            if(this != &o)
            {
                if(m_)
                    m_->unlock();
                m_ = std::exchange(o.m_, nullptr);
            }
            return *this;
        }

        lock_guard(lock_guard const&) = delete;
        lock_guard& operator=(lock_guard const&) = delete;
    };

    /** Awaiter returned by scoped_lock() that returns a lock_guard on resume.
    */
    class lock_guard_awaiter
    {
        async_mutex* m_;
        lock_awaiter inner_;

    public:
        explicit lock_guard_awaiter(async_mutex* m) noexcept
            : m_(m)
            , inner_(m)
        {
        }

        bool await_ready() const noexcept
        {
            return inner_.await_ready();
        }

        /** IoAwaitable protocol overload. */
        std::coroutine_handle<>
        await_suspend(
            std::coroutine_handle<> h,
            io_env const* env) noexcept
        {
            return inner_.await_suspend(h, env);
        }

        io_result<lock_guard> await_resume() noexcept
        {
            auto r = inner_.await_resume();
            if(r.ec)
                return {r.ec, {}};
            return {{}, lock_guard(m_)};
        }
    };

    async_mutex() = default;

    // Non-copyable, non-movable
    async_mutex(async_mutex const&) = delete;
    async_mutex& operator=(async_mutex const&) = delete;

    /** Returns an awaiter that acquires the mutex.

        @return An awaitable yielding `(error_code)`.
    */
    lock_awaiter lock() noexcept
    {
        return lock_awaiter{this};
    }

    /** Returns an awaiter that acquires the mutex with RAII.

        @return An awaitable yielding `(error_code, lock_guard)`.
    */
    lock_guard_awaiter scoped_lock() noexcept
    {
        return lock_guard_awaiter(this);
    }

    /** Releases the mutex.

        If waiters are queued, the next eligible waiter is
        resumed with the lock held. Canceled waiters are
        skipped. If no eligible waiter remains, the mutex
        becomes unlocked.
    */
    void unlock() noexcept
    {
        for(;;)
        {
            auto* waiter = waiters_.pop_front();
            if(!waiter)
            {
                locked_ = false;
                return;
            }
            if(!waiter->claimed_.exchange(
                true, std::memory_order_acq_rel))
            {
                waiter->ex_.dispatch(waiter->h_);
                return;
            }
        }
    }

    /** Returns true if the mutex is currently locked.
    */
    bool is_locked() const noexcept
    {
        return locked_;
    }
};

} // namespace capy
} // namespace boost

#endif