diff --git a/asio/include/asio/detail/config.hpp b/asio/include/asio/detail/config.hpp
index 44171c4b24..1743ab4d90 100644
--- a/asio/include/asio/detail/config.hpp
+++ b/asio/include/asio/detail/config.hpp
@@ -892,6 +892,16 @@
 # endif // !defined(ASIO_HAS_DEV_POLL)
 #endif // defined(__sun)
 
+// QNX: ionotify.
+#if defined(__QNXNTO__)
+# if !defined(ASIO_HAS_IONOTIFY)
+#  if !defined(ASIO_DISABLE_IONOTIFY)
+#   define ASIO_HAS_IONOTIFY 1
+#   define ASIO_DISABLE_EPOLL
+#  endif // !defined(ASIO_DISABLE_IONOTIFY)
+# endif // !defined(ASIO_HAS_IONOTIFY)
+#endif // defined(__QNXNTO__)
+
 // Serial ports.
 #if !defined(ASIO_HAS_SERIAL_PORT)
 # if defined(ASIO_HAS_IOCP) \
diff --git a/asio/include/asio/detail/impl/ionotify_reactor.hpp b/asio/include/asio/detail/impl/ionotify_reactor.hpp
new file mode 100644
index 0000000000..4a6a47c94f
--- /dev/null
+++ b/asio/include/asio/detail/impl/ionotify_reactor.hpp
@@ -0,0 +1,111 @@
+// detail/impl/ionotify_reactor.hpp
+// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+//
+// Copyright (c) 2003-2021 Christopher M. Kohlhoff (chris at kohlhoff dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef ASIO_DETAIL_IMPL_IONOTIFY_REACTOR_HPP
+#define ASIO_DETAIL_IMPL_IONOTIFY_REACTOR_HPP
+
+#if defined(_MSC_VER) && (_MSC_VER >= 1200)
+#pragma once
+#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
+
+#include "asio/detail/config.hpp"
+
+#if defined(ASIO_HAS_IONOTIFY)
+
+#include "asio/detail/scheduler.hpp"
+
+#include "asio/detail/push_options.hpp"
+
+namespace asio {
+namespace detail {
+
+inline void ionotify_reactor::post_immediate_completion(
+    operation* op, bool is_continuation) const
+{
+  scheduler_.post_immediate_completion(op, is_continuation);
+}
+
+template <typename Time_Traits>
+void ionotify_reactor::add_timer_queue(timer_queue<Time_Traits>& queue)
+{
+  do_add_timer_queue(queue);
+}
+
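
As a quick sanity check of the config.hpp detection logic above, something like the following (illustrative only, not part of the patch) prints which backend the preprocessor selected; defining ASIO_DISABLE_IONOTIFY on the compiler command line before any asio header is seen should flip the result back to the select-based reactor.

#include <cstdio>
#include "asio/detail/config.hpp"

int main()
{
#if defined(ASIO_HAS_IONOTIFY)
  std::puts("reactor backend: ionotify (QNX)");
#else
  std::puts("reactor backend: select/epoll/kqueue/dev_poll");
#endif
  return 0;
}
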
+// Remove a timer queue from the reactor.
+template <typename Time_Traits>
+void ionotify_reactor::remove_timer_queue(timer_queue<Time_Traits>& queue)
+{
+  do_remove_timer_queue(queue);
+}
+
+template <typename Time_Traits>
+void ionotify_reactor::schedule_timer(timer_queue<Time_Traits>& queue,
+    const typename Time_Traits::time_type& time,
+    typename timer_queue<Time_Traits>::per_timer_data& timer, wait_op* op)
+{
+  asio::detail::mutex::scoped_lock lock(mutex_);
+
+  if (shutdown_)
+  {
+    scheduler_.post_immediate_completion(op, false);
+    return;
+  }
+
+  bool earliest = queue.enqueue_timer(time, timer, op);
+  scheduler_.work_started();
+  if (earliest)
+    interrupt();
+}
+
+template <typename Time_Traits>
+std::size_t ionotify_reactor::cancel_timer(timer_queue<Time_Traits>& queue,
+    typename timer_queue<Time_Traits>::per_timer_data& timer,
+    std::size_t max_cancelled)
+{
+  asio::detail::mutex::scoped_lock lock(mutex_);
+  op_queue<operation> ops;
+  std::size_t n = queue.cancel_timer(timer, ops, max_cancelled);
+  lock.unlock();
+  scheduler_.post_deferred_completions(ops);
+  return n;
+}
+
+template <typename Time_Traits>
+void ionotify_reactor::cancel_timer_by_key(timer_queue<Time_Traits>& queue,
+    typename timer_queue<Time_Traits>::per_timer_data* timer,
+    void* cancellation_key)
+{
+  asio::detail::mutex::scoped_lock lock(mutex_);
+  op_queue<operation> ops;
+  queue.cancel_timer_by_key(timer, ops, cancellation_key);
+  lock.unlock();
+  scheduler_.post_deferred_completions(ops);
+}
+
+template <typename Time_Traits>
+void ionotify_reactor::move_timer(timer_queue<Time_Traits>& queue,
+    typename timer_queue<Time_Traits>::per_timer_data& target,
+    typename timer_queue<Time_Traits>::per_timer_data& source)
+{
+  asio::detail::mutex::scoped_lock lock(mutex_);
+  op_queue<operation> ops;
+  queue.cancel_timer(target, ops);
+  queue.move_timer(target, source);
+  lock.unlock();
+  scheduler_.post_deferred_completions(ops);
+}
+
+} // namespace detail
+} // namespace asio
+
+#include "asio/detail/pop_options.hpp"
+
+#endif // defined(ASIO_HAS_IONOTIFY)
+
+#endif // ASIO_DETAIL_IMPL_IONOTIFY_REACTOR_HPP
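
For reference, the timer members above are what an ordinary asio::steady_timer ends up calling on QNX: schedule_timer() enqueues the wait and, if it becomes the new earliest deadline, interrupt() wakes the reactor so it can recompute its blocking timeout; cancel() funnels into cancel_timer(). A minimal driver (illustrative only, not part of the patch) looks like this:

#include <asio.hpp>
#include <chrono>
#include <cstdio>

int main()
{
  asio::io_context ctx;

  // Becomes a wait_op handed to schedule_timer().
  asio::steady_timer t(ctx, std::chrono::milliseconds(50));
  t.async_wait([](const asio::error_code& ec)
  {
    std::printf("timer: %s\n", ec ? ec.message().c_str() : "expired");
  });

  // Uncommenting this exercises cancel_timer(); the handler then sees
  // asio::error::operation_aborted instead.
  // t.cancel();

  ctx.run();
  return 0;
}
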
diff --git a/asio/include/asio/detail/impl/ionotify_reactor.ipp b/asio/include/asio/detail/impl/ionotify_reactor.ipp
new file mode 100644
index 0000000000..79b1b610d6
--- /dev/null
+++ b/asio/include/asio/detail/impl/ionotify_reactor.ipp
@@ -0,0 +1,521 @@
+// detail/impl/ionotify_reactor.ipp
+// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+//
+// Copyright (c) 2003-2021 Christopher M. Kohlhoff (chris at kohlhoff dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef ASIO_DETAIL_IMPL_IONOTIFY_REACTOR_IPP
+#define ASIO_DETAIL_IMPL_IONOTIFY_REACTOR_IPP
+
+#if defined(_MSC_VER) && (_MSC_VER >= 1200)
+#pragma once
+#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
+
+#include "asio/detail/config.hpp"
+
+#if defined(ASIO_HAS_IONOTIFY)
+
+#include <sys/ionotify.h>
+#include <sys/neutrino.h>
+#include <sys/siginfo.h>
+
+#include "asio/detail/fd_set_adapter.hpp"
+#include "asio/detail/ionotify_reactor.hpp"
+#include "asio/detail/scheduler.hpp"
+#include "asio/detail/signal_blocker.hpp"
+#include "asio/detail/socket_ops.hpp"
+
+#include "asio/detail/push_options.hpp"
+
+// All pulses use the same priority so that their ordering is preserved.
+#define PULSE_PRIO 10
+// The pulse code used to interrupt a blocked MsgReceivePulse() call.
+#define INTERRUPT_PULSE_CODE _PULSE_CODE_MINAVAIL
+// Toggled on every re-arm so that stale pulses can be detected.
+#define SERIAL_BIT 1
+
+namespace asio {
+namespace detail {
+
+ionotify_reactor::ionotify_reactor(asio::execution_context& ctx)
+  : execution_context_service_base<ionotify_reactor>(ctx),
+    scheduler_(use_service<scheduler>(ctx)), mutex_(), chid_(-1),
+    coid_(-1), interrupted_(false), shutdown_(false)
+{
+  create_pulse_channel();
+}
+
+ionotify_reactor::~ionotify_reactor()
+{
+  shutdown();
+
+  // Unregister all registered events.
+  for (size_t i = 0; i < fdmap_.size(); i++)
+    if (fdmap_[i].ioev_.sigev_notify != 0)
+      MsgUnregisterEvent(&fdmap_[i].ioev_);
+
+  if (chid_ >= 0)
+  {
+    ChannelDestroy(chid_);
+    if (coid_ >= 0)
+    {
+      ConnectDetach(coid_);
+    }
+  }
+}
+
+void ionotify_reactor::shutdown()
+{
+  mutex::scoped_lock lock(mutex_);
+  shutdown_ = true;
+  lock.unlock();
+
+  op_queue<operation> ops;
+
+  for (int i = 0; i < max_ops; ++i)
+    op_queue_[i].get_all_operations(ops);
+
+  timer_queues_.get_all_timers(ops);
+
+  scheduler_.abandon_operations(ops);
+}
+
+void ionotify_reactor::notify_fork(
+    asio::execution_context::fork_event fork_ev)
+{
+  if (fork_ev == asio::execution_context::fork_child)
+  {
+    // Clean up all registered events.
+    for (size_t i = 0; i < fdmap_.size(); i++)
+    {
+      fdmap_[i].ioev_.sigev_notify = 0;
+      fdmap_[i].armed_ = 0;
+    }
+
+    // Re-create the pulse channel for the forked process.
+    chid_ = coid_ = -1;
+    create_pulse_channel();
+  }
+}
+
+void ionotify_reactor::init_task()
+{
+  scheduler_.init_task();
+}
+
+int ionotify_reactor::register_descriptor(
+    socket_type /*fd*/, ionotify_reactor::per_descriptor_data& /*d*/)
+{
+  return 0;
+}
+
+int ionotify_reactor::register_internal_descriptor(int op_type,
+    socket_type descriptor, ionotify_reactor::per_descriptor_data& /*d*/,
+    reactor_op* op)
+{
+  mutex::scoped_lock lock(mutex_);
+
+  op_queue_[op_type].enqueue_operation(descriptor, op);
+  interrupt_unlocked();
+
+  return 0;
+}
+
+void ionotify_reactor::move_descriptor(socket_type /*fd*/,
+    ionotify_reactor::per_descriptor_data& /*d1*/,
+    ionotify_reactor::per_descriptor_data& /*d2*/)
+{
+}
+
+void ionotify_reactor::call_post_immediate_completion(
+    operation* op, bool is_continuation, const void* self)
+{
+  static_cast<const ionotify_reactor*>(self)->post_immediate_completion(
+      op, is_continuation);
+}
+
+void ionotify_reactor::start_op(int op_type, socket_type descriptor,
+    ionotify_reactor::per_descriptor_data&, reactor_op* op,
+    bool is_continuation, bool /*allow_speculative*/,
+    void (*on_immediate)(operation*, bool, const void*),
+    const void* immediate_arg)
+{
+  mutex::scoped_lock lock(mutex_);
+
+  if (shutdown_)
+  {
+    on_immediate(op, is_continuation, immediate_arg);
+    return;
+  }
+
+  bool first = op_queue_[op_type].enqueue_operation(descriptor, op);
+  scheduler_.work_started();
+  if (first)
+    interrupt_unlocked();
+}
+
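
The constructor, interrupt_unlocked() and the receive path in select() are all built on one QNX primitive: a private pulse channel. A stripped-down sketch of that pattern (illustrative only, not part of the patch, and without the _r variants or asio's error mapping) is:

#include <sys/neutrino.h>
#include <cstdio>

int main()
{
  // Create a channel and attach a side-channel connection to it, much as
  // create_pulse_channel() does.
  int chid = ChannelCreate(_NTO_CHF_FIXED_PRIORITY);
  int coid = ConnectAttach(0, 0, chid, _NTO_SIDE_CHANNEL, 0);
  if (chid == -1 || coid == -1)
  {
    std::perror("channel setup");
    return 1;
  }

  // The reactor's interrupt() is essentially this: a pulse sent to its own
  // channel so that a blocked receive wakes up.
  MsgSendPulse(coid, 10, _PULSE_CODE_MINAVAIL, 42);

  struct _pulse pulse;
  if (MsgReceivePulse(chid, &pulse, sizeof(pulse), 0) == 0)
    std::printf("pulse: code=%d value=%d\n",
        (int)pulse.code, (int)pulse.value.sival_int);

  ConnectDetach(coid);
  ChannelDestroy(chid);
  return 0;
}
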
+void ionotify_reactor::cancel_ops(
+    socket_type descriptor, ionotify_reactor::per_descriptor_data&)
+{
+  mutex::scoped_lock lock(mutex_);
+  cancel_ops_unlocked(descriptor, asio::error::operation_aborted);
+}
+
+void ionotify_reactor::cancel_ops_by_key(socket_type descriptor,
+    ionotify_reactor::per_descriptor_data&, int op_type,
+    void* cancellation_key)
+{
+  mutex::scoped_lock lock(mutex_);
+  op_queue<operation> ops;
+  bool need_interrupt = op_queue_[op_type].cancel_operations_by_key(
+      descriptor, ops, cancellation_key, asio::error::operation_aborted);
+  scheduler_.post_deferred_completions(ops);
+  if (need_interrupt)
+    interrupt_unlocked();
+}
+
+void ionotify_reactor::deregister_descriptor_unlocked(socket_type descriptor)
+{
+  // If run() has never seen this fd, we don't need to do anything.
+  if (static_cast<size_t>(descriptor) >= fdmap_.size())
+    return;
+
+  // The fd has been deregistered. If it is armed, we need to invalidate its
+  // pulses.
+  fdmap_[descriptor].armed_ = 0;
+  fdmap_[descriptor].ops_ = 0;
+  fdmap_[descriptor].wanted_ = false;
+  if (fdmap_[descriptor].ioev_.sigev_notify != 0)
+  {
+    if (MsgUnregisterEvent(&fdmap_[descriptor].ioev_) == 0)
+      fdmap_[descriptor].ioev_.sigev_notify = 0;
+  }
+}
+
+void ionotify_reactor::deregister_descriptor(socket_type descriptor,
+    ionotify_reactor::per_descriptor_data& /*d*/, bool /*cancel_ops*/)
+{
+  mutex::scoped_lock lock(mutex_);
+
+  cancel_ops_unlocked(descriptor, asio::error::operation_aborted);
+  deregister_descriptor_unlocked(descriptor);
+}
+
+void ionotify_reactor::deregister_internal_descriptor(
+    socket_type descriptor, ionotify_reactor::per_descriptor_data& /*d*/)
+{
+  mutex::scoped_lock lock(mutex_);
+  op_queue<operation> ops;
+  for (int i = 0; i < max_ops; ++i)
+    op_queue_[i].cancel_operations(descriptor, ops);
+  deregister_descriptor_unlocked(descriptor);
+}
+
+socket_type ionotify_reactor::fdmap::set(
+    reactor_op_queue<socket_type>& operations, op_queue<operation>& /*ops*/,
+    op_types op)
+{
+  socket_type maxfd = invalid_socket;
+  const op_bits opbit = op_bit(op);
+  reactor_op_queue<socket_type>::iterator i = operations.begin();
+  while (i != operations.end())
+  {
+    reactor_op_queue<socket_type>::iterator op_iter = i++;
+    socket_type fd = op_iter->first;
+    if (maxfd == invalid_socket || fd > maxfd)
+    {
+      maxfd = fd;
+      if (static_cast<size_t>(maxfd) >= size())
+        resize(maxfd + 1);
+    }
+    (*this)[fd].ops_ |= opbit;
+  }
+  return maxfd;
+}
+
+void ionotify_reactor::fdmap::perform(
+    reactor_op_queue<socket_type>& operations,
+    op_queue<operation>& ops, op_types op)
+{
+  const op_bits opbit = op_bit(op);
+  reactor_op_queue<socket_type>::iterator i = operations.begin();
+  while (i != operations.end())
+  {
+    reactor_op_queue<socket_type>::iterator op_iter = i++;
+    socket_type fd = op_iter->first;
+    if (static_cast<size_t>(fd) >= size())
+      continue;
+
+    fdstate& state = (*this)[fd];
+    if (state.ops_ & opbit)
+    {
+      state.ops_ &= ~opbit;
+      operations.perform_operations(op_iter, ops);
+    }
+  }
+}
+
+void ionotify_reactor::cleanup_descriptor_data(
+    ionotify_reactor::per_descriptor_data&)
+{
+}
+
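
The fdmap bookkeeping used by the run()/select() pair below reduces to a small bitmask per descriptor: one bit each for read, write and except operations. A self-contained sketch of that bookkeeping (illustrative stand-ins only, not the patch's actual types or QNX's constants) is:

#include <cstdio>

enum op_types { read_op = 0, write_op = 1, except_op = 2, max_select_ops = 3 };
enum op_bits { NONE = 0, READ = 1 << read_op,
    WRITE = 1 << write_op, EXCEPT = 1 << except_op };

static op_bits op_bit(op_types op) { return static_cast<op_bits>(1 << op); }

int main()
{
  unsigned ops = 0;
  ops |= op_bit(read_op);   // run() found a queued read operation
  ops |= op_bit(except_op); // ...and an out-of-band operation

  // select() arms ionotify() with the union of these bits, then clears any
  // bit that neither the arming call nor a pulse reports as ready.
  std::printf("requested mask: 0x%x (READ=%d WRITE=%d EXCEPT=%d)\n", ops,
      !!(ops & READ), !!(ops & WRITE), !!(ops & EXCEPT));
  return 0;
}
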
+void ionotify_reactor::run(long usec, op_queue<operation>& ops)
+{
+  mutex::scoped_lock lock(mutex_);
+
+  // In each element of fdmap_:
+  //  * ops_ is zero except while we are inside this function;
+  //  * armed_ keeps track of the fd's state between select() calls;
+  //  * wanted_ is used by select() but is not meaningful between calls;
+  //  * ioev_ is used by select() to receive an SI_NOTIFY pulse when data
+  //    from the fd is ready.
+
+  socket_type max_fd = invalid_socket;
+  bool have_work_to_do = !timer_queues_.all_empty();
+  for (int i = 0; i < max_select_ops; ++i)
+  {
+    have_work_to_do = have_work_to_do || !op_queue_[i].empty();
+    socket_type maxfd = fdmap_.set(op_queue_[i], ops,
+        static_cast<op_types>(i));
+    if (max_fd == invalid_socket || maxfd > max_fd)
+      max_fd = maxfd;
+  }
+
+  // We can return immediately if there's no work to do and the reactor is
+  // not supposed to block.
+  if (!usec && !have_work_to_do)
+    return;
+
+  // Determine how long to block while waiting for events.
+  long timeout_usec = 0;
+  if (usec && !interrupted_)
+    timeout_usec = get_timeout_usec(usec);
+
+  // Block on the select call until descriptors become ready.
+  int retval = select(lock, max_fd, timeout_usec);
+
+  lock.lock();
+  interrupted_ = false;
+
+  // Dispatch all ready operations.
+  if (retval > 0)
+  {
+    // Exception operations must be processed first to ensure that any
+    // out-of-band data is read before normal data.
+    for (int i = max_select_ops - 1; i >= 0; --i)
+      fdmap_.perform(op_queue_[i], ops, static_cast<op_types>(i));
+  }
+  timer_queues_.get_ready_timers(ops);
+}
+
+void ionotify_reactor::interrupt_unlocked()
+{
+  if (!interrupted_)
+  {
+    MsgSendPulse(coid_, PULSE_PRIO, INTERRUPT_PULSE_CODE, 0);
+    interrupted_ = true;
+  }
+}
+
+void ionotify_reactor::interrupt()
+{
+  mutex::scoped_lock lock(mutex_);
+  interrupt_unlocked();
+}
+
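
The arming half of select() below relies on ionotify(_NOTIFY_ACTION_POLLARM) either reporting already-satisfied conditions in its return value or arming a pulse for later delivery. A minimal arm-and-check sketch follows (illustrative only; the device path is hypothetical, header names are as I recall them on QNX 7, and on systems with secure event delivery MsgRegisterEvent() may also be required, as the reactor itself does):

#include <fcntl.h>
#include <unistd.h>
#include <sys/ionotify.h>
#include <sys/neutrino.h>
#include <sys/siginfo.h>
#include <cstdio>

int main()
{
  int chid = ChannelCreate(0);
  int coid = ConnectAttach(0, 0, chid, _NTO_SIDE_CHANNEL, 0);
  int fd = open("/dev/ser1", O_RDWR | O_NONBLOCK); // hypothetical device
  if (fd == -1)
    return 1;

  // Ask for a pulse when input arrives. If input is already pending, the
  // condition is reported in the return value instead and no pulse is
  // needed - the rc != 0 branch in select() above.
  struct sigevent ev;
  SIGEV_PULSE_INIT(&ev, coid, 10, _PULSE_CODE_MINAVAIL, fd << 1);
  int rc = ionotify(fd, _NOTIFY_ACTION_POLLARM, _NOTIFY_COND_INPUT, &ev);
  if (rc == -1)
    std::perror("ionotify");
  else if (rc & _NOTIFY_COND_INPUT)
    std::puts("input already available, no pulse needed");
  else
    std::puts("armed: a pulse will arrive when input is available");

  close(fd);
  ConnectDetach(coid);
  ChannelDestroy(chid);
  return 0;
}
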
+int ionotify_reactor::select(
+    mutex::scoped_lock& lock, socket_type max_fd, long timeout_usec)
+{
+  static const int opmap[max_select_ops] = {
+    _NOTIFY_COND_INPUT,
+    _NOTIFY_COND_OUTPUT,
+    _NOTIFY_COND_OBAND,
+  };
+  int nready = 0; // How many fds are ready.
+  int narmed = 0; // How many pulses we have armed.
+  int nalready = 0; // How many pulses were already armed.
+
+  if (max_fd != invalid_socket)
+  {
+    for (socket_type fd = 0; fd <= max_fd; ++fd)
+    {
+      fdstate& state = fdmap_[fd];
+      if (state.ops_ == 0)
+        state.wanted_ = false;
+      else
+      {
+        state.wanted_ = true;
+        // Do we already have an armed pulse that matches what we want?
+        // (Being armed for *more* than we want would need special handling,
+        // since the pulse could arrive carrying only conditions we are not
+        // interested in.)
+        if (state.ops_ == state.armed_)
+        {
+          ++nalready;
+          state.ops_ = 0; // Set again when the pulse arrives.
+        }
+        else
+        {
+          // Either no pulse is armed or the armed one is not a match. We
+          // need to call ionotify() again and ignore any stale pulse.
+          int io_ops = 0;
+          for (int i = 0; i < max_select_ops; ++i)
+          {
+            op_bits opbit = op_bit(static_cast<op_types>(i));
+            if (state.ops_ & opbit)
+              io_ops |= opmap[i];
+          }
+
+          sigevent* pev = &state.ioev_;
+          if (pev->sigev_notify == 0)
+          {
+            SIGEV_PULSE_INIT(pev, coid_, PULSE_PRIO, SI_NOTIFY, fd << 1);
+            SIGEV_MAKE_UPDATEABLE(pev);
+            if (MsgRegisterEvent(pev, fd) < 0)
+            {
+              asio::error_code ec(errno,
+                  asio::error::get_system_category());
+              asio::detail::throw_error(ec, "MsgRegisterEvent");
+            }
+          }
+          pev->sigev_value.sival_int ^= SERIAL_BIT;
+          int rc = ::ionotify(fd, _NOTIFY_ACTION_POLLARM, io_ops, pev);
+          if (rc != 0)
+          {
+            if (rc == -1)
+            {
+              asio::error_code ec(errno,
+                  asio::error::get_system_category());
+              asio::detail::throw_error(ec, "ionotify");
+            }
+            // Clear any bits that are not ready yet.
+            for (int i = 0; i < max_select_ops; ++i)
+              if (!(rc & opmap[i]))
+                state.ops_ &= ~op_bit(static_cast<op_types>(i));
+            ++nready;
+            state.armed_ = 0;
+          }
+          else
+          {
+            ++narmed;
+            state.armed_ = state.ops_;
+            state.ops_ = 0; // Set again when the pulse arrives.
+          }
+        }
+      }
+    }
+  }
+
+  lock.unlock();
+
+  // Now collect pulses and, if appropriate, wait for them.
+
+  uint64_t timeout_nsec = 0;
+  uint64_t* ntime = nullptr;
+  if (nready == 0 && timeout_usec != 0)
+  {
+    timeout_nsec = static_cast<uint64_t>(timeout_usec) * 1000;
+    ntime = &timeout_nsec;
+  }
+
+  _pulse pulse;
+  int rc;
+
+  TimerTimeout(CLOCK_MONOTONIC, _NTO_TIMEOUT_RECEIVE, 0, ntime, 0);
+  while ((rc = MsgReceivePulse_r(chid_, &pulse, sizeof(pulse), NULL)) == EOK)
+  {
+    switch (pulse.code)
+    {
+    case INTERRUPT_PULSE_CODE:
+      if (ntime)
+        ntime = nullptr;
+      break;
+    case SI_NOTIFY:
+    {
+      socket_type fd = (pulse.value.sival_int & _NOTIFY_DATA_MASK) >> 1;
+      mutex::scoped_lock lk(mutex_);
+      if (fdmap_.is_valid(fd))
+      {
+        fdstate& state = fdmap_[fd];
+        state.armed_ = 0;
+        if ((pulse.value.sival_int & _NOTIFY_DATA_MASK) ==
+            static_cast<unsigned>(state.ioev_.sigev_value.sival_int))
+        {
+          if (state.wanted_)
+          {
+            for (int i = 0; i < max_select_ops; ++i)
+              if (pulse.value.sival_int & opmap[i])
+              {
+                state.ops_ |= op_bit(static_cast<op_types>(i));
+                ntime = nullptr; // We have a ready fd. No more waiting.
+              }
+            ++nready;
+          }
+        }
+      }
+      break;
+    }
+    default:
+      break;
+    }
+    TimerTimeout(CLOCK_MONOTONIC, _NTO_TIMEOUT_RECEIVE, 0, ntime, 0);
+  }
+
+  return nready;
+}
+
+void ionotify_reactor::do_add_timer_queue(timer_queue_base& queue)
+{
+  mutex::scoped_lock lock(mutex_);
+  timer_queues_.insert(&queue);
+}
+
+void ionotify_reactor::do_remove_timer_queue(timer_queue_base& queue)
+{
+  mutex::scoped_lock lock(mutex_);
+  timer_queues_.erase(&queue);
+}
+
+long ionotify_reactor::get_timeout_usec(long usec)
+{
+  // By default we will wait no longer than 5 minutes. This will ensure that
+  // any changes to the system clock are detected after no longer than this.
+  const long max_usec = 5 * 60 * 1000 * 1000;
+  return timer_queues_.wait_duration_usec(
+      (usec < 0 || max_usec < usec) ? max_usec : usec);
+}
+
+void ionotify_reactor::cancel_ops_unlocked(
+    socket_type descriptor, const asio::error_code& ec)
+{
+  bool need_interrupt = false;
+  op_queue<operation> ops;
+  for (int i = 0; i < max_ops; ++i)
+    need_interrupt =
+      op_queue_[i].cancel_operations(descriptor, ops, ec) || need_interrupt;
+  scheduler_.post_deferred_completions(ops);
+  if (need_interrupt)
+    interrupt_unlocked();
+}
+
+void ionotify_reactor::create_pulse_channel()
+{
+  const unsigned channelflags = _NTO_CHF_FIXED_PRIORITY | _NTO_CHF_PRIVATE;
+  const unsigned connectflags = _NTO_COF_REG_EVENTS;
+  chid_ = ChannelCreate_r(channelflags);
+  if (chid_ < 0)
+  {
+    asio::error_code ec(-chid_, asio::error::get_system_category());
+    asio::detail::throw_error(ec, "ChannelCreate_r");
+  }
+  else
+  {
+    coid_ = ConnectAttach(0, 0, chid_, _NTO_SIDE_CHANNEL, connectflags);
+    if (coid_ < 0)
+    {
+      asio::error_code ec(errno, asio::error::get_system_category());
+      asio::detail::throw_error(ec, "ConnectAttach");
+    }
+  }
+}
+
+} // namespace detail
+} // namespace asio
+
+#include "asio/detail/pop_options.hpp"
+
+#endif // defined(ASIO_HAS_IONOTIFY)
+
+#endif // ASIO_DETAIL_IMPL_IONOTIFY_REACTOR_IPP
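
The pulse-collection loop above bounds MsgReceivePulse_r() with TimerTimeout(_NTO_TIMEOUT_RECEIVE) so that the wait expires at the next timer deadline. A stand-alone sketch of that pattern (illustrative only, not part of the patch) is:

#include <sys/neutrino.h>
#include <time.h>
#include <cstdint>
#include <cstdio>

int main()
{
  int chid = ChannelCreate(0);

  // Wait for at most 100ms for a pulse; a timeout surfaces as -1/ETIMEDOUT
  // from the receive call rather than blocking forever.
  uint64_t timeout_nsec = 100ull * 1000 * 1000;
  TimerTimeout(CLOCK_MONOTONIC, _NTO_TIMEOUT_RECEIVE, 0, &timeout_nsec, 0);

  struct _pulse pulse;
  if (MsgReceivePulse(chid, &pulse, sizeof(pulse), 0) == -1)
    std::perror("MsgReceivePulse (ETIMEDOUT means the timeout expired)");
  else
    std::printf("pulse code=%d value=%d\n",
        (int)pulse.code, (int)pulse.value.sival_int);

  ChannelDestroy(chid);
  return 0;
}
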
diff --git a/asio/include/asio/detail/ionotify_reactor.hpp b/asio/include/asio/detail/ionotify_reactor.hpp
new file mode 100644
index 0000000000..c5bf9f7523
--- /dev/null
+++ b/asio/include/asio/detail/ionotify_reactor.hpp
@@ -0,0 +1,291 @@
+//
+// detail/ionotify_reactor.hpp
+// ~~~~~~~~~~~~~~~~~~~~~~~~~~~
+//
+// Copyright (c) 2003-2021 Christopher M. Kohlhoff (chris at kohlhoff dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef ASIO_DETAIL_IONOTIFY_REACTOR_HPP
+#define ASIO_DETAIL_IONOTIFY_REACTOR_HPP
+
+#if defined(_MSC_VER) && (_MSC_VER >= 1200)
+#pragma once
+#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
+
+#include "asio/detail/config.hpp"
+
+#if defined(ASIO_HAS_IONOTIFY)
+
+#include "asio/detail/scheduler_task.hpp"
+#include "asio/detail/fd_set_adapter.hpp"
+#include "asio/detail/limits.hpp"
+#include "asio/detail/mutex.hpp"
+#include "asio/detail/op_queue.hpp"
+#include "asio/detail/reactor_op.hpp"
+#include "asio/detail/reactor_op_queue.hpp"
+#include "asio/detail/socket_types.hpp"
+#include "asio/detail/timer_queue_base.hpp"
+#include "asio/detail/timer_queue_set.hpp"
+#include "asio/detail/wait_op.hpp"
+#include "asio/execution_context.hpp"
+#include <cstddef>
+#include <signal.h>
+#include <vector>
+
+#include "asio/detail/push_options.hpp"
+
+namespace asio {
+namespace detail {
+
+class ionotify_reactor
+  : public execution_context_service_base<ionotify_reactor>,
+    public scheduler_task
+{
+public:
+  enum op_types
+  {
+    read_op = 0,
+    write_op = 1,
+    except_op = 2,
+    max_select_ops = 3,
+    connect_op = 1,
+    max_ops = 3
+  };
+
+  // Per-descriptor data.
+  struct per_descriptor_data
+  {
+  };
+
+  // Constructor.
+  ASIO_DECL ionotify_reactor(asio::execution_context& ctx);
+
+  // Destructor.
+  ASIO_DECL ~ionotify_reactor();
+
+  // Destroy all user-defined handler objects owned by the service.
+  ASIO_DECL void shutdown();
+
+  // Recreate internal descriptors following a fork.
+  ASIO_DECL void notify_fork(asio::execution_context::fork_event fork_ev);
+
+  // Initialise the task, but only if the reactor is not in its own thread.
+  ASIO_DECL void init_task();
+
+  // Register a socket with the reactor. Returns 0 on success, system error
+  // code on failure.
+  ASIO_DECL int register_descriptor(socket_type, per_descriptor_data&);
+
+  // Register a descriptor with an associated single operation. Returns 0 on
+  // success, system error code on failure.
+  ASIO_DECL int register_internal_descriptor(int op_type,
+      socket_type descriptor, per_descriptor_data& descriptor_data,
+      reactor_op* op);
+
+  // Post a reactor operation for immediate completion.
+  void post_immediate_completion(operation* op, bool is_continuation) const;
+
+  // Post a reactor operation for immediate completion.
+  ASIO_DECL static void call_post_immediate_completion(
+      operation* op, bool is_continuation, const void* self);
+
+  // Start a new operation. The reactor operation will be performed when the
+  // given descriptor is flagged as ready, or an error has occurred.
+  ASIO_DECL void start_op(int op_type, socket_type descriptor,
+      per_descriptor_data&, reactor_op* op, bool is_continuation,
+      bool allow_speculative,
+      void (*on_immediate)(operation*, bool, const void*),
+      const void* immediate_arg);
+
+  // Start a new operation. The reactor operation will be performed when the
+  // given descriptor is flagged as ready, or an error has occurred.
+  void start_op(int op_type, socket_type descriptor,
+      per_descriptor_data& descriptor_data, reactor_op* op,
+      bool is_continuation, bool allow_speculative)
+  {
+    start_op(op_type, descriptor, descriptor_data, op, is_continuation,
+        allow_speculative, &ionotify_reactor::call_post_immediate_completion,
+        this);
+  }
+
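
From a user's point of view, start_op() and cancel_ops() are reached through the ordinary socket services: an async_wait() on a socket or acceptor becomes a read_op queued here, and cancel() aborts it. A minimal driver (illustrative only, not part of the patch) is:

#include <asio.hpp>
#include <cstdio>

int main()
{
  asio::io_context ctx;

  asio::ip::tcp::acceptor acceptor(ctx,
      asio::ip::tcp::endpoint(asio::ip::tcp::v4(), 0)); // ephemeral port

  // Queued via start_op(); the reactor arms ionotify() for
  // _NOTIFY_COND_INPUT on the acceptor's fd and completes the handler when
  // the pulse arrives or the operation is cancelled.
  acceptor.async_wait(asio::socket_base::wait_read,
      [](const asio::error_code& ec)
      {
        std::printf("acceptor readable: %s\n", ec.message().c_str());
      });

  acceptor.cancel(); // exercises cancel_ops(): handler sees operation_aborted
  ctx.run();
  return 0;
}
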
+  // Cancel all operations associated with the given descriptor. The
+  // handlers associated with the descriptor will be invoked with the
+  // operation_aborted error.
+  ASIO_DECL void cancel_ops(socket_type descriptor, per_descriptor_data&);
+
+  // Cancel all operations associated with the given descriptor and key. The
+  // handlers associated with the descriptor will be invoked with the
+  // operation_aborted error.
+  ASIO_DECL void cancel_ops_by_key(socket_type descriptor,
+      per_descriptor_data& descriptor_data, int op_type,
+      void* cancellation_key);
+
+  // Cancel any operations that are running against the descriptor and remove
+  // its registration from the reactor. The reactor resources associated with
+  // the descriptor must be released by calling cleanup_descriptor_data.
+  ASIO_DECL void deregister_descriptor(
+      socket_type descriptor, per_descriptor_data&, bool closing);
+
+  // Remove the descriptor's registration from the reactor. The reactor
+  // resources associated with the descriptor must be released by calling
+  // cleanup_descriptor_data.
+  ASIO_DECL void deregister_internal_descriptor(
+      socket_type descriptor, per_descriptor_data&);
+
+  // Perform any post-deregistration cleanup tasks associated with the
+  // descriptor data.
+  ASIO_DECL void cleanup_descriptor_data(per_descriptor_data&);
+
+  // Move descriptor registration from one descriptor_data object to another.
+  ASIO_DECL void move_descriptor(socket_type descriptor,
+      per_descriptor_data& target_descriptor_data,
+      per_descriptor_data& source_descriptor_data);
+
+  // Add a new timer queue to the reactor.
+  template <typename Time_Traits>
+  void add_timer_queue(timer_queue<Time_Traits>& queue);
+
+  // Remove a timer queue from the reactor.
+  template <typename Time_Traits>
+  void remove_timer_queue(timer_queue<Time_Traits>& queue);
+
+  // Schedule a new operation in the given timer queue to expire at the
+  // specified absolute time.
+  template <typename Time_Traits>
+  void schedule_timer(timer_queue<Time_Traits>& queue,
+      const typename Time_Traits::time_type& time,
+      typename timer_queue<Time_Traits>::per_timer_data& timer, wait_op* op);
+
+  // Cancel the timer operations associated with the given token. Returns the
+  // number of operations that have been posted or dispatched.
+  template <typename Time_Traits>
+  std::size_t cancel_timer(timer_queue<Time_Traits>& queue,
+      typename timer_queue<Time_Traits>::per_timer_data& timer,
+      std::size_t max_cancelled = (std::numeric_limits<std::size_t>::max)());
+
+  // Cancel the timer operations associated with the given key.
+  template <typename Time_Traits>
+  void cancel_timer_by_key(timer_queue<Time_Traits>& queue,
+      typename timer_queue<Time_Traits>::per_timer_data* timer,
+      void* cancellation_key);
+
+  // Move the timer operations associated with the given timer.
+  template <typename Time_Traits>
+  void move_timer(timer_queue<Time_Traits>& queue,
+      typename timer_queue<Time_Traits>::per_timer_data& target,
+      typename timer_queue<Time_Traits>::per_timer_data& source);
+
+  // Run select once until interrupted or events are ready to be dispatched.
+  ASIO_DECL void run(long usec, op_queue<operation>& ops);
+
+  // Interrupt the select loop.
+  ASIO_DECL void interrupt();
+
+private:
+  // Helper function that plays the role of socket_ops::select(), but is
+  // implemented internally with QNX pulses rather than fd_sets.
+  ASIO_DECL int select(asio::detail::mutex::scoped_lock& lock,
+      socket_type max_fd, long timeout_usec);
+
+  // Helper function to add a new timer queue.
+  ASIO_DECL void do_add_timer_queue(timer_queue_base& queue);
+
+  // Helper function to remove a timer queue.
+  ASIO_DECL void do_remove_timer_queue(timer_queue_base& queue);
+
+  // Get the timeout value for the select call.
+  ASIO_DECL long get_timeout_usec(long usec);
+
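
cancel_ops_by_key() is what per-operation cancellation ultimately reaches: each reactor operation carries a key, and a cancellation slot bound to the handler asks the reactor to cancel just that one operation. A sketch of the user-facing side (illustrative only; it assumes the surrounding asio is recent enough — 1.19 or later, consistent with the 2021 copyright — to provide cancellation slots) is:

#include <asio.hpp>
#include <cstdio>

int main()
{
  asio::io_context ctx;
  asio::ip::tcp::socket sock(ctx, asio::ip::tcp::v4());

  // Emitting on this signal is what eventually reaches cancel_ops_by_key()
  // with this particular operation's key.
  asio::cancellation_signal cancel;

  sock.async_wait(asio::socket_base::wait_read,
      asio::bind_cancellation_slot(cancel.slot(),
        [](const asio::error_code& ec)
        {
          std::printf("wait finished: %s\n", ec.message().c_str());
        }));

  cancel.emit(asio::cancellation_type::total);
  ctx.run();
  return 0;
}
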
+  // Cancel all operations associated with the given descriptor. This
+  // function does not acquire the ionotify_reactor's mutex.
+  ASIO_DECL void cancel_ops_unlocked(
+      socket_type descriptor, const asio::error_code& ec);
+
+  // Deregister the given descriptor. This function does not acquire the
+  // ionotify_reactor's mutex.
+  ASIO_DECL void deregister_descriptor_unlocked(socket_type descriptor);
+
+  // Internal version of interrupt(), called with the mutex already locked.
+  ASIO_DECL void interrupt_unlocked();
+
+  // Create the pulse channel used by select().
+  ASIO_DECL void create_pulse_channel();
+
+  // The scheduler implementation used to post completions.
+  typedef class scheduler scheduler_type;
+  scheduler_type& scheduler_;
+
+  // Mutex to protect access to internal data.
+  asio::detail::mutex mutex_;
+
+  // The queues of read, write and except operations.
+  reactor_op_queue<socket_type> op_queue_[max_ops];
+
+  // Structure that holds the state of an fd and what we want from it.
+  // (The reactor keeps a vector of these, indexed by fd.)
+  struct fdstate
+  {
+    unsigned ops_ : max_select_ops;
+    bool wanted_ : 1;
+    unsigned armed_ : max_select_ops;
+    sigevent ioev_;
+
+    enum op_bits
+    {
+      NONE = 0,
+      READ = 1 << read_op,
+      WRITE = 1 << write_op,
+      EXCEPT = 1 << except_op,
+      ALL = READ | WRITE | EXCEPT
+    };
+
+    fdstate() : ops_(NONE), wanted_(false), armed_(NONE)
+    {
+      ioev_.sigev_notify = 0;
+    }
+  };
+
+  using op_bits = fdstate::op_bits;
+
+  static op_bits op_bit(op_types op)
+  {
+    return static_cast<op_bits>(1 << op);
+  }
+
+  class fdmap : public std::vector<fdstate>
+  {
+  public:
+    // Similar to posix_fd_set_adapter::set, but returns the highest fd seen
+    // (or invalid_socket if there were none).
+    ASIO_DECL socket_type set(reactor_op_queue<socket_type>& operations,
+        op_queue<operation>& ops, op_types op);
+
+    // Similar to posix_fd_set_adapter::perform, but driven by the per-fd
+    // operation bits recorded by set().
+    ASIO_DECL void perform(reactor_op_queue<socket_type>& operations,
+        op_queue<operation>& ops, op_types op);
+
+    bool is_valid(socket_type fd)
+    {
+      return fd >= 0 && static_cast<size_t>(fd) < size();
+    }
+  } fdmap_;
+
+  // The timer queues.
+  timer_queue_set timer_queues_;
+
+  // The channel and connection IDs used for pulses.
+  int chid_, coid_;
+
+  // Whether we have already sent an interrupt pulse.
+  bool interrupted_;
+
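
The SERIAL_BIT scheme used with ioev_ above packs (fd << 1) | serial into the pulse payload's low bits, with the kernel OR-ing the satisfied _NOTIFY_COND_* flags into the high bits; toggling the serial bit on every re-arm lets a pulse from an earlier arming be recognised and ignored. A small sketch of that encoding (illustrative only; the masks below are stand-ins, not QNX's real _NOTIFY_DATA_MASK value) is:

#include <cstdio>

static const int kCondMask = 0x7 << 28;  // stand-in for the condition bits
static const int kDataMask = ~kCondMask; // stand-in for _NOTIFY_DATA_MASK
static const int kSerialBit = 1;

int main()
{
  int fd = 42;
  int serial = 0;

  int registered = (fd << 1) | serial;    // value stored in sigev_value
  int delivered = registered | (1 << 28); // kernel ORs in a ready condition
  int data = delivered & kDataMask;       // strip the condition bits again

  std::printf("fd=%d stale=%s\n", data >> 1,
      (data == registered) ? "no" : "yes");

  serial ^= kSerialBit; // toggled before the next arm; older pulses mismatch
  return 0;
}
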
+  // Whether the service has been shut down.
+  bool shutdown_;
+};
+
+} // namespace detail
+} // namespace asio
+
+#include "asio/detail/pop_options.hpp"
+
+#include "asio/detail/impl/ionotify_reactor.hpp"
+#if defined(ASIO_HEADER_ONLY)
+#include "asio/detail/impl/ionotify_reactor.ipp"
+#endif // defined(ASIO_HEADER_ONLY)
+
+#endif // defined(ASIO_HAS_IONOTIFY)
+
+#endif // ASIO_DETAIL_IONOTIFY_REACTOR_HPP
diff --git a/asio/include/asio/detail/reactor.hpp b/asio/include/asio/detail/reactor.hpp
index adf33536b1..8a1f4ab4d5 100644
--- a/asio/include/asio/detail/reactor.hpp
+++ b/asio/include/asio/detail/reactor.hpp
@@ -27,6 +27,8 @@
 # include "asio/detail/kqueue_reactor.hpp"
 #elif defined(ASIO_HAS_DEV_POLL)
 # include "asio/detail/dev_poll_reactor.hpp"
+#elif defined(ASIO_HAS_IONOTIFY)
+# include "asio/detail/ionotify_reactor.hpp"
 #else
 # include "asio/detail/select_reactor.hpp"
 #endif
@@ -44,6 +46,8 @@ typedef epoll_reactor reactor;
 typedef kqueue_reactor reactor;
 #elif defined(ASIO_HAS_DEV_POLL)
 typedef dev_poll_reactor reactor;
+#elif defined(ASIO_HAS_IONOTIFY)
+typedef ionotify_reactor reactor;
 #else
 typedef select_reactor reactor;
 #endif
diff --git a/asio/include/asio/detail/select_reactor.hpp b/asio/include/asio/detail/select_reactor.hpp
index 747abb54d0..952553e7b8 100644
--- a/asio/include/asio/detail/select_reactor.hpp
+++ b/asio/include/asio/detail/select_reactor.hpp
@@ -19,6 +19,7 @@
 
 #if defined(ASIO_HAS_IOCP) \
   || (!defined(ASIO_HAS_DEV_POLL) \
+  && !defined(ASIO_HAS_IONOTIFY) \
   && !defined(ASIO_HAS_EPOLL) \
   && !defined(ASIO_HAS_KQUEUE) \
   && !defined(ASIO_WINDOWS_RUNTIME))
@@ -284,6 +285,7 @@ class select_reactor
 #endif // defined(ASIO_HAS_IOCP)
        //   || (!defined(ASIO_HAS_DEV_POLL)
+       //   && !defined(ASIO_HAS_IONOTIFY)
        //   && !defined(ASIO_HAS_EPOLL)
        //   && !defined(ASIO_HAS_KQUEUE)
        //   && !defined(ASIO_WINDOWS_RUNTIME))
 
diff --git a/asio/include/asio/detail/timer_scheduler.hpp b/asio/include/asio/detail/timer_scheduler.hpp
index 1c2288a9ad..349b7c929b 100644
--- a/asio/include/asio/detail/timer_scheduler.hpp
+++ b/asio/include/asio/detail/timer_scheduler.hpp
@@ -30,6 +30,8 @@
 # include "asio/detail/kqueue_reactor.hpp"
 #elif defined(ASIO_HAS_DEV_POLL)
 # include "asio/detail/dev_poll_reactor.hpp"
+#elif defined(ASIO_HAS_IONOTIFY)
+# include "asio/detail/ionotify_reactor.hpp"
 #else
 # include "asio/detail/select_reactor.hpp"
 #endif
diff --git a/asio/include/asio/detail/timer_scheduler_fwd.hpp b/asio/include/asio/detail/timer_scheduler_fwd.hpp
index e0d7480110..c2ccddd70a 100644
--- a/asio/include/asio/detail/timer_scheduler_fwd.hpp
+++ b/asio/include/asio/detail/timer_scheduler_fwd.hpp
@@ -32,6 +32,8 @@ typedef class epoll_reactor timer_scheduler;
 typedef class kqueue_reactor timer_scheduler;
 #elif defined(ASIO_HAS_DEV_POLL)
 typedef class dev_poll_reactor timer_scheduler;
+#elif defined(ASIO_HAS_IONOTIFY)
+typedef class ionotify_reactor timer_scheduler;
 #else
 typedef class select_reactor timer_scheduler;
 #endif
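
With the reactor and timer_scheduler typedefs above in place, a build can confirm at compile time that the ionotify backend was actually selected. This check is illustrative only (application code normally never touches the asio::detail namespace):

#include <asio/detail/reactor.hpp>
#include <type_traits>

#if defined(ASIO_HAS_IONOTIFY)
static_assert(
    std::is_same<asio::detail::reactor,
        asio::detail::ionotify_reactor>::value,
    "ionotify_reactor should be selected on QNX");
#endif

int main() { return 0; }
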