libs/capy/include/boost/capy/ex/async_event.hpp

100.0% Lines (68/68) 100.0% Functions (13/13) 94.4% Branches (17/18)
libs/capy/include/boost/capy/ex/async_event.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_ASYNC_EVENT_HPP
11 #define BOOST_CAPY_ASYNC_EVENT_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/intrusive.hpp>
15 #include <boost/capy/concept/executor.hpp>
16 #include <boost/capy/error.hpp>
17 #include <boost/capy/ex/io_env.hpp>
18 #include <boost/capy/io_result.hpp>
19
20 #include <stop_token>
21
22 #include <atomic>
23 #include <coroutine>
24 #include <new>
25 #include <utility>
26
27 /* async_event implementation notes
28 =================================
29
30 Same cancellation pattern as async_mutex (see that file for the
31 full discussion on claimed_, stop_cb lifetime, member ordering,
32 and threading assumptions).
33
34 Key difference: set() wakes ALL waiters (broadcast), not one.
35 It pops every waiter from the list and dispatches the ones it
36 claims. Waiters already claimed by a stop callback are skipped.
37
38 Because set() pops all waiters, a canceled waiter may have been
39 removed from the list by set() before its await_resume runs.
40 This requires a separate in_list_ flag (unlike async_mutex where
41 active_ served double duty). await_resume only calls remove()
42 when in_list_ is true.
43 */
44
45 namespace boost {
46 namespace capy {
47
48 /** An asynchronous event for coroutines.
49
50 This event provides a way to notify multiple coroutines that some
51 condition has occurred. When a coroutine awaits an unset event, it
52 suspends and is added to a wait queue. When the event is set, all
53 waiting coroutines are resumed.
54
55 @par Cancellation
56
57 When a coroutine is suspended waiting for the event and its stop
58 token is triggered, the waiter completes with `error::canceled`
59 instead of waiting for `set()`.
60
61 Cancellation only applies while the coroutine is suspended in the
62 wait queue. If the event is already set when `wait()` is called,
63 the wait completes immediately even if the stop token is already
64 signaled.
65
66 @par Zero Allocation
67
68 No heap allocation occurs for wait operations.
69
70 @par Thread Safety
71
72 The event operations are designed for single-threaded use on one
73 executor. The stop callback may fire from any thread.
74
75 @par Example
76 @code
77 async_event event;
78
79 task<> waiter() {
80 auto [ec] = co_await event.wait();
81 if(ec)
82 co_return;
83 // ... event was set ...
84 }
85
86 task<> notifier() {
87 // ... do some work ...
88 event.set(); // Wake all waiters
89 }
90 @endcode
91 */
92 class async_event
93 {
94 public:
95 class wait_awaiter;
96
97 private:
98 bool set_ = false;
99 detail::intrusive_list<wait_awaiter> waiters_;
100
101 public:
102 /** Awaiter returned by wait().
103 */
104 class wait_awaiter
105 : public detail::intrusive_list<wait_awaiter>::node
106 {
107 friend class async_event;
108
109 async_event* e_;
110 std::coroutine_handle<> h_;
111 executor_ref ex_;
112
113 // Declared before stop_cb_buf_: the callback
114 // accesses these members, so they must still be
115 // alive if the stop_cb_ destructor blocks.
116 std::atomic<bool> claimed_{false};
117 bool canceled_ = false;
118 bool active_ = false;
119 bool in_list_ = false;
120
121 struct cancel_fn
122 {
123 wait_awaiter* self_;
124
125 20 void operator()() const noexcept
126 {
127
2/2
✓ Branch 1 taken 19 times.
✓ Branch 2 taken 1 time.
20 if(!self_->claimed_.exchange(
128 true, std::memory_order_acq_rel))
129 {
130 19 self_->canceled_ = true;
131 19 self_->ex_.dispatch(self_->h_);
132 }
133 20 }
134 };
135
136 using stop_cb_t =
137 std::stop_callback<cancel_fn>;
138
139 // Aligned storage for stop_cb_t. Declared last:
140 // its destructor may block while the callback
141 // accesses the members above.
142 #ifdef _MSC_VER
143 # pragma warning(push)
144 # pragma warning(disable: 4324) // padded due to alignas
145 #endif
146 alignas(stop_cb_t)
147 unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
148 #ifdef _MSC_VER
149 # pragma warning(pop)
150 #endif
151
152 31 stop_cb_t& stop_cb_() noexcept
153 {
154 return *reinterpret_cast<stop_cb_t*>(
155 31 stop_cb_buf_);
156 }
157
158 public:
159 235 ~wait_awaiter()
160 {
161
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 234 times.
235 if(active_)
162 1 stop_cb_().~stop_cb_t();
163
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 234 times.
235 if(in_list_)
164 1 e_->waiters_.remove(this);
165 235 }
166
167 51 explicit wait_awaiter(async_event* e) noexcept
168 51 : e_(e)
169 {
170 51 }
171
172 184 wait_awaiter(wait_awaiter&& o) noexcept
173 184 : e_(o.e_)
174 184 , h_(o.h_)
175 184 , ex_(o.ex_)
176 184 , claimed_(o.claimed_.load(
177 std::memory_order_relaxed))
178 184 , canceled_(o.canceled_)
179 184 , active_(std::exchange(o.active_, false))
180 184 , in_list_(std::exchange(o.in_list_, false))
181 {
182 184 }
183
184 wait_awaiter(wait_awaiter const&) = delete;
185 wait_awaiter& operator=(wait_awaiter const&) = delete;
186 wait_awaiter& operator=(wait_awaiter&&) = delete;
187
188 51 bool await_ready() const noexcept
189 {
190 51 return e_->set_;
191 }
192
193 /** IoAwaitable protocol overload. */
194 std::coroutine_handle<>
195 41 await_suspend(
196 std::coroutine_handle<> h,
197 io_env const* env) noexcept
198 {
199
2/2
✓ Branch 1 taken 10 times.
✓ Branch 2 taken 31 times.
41 if(env->stop_token.stop_requested())
200 {
201 10 canceled_ = true;
202 10 return h;
203 }
204 31 h_ = h;
205 31 ex_ = env->executor;
206 31 e_->waiters_.push_back(this);
207 31 in_list_ = true;
208 93 ::new(stop_cb_buf_) stop_cb_t(
209 31 env->stop_token, cancel_fn{this});
210 31 active_ = true;
211 31 return std::noop_coroutine();
212 }
213
214 48 io_result<> await_resume() noexcept
215 {
216
2/2
✓ Branch 0 taken 30 times.
✓ Branch 1 taken 18 times.
48 if(active_)
217 {
218 30 stop_cb_().~stop_cb_t();
219 30 active_ = false;
220 }
221
2/2
✓ Branch 0 taken 29 times.
✓ Branch 1 taken 19 times.
48 if(canceled_)
222 {
223
2/2
✓ Branch 0 taken 19 times.
✓ Branch 1 taken 10 times.
29 if(in_list_)
224 {
225 19 e_->waiters_.remove(this);
226 19 in_list_ = false;
227 }
228 29 return {make_error_code(
229 29 error::canceled)};
230 }
231 19 return {{}};
232 }
233 };
234
235 20 async_event() = default;
236
237 // Non-copyable, non-movable
238 async_event(async_event const&) = delete;
239 async_event& operator=(async_event const&) = delete;
240
241 /** Returns an awaiter that waits until the event is set.
242
243 If the event is already set, completes immediately.
244
245 @return An awaitable yielding `(error_code)`.
246 */
247 51 wait_awaiter wait() noexcept
248 {
249 51 return wait_awaiter{this};
250 }
251
252 /** Sets the event.
253
254 All waiting coroutines are resumed. Canceled waiters
255 are skipped. Subsequent calls to wait() complete
256 immediately until clear() is called.
257 */
258 20 void set() noexcept
259 {
260 20 set_ = true;
261 for(;;)
262 {
263 31 auto* w = waiters_.pop_front();
264
2/2
✓ Branch 0 taken 20 times.
✓ Branch 1 taken 11 times.
31 if(!w)
265 20 break;
266 11 w->in_list_ = false;
267
1/2
✓ Branch 1 taken 11 times.
✗ Branch 2 not taken.
11 if(!w->claimed_.exchange(
268 true, std::memory_order_acq_rel))
269 {
270 11 w->ex_.dispatch(w->h_);
271 }
272 11 }
273 20 }
274
275 /** Clears the event.
276
277 Subsequent calls to wait() will suspend until
278 set() is called again.
279 */
280 2 void clear() noexcept
281 {
282 2 set_ = false;
283 2 }
284
285 /** Returns true if the event is currently set.
286 */
287 8 bool is_set() const noexcept
288 {
289 8 return set_;
290 }
291 };
292
293 } // namespace capy
294 } // namespace boost
295
296 #endif
297