include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp

85.3% Lines (128/150) 100.0% List of functions (10/10)
epoll_scheduler.hpp
f(x) Functions (10)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19
20 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21
22 #include <boost/corosio/native/detail/epoll/epoll_traits.hpp>
23 #include <boost/corosio/detail/timer_service.hpp>
24 #include <boost/corosio/native/detail/make_err.hpp>
25 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27 #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
28 #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
29
30 #include <boost/corosio/detail/except.hpp>
31
#include <atomic>
#include <chrono>
#include <cstdint>
#include <limits>
#include <mutex>
#include <vector>
37
38 #include <errno.h>
39 #include <sys/epoll.h>
40 #include <sys/eventfd.h>
41 #include <sys/timerfd.h>
42 #include <unistd.h>
43
44 namespace boost::corosio::detail {
45
46 /** Linux scheduler using epoll for I/O multiplexing.
47
48 This scheduler implements the scheduler interface using Linux epoll
49 for efficient I/O event notification. It uses a single reactor model
50 where one thread runs epoll_wait while other threads
51 wait on a condition variable for handler work. This design provides:
52
53 - Handler parallelism: N posted handlers can execute on N threads
54 - No thundering herd: condition_variable wakes exactly one thread
55 - IOCP parity: Behavior matches Windows I/O completion port semantics
56
57 When threads call run(), they first try to execute queued handlers.
58 If the queue is empty and no reactor is running, one thread becomes
59 the reactor and runs epoll_wait. Other threads wait on a condition
60 variable until handlers are available.
61
62 @par Thread Safety
63 All public member functions are thread-safe.
64 */
65 class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler
66 {
67 public:
68 /** Construct the scheduler.
69
70 Creates an epoll instance, eventfd for reactor interruption,
71 and timerfd for kernel-managed timer expiry.
72
73 @param ctx Reference to the owning execution_context.
74 @param concurrency_hint Hint for expected thread count (unused).
75 */
76 epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
77
78 /// Destroy the scheduler.
79 ~epoll_scheduler() override;
80
81 epoll_scheduler(epoll_scheduler const&) = delete;
82 epoll_scheduler& operator=(epoll_scheduler const&) = delete;
83
84 /// Shut down the scheduler, draining pending operations.
85 void shutdown() override;
86
87 /// Apply runtime configuration, resizing the event buffer.
88 void configure_reactor(
89 unsigned max_events,
90 unsigned budget_init,
91 unsigned budget_max,
92 unsigned unassisted) override;
93
94 /** Return the epoll file descriptor.
95
96 Used by socket services to register file descriptors
97 for I/O event notification.
98
99 @return The epoll file descriptor.
100 */
101 int epoll_fd() const noexcept
102 {
103 return epoll_fd_;
104 }
105
106 /** Register a descriptor for persistent monitoring.
107
108 The fd is registered once and stays registered until explicitly
109 deregistered. Events are dispatched via reactor_descriptor_state which
110 tracks pending read/write/connect operations.
111
112 @param fd The file descriptor to register.
113 @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
114 */
115 void register_descriptor(int fd, reactor_descriptor_state* desc) const;
116
117 /** Deregister a persistently registered descriptor.
118
119 @param fd The file descriptor to deregister.
120 */
121 void deregister_descriptor(int fd) const;
122
123 private:
124 void
125 run_task(lock_type& lock, context_type* ctx,
126 long timeout_us) override;
127 void interrupt_reactor() const override;
128 void update_timerfd() const;
129
130 int epoll_fd_;
131 int event_fd_;
132 int timer_fd_;
133
134 // Edge-triggered eventfd state
135 mutable std::atomic<bool> eventfd_armed_{false};
136
137 // Set when the earliest timer changes; flushed before epoll_wait
138 mutable std::atomic<bool> timerfd_stale_{false};
139
140 // Event buffer sized from max_events_per_poll_ (set at construction,
141 // resized by configure_reactor via io_context_options).
142 std::vector<epoll_event> event_buffer_;
143 };
144
145 595x inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
146 595x : epoll_fd_(-1)
147 595x , event_fd_(-1)
148 595x , timer_fd_(-1)
149 1190x , event_buffer_(max_events_per_poll_)
150 {
151 595x epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
152 595x if (epoll_fd_ < 0)
153 detail::throw_system_error(make_err(errno), "epoll_create1");
154
155 595x event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
156 595x if (event_fd_ < 0)
157 {
158 int errn = errno;
159 ::close(epoll_fd_);
160 detail::throw_system_error(make_err(errn), "eventfd");
161 }
162
163 595x timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
164 595x if (timer_fd_ < 0)
165 {
166 int errn = errno;
167 ::close(event_fd_);
168 ::close(epoll_fd_);
169 detail::throw_system_error(make_err(errn), "timerfd_create");
170 }
171
172 595x epoll_event ev{};
173 595x ev.events = EPOLLIN | EPOLLET;
174 595x ev.data.ptr = nullptr;
175 595x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
176 {
177 int errn = errno;
178 ::close(timer_fd_);
179 ::close(event_fd_);
180 ::close(epoll_fd_);
181 detail::throw_system_error(make_err(errn), "epoll_ctl");
182 }
183
184 595x epoll_event timer_ev{};
185 595x timer_ev.events = EPOLLIN | EPOLLERR;
186 595x timer_ev.data.ptr = &timer_fd_;
187 595x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
188 {
189 int errn = errno;
190 ::close(timer_fd_);
191 ::close(event_fd_);
192 ::close(epoll_fd_);
193 detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
194 }
195
196 595x timer_svc_ = &get_timer_service(ctx, *this);
197 595x timer_svc_->set_on_earliest_changed(
198 4797x timer_service::callback(this, [](void* p) {
199 4202x auto* self = static_cast<epoll_scheduler*>(p);
200 4202x self->timerfd_stale_.store(true, std::memory_order_release);
201 4202x self->interrupt_reactor();
202 4202x }));
203
204 595x get_resolver_service(ctx, *this);
205 595x get_signal_service(ctx, *this);
206 595x get_stream_file_service(ctx, *this);
207 595x get_random_access_file_service(ctx, *this);
208
209 595x completed_ops_.push(&task_op_);
210 595x }
211
212 1190x inline epoll_scheduler::~epoll_scheduler()
213 {
214 595x if (timer_fd_ >= 0)
215 595x ::close(timer_fd_);
216 595x if (event_fd_ >= 0)
217 595x ::close(event_fd_);
218 595x if (epoll_fd_ >= 0)
219 595x ::close(epoll_fd_);
220 1190x }
221
222 inline void
223 595x epoll_scheduler::shutdown()
224 {
225 595x shutdown_drain();
226
227 595x if (event_fd_ >= 0)
228 595x interrupt_reactor();
229 595x }
230
231 inline void
232 8x epoll_scheduler::configure_reactor(
233 unsigned max_events,
234 unsigned budget_init,
235 unsigned budget_max,
236 unsigned unassisted)
237 {
238 8x reactor_scheduler::configure_reactor(
239 max_events, budget_init, budget_max, unassisted);
240 7x event_buffer_.resize(max_events_per_poll_);
241 7x }
242
243 inline void
244 8274x epoll_scheduler::register_descriptor(int fd, reactor_descriptor_state* desc) const
245 {
246 8274x epoll_event ev{};
247 8274x ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
248 8274x ev.data.ptr = desc;
249
250 8274x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
251 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
252
253 8274x desc->registered_events = ev.events;
254 8274x desc->fd = fd;
255 8274x desc->scheduler_ = this;
256 8274x desc->mutex.set_enabled(!single_threaded_);
257 8274x desc->ready_events_.store(0, std::memory_order_relaxed);
258
259 8274x conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
260 8274x desc->impl_ref_.reset();
261 8274x desc->read_ready = false;
262 8274x desc->write_ready = false;
263 8274x }
264
265 inline void
266 8274x epoll_scheduler::deregister_descriptor(int fd) const
267 {
268 8274x ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
269 8274x }
270
271 inline void
272 5257x epoll_scheduler::interrupt_reactor() const
273 {
274 5257x bool expected = false;
275 5257x if (eventfd_armed_.compare_exchange_strong(
276 expected, true, std::memory_order_release,
277 std::memory_order_relaxed))
278 {
279 4902x std::uint64_t val = 1;
280 4902x [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
281 }
282 5257x }
283
284 inline void
285 8361x epoll_scheduler::update_timerfd() const
286 {
287 8361x auto nearest = timer_svc_->nearest_expiry();
288
289 8361x itimerspec ts{};
290 8361x int flags = 0;
291
292 8361x if (nearest == timer_service::time_point::max())
293 {
294 // No timers — disarm by setting to 0 (relative)
295 }
296 else
297 {
298 8264x auto now = std::chrono::steady_clock::now();
299 8264x if (nearest <= now)
300 {
301 // Use 1ns instead of 0 — zero disarms the timerfd
302 506x ts.it_value.tv_nsec = 1;
303 }
304 else
305 {
306 7758x auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
307 7758x nearest - now)
308 7758x .count();
309 7758x ts.it_value.tv_sec = nsec / 1000000000;
310 7758x ts.it_value.tv_nsec = nsec % 1000000000;
311 7758x if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
312 ts.it_value.tv_nsec = 1;
313 }
314 }
315
316 8361x if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
317 detail::throw_system_error(make_err(errno), "timerfd_settime");
318 8361x }
319
320 inline void
321 41121x epoll_scheduler::run_task(
322 lock_type& lock, context_type* ctx, long timeout_us)
323 {
324 int timeout_ms;
325 41121x if (task_interrupted_)
326 29566x timeout_ms = 0;
327 11555x else if (timeout_us < 0)
328 11551x timeout_ms = -1;
329 else
330 4x timeout_ms = static_cast<int>((timeout_us + 999) / 1000);
331
332 41121x if (lock.owns_lock())
333 11555x lock.unlock();
334
335 41121x task_cleanup on_exit{this, &lock, ctx};
336
337 // Flush deferred timerfd programming before blocking
338 41121x if (timerfd_stale_.exchange(false, std::memory_order_acquire))
339 4178x update_timerfd();
340
341 41121x int nfds = ::epoll_wait(
342 epoll_fd_, event_buffer_.data(),
343 41121x static_cast<int>(event_buffer_.size()), timeout_ms);
344
345 41121x if (nfds < 0 && errno != EINTR)
346 detail::throw_system_error(make_err(errno), "epoll_wait");
347
348 41121x bool check_timers = false;
349 41121x op_queue local_ops;
350
351 93443x for (int i = 0; i < nfds; ++i)
352 {
353 52322x if (event_buffer_[i].data.ptr == nullptr)
354 {
355 std::uint64_t val;
356 // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
357 4307x [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
358 4307x eventfd_armed_.store(false, std::memory_order_relaxed);
359 4307x continue;
360 4307x }
361
362 48015x if (event_buffer_[i].data.ptr == &timer_fd_)
363 {
364 std::uint64_t expirations;
365 // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
366 [[maybe_unused]] auto r =
367 4183x ::read(timer_fd_, &expirations, sizeof(expirations));
368 4183x check_timers = true;
369 4183x continue;
370 4183x }
371
372 auto* desc =
373 43832x static_cast<reactor_descriptor_state*>(event_buffer_[i].data.ptr);
374 43832x desc->add_ready_events(event_buffer_[i].events);
375
376 43832x bool expected = false;
377 43832x if (desc->is_enqueued_.compare_exchange_strong(
378 expected, true, std::memory_order_release,
379 std::memory_order_relaxed))
380 {
381 43832x local_ops.push(desc);
382 }
383 }
384
385 41121x if (check_timers)
386 {
387 4183x timer_svc_->process_expired();
388 4183x update_timerfd();
389 }
390
391 41121x lock.lock();
392
393 41121x if (!local_ops.empty())
394 28988x completed_ops_.splice(local_ops);
395 41121x }
396
397 } // namespace boost::corosio::detail
398
399 #endif // BOOST_COROSIO_HAS_EPOLL
400
401 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
402