Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 23 additions & 43 deletions include/boost/redis/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <boost/redis/detail/reader_fsm.hpp>
#include <boost/redis/detail/redis_stream.hpp>
#include <boost/redis/detail/resp3_handshaker.hpp>
#include <boost/redis/detail/writer_fsm.hpp>
#include <boost/redis/error.hpp>
#include <boost/redis/logger.hpp>
#include <boost/redis/operation.hpp>
Expand Down Expand Up @@ -209,52 +210,31 @@ struct connection_impl {
template <class Executor>
struct writer_op {
connection_impl<Executor>* conn_;
asio::coroutine coro{};
writer_fsm fsm_;

explicit writer_op(connection_impl<Executor>& conn) noexcept
: conn_(&conn)
, fsm_(conn.mpx_, conn.logger_)
{ }

template <class Self>
void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0)
void operator()(Self& self, system::error_code ec = {}, std::size_t = {})
{
ignore_unused(n);

BOOST_ASIO_CORO_REENTER(coro) for (;;)
{
while (conn_->mpx_.prepare_write() != 0) {
BOOST_ASIO_CORO_YIELD
asio::async_write(
conn_->stream_,
asio::buffer(conn_->mpx_.get_write_buffer()),
std::move(self));

conn_->logger_.on_write(ec, conn_->mpx_.get_write_buffer().size());

if (ec) {
conn_->logger_.trace("writer_op (1)", ec);
conn_->cancel(operation::run);
self.complete(ec);
return;
}

conn_->mpx_.commit_write();

// A socket.close() may have been called while a
// successful write might had already been queued, so we
// have to check here before proceeding.
if (!conn_->is_open()) {
conn_->logger_.trace("writer_op (2): connection is closed.");
self.complete({});
for (;;) {
auto act = fsm_.resume(ec, self.get_cancellation_state().cancelled());

switch (act.type()) {
case writer_action_type::done: self.complete(act.error()); return;
case writer_action_type::write:
asio::async_write(
conn_->stream_,
asio::buffer(conn_->mpx_.get_write_buffer()),
std::move(self));
return;
}
}

BOOST_ASIO_CORO_YIELD
conn_->writer_timer_.async_wait(std::move(self));
if (!conn_->is_open()) {
conn_->logger_.trace("writer_op (3): connection is closed.");
// Notice this is not an error of the op, stoping was
// requested from the outside, so we complete with
// success.
self.complete({});
return;
case writer_action_type::wait: conn_->writer_timer_.async_wait(std::move(self)); return;
case writer_action_type::cancel_run:
conn_->cancel(operation::run);
continue; // This op doesn't need yielding
}
}
}
Expand Down Expand Up @@ -339,7 +319,7 @@ class run_op {
auto writer(CompletionToken&& token)
{
return asio::async_compose<CompletionToken, void(system::error_code)>(
writer_op<Executor>{conn_},
writer_op<Executor>{*conn_},
std::forward<CompletionToken>(token),
conn_->writer_timer_);
}
Expand Down
10 changes: 9 additions & 1 deletion include/boost/redis/detail/connection_logger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ class connection_logger {
void on_connect(system::error_code const& ec, asio::ip::tcp::endpoint const& ep);
void on_connect(system::error_code const& ec, std::string_view unix_socket_ep);
void on_ssl_handshake(system::error_code const& ec);
void on_write(system::error_code const& ec, std::size_t n);
void on_fsm_resume(reader_fsm::action const& action);
void on_hello(system::error_code const& ec, generic_response const& resp);
void log(logger::level lvl, std::string_view msg);
Expand All @@ -47,6 +46,15 @@ class connection_logger {
{
log(logger::level::debug, op, ec);
}

template <class Fn>
void log_fn(logger::level lvl, Fn fn)
{
if (logger_.lvl < lvl)
return;
fn(msg_);
logger_.fn(lvl, msg_);
}
};

} // namespace boost::redis::detail
Expand Down
67 changes: 67 additions & 0 deletions include/boost/redis/detail/writer_fsm.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//
// Copyright (c) 2025 Marcelo Zimbres Silva ([email protected]),
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_REDIS_WRITER_FSM_HPP
#define BOOST_REDIS_WRITER_FSM_HPP

#include <boost/asio/cancellation_type.hpp>
#include <boost/system/error_code.hpp>

// Sans-io algorithm for the writer task, as a finite state machine

namespace boost::redis::detail {

// Forward decls
class connection_logger;
class multiplexer;

// What should we do next?
enum class writer_action_type
{
done, // Call the final handler
write, // Issue a write on the stream
wait, // Wait until there is data to be written
cancel_run, // Cancel the connection's run operation
};

class writer_action {
writer_action_type type_;
system::error_code ec_;

public:
writer_action(writer_action_type type) noexcept
: type_{type}
{ }

writer_action(system::error_code ec) noexcept
: type_{writer_action_type::done}
, ec_{ec}
{ }

writer_action_type type() const { return type_; }
system::error_code error() const { return ec_; }
};

class writer_fsm {
int resume_point_{0};
multiplexer* mpx_;
connection_logger* logger_;
system::error_code stored_ec_;

public:
writer_fsm(multiplexer& mpx, connection_logger& logger) noexcept
: mpx_(&mpx)
, logger_(&logger)
{ }

writer_action resume(system::error_code ec, asio::cancellation_type_t cancel_state);
};

} // namespace boost::redis::detail

#endif // BOOST_REDIS_CONNECTOR_HPP
17 changes: 1 addition & 16 deletions include/boost/redis/impl/connection_logger.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

#include <boost/redis/detail/connection_logger.hpp>
#include <boost/redis/detail/exec_fsm.hpp>
#include <boost/redis/logger.hpp>

#include <boost/asio/ip/tcp.hpp>
Expand Down Expand Up @@ -141,22 +142,6 @@ void connection_logger::on_ssl_handshake(system::error_code const& ec)
logger_.fn(logger::level::info, msg_);
}

void connection_logger::on_write(system::error_code const& ec, std::size_t n)
{
if (logger_.lvl < logger::level::info)
return;

msg_ = "writer_op: ";
if (ec) {
format_error_code(ec, msg_);
} else {
msg_ += std::to_string(n);
msg_ += " bytes written.";
}

logger_.fn(logger::level::info, msg_);
}

void connection_logger::on_fsm_resume(reader_fsm::action const& action)
{
if (logger_.lvl < logger::level::debug)
Expand Down
8 changes: 1 addition & 7 deletions include/boost/redis/impl/exec_fsm.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,14 @@

#include <boost/redis/detail/coroutine.hpp>
#include <boost/redis/detail/exec_fsm.hpp>
#include <boost/redis/impl/is_cancellation.hpp>
#include <boost/redis/request.hpp>

#include <boost/asio/error.hpp>
#include <boost/assert.hpp>

namespace boost::redis::detail {

inline bool is_cancellation(asio::cancellation_type_t type)
{
return !!(
type & (asio::cancellation_type_t::total | asio::cancellation_type_t::partial |
asio::cancellation_type_t::terminal));
}

exec_action exec_fsm::resume(bool connection_is_open, asio::cancellation_type_t cancel_state)
{
switch (resume_point_) {
Expand Down
25 changes: 25 additions & 0 deletions include/boost/redis/impl/is_cancellation.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
//
// Copyright (c) 2025 Marcelo Zimbres Silva ([email protected]),
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_REDIS_IS_CANCELLATION_HPP
#define BOOST_REDIS_IS_CANCELLATION_HPP

#include <boost/asio/cancellation_type.hpp>

namespace boost::redis::detail {

inline bool is_cancellation(asio::cancellation_type_t type)
{
return !!(
type & (asio::cancellation_type_t::total | asio::cancellation_type_t::partial |
asio::cancellation_type_t::terminal));
}

} // namespace boost::redis::detail

#endif
84 changes: 84 additions & 0 deletions include/boost/redis/impl/writer_fsm.ipp
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
//
// Copyright (c) 2025 Marcelo Zimbres Silva ([email protected]),
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_REDIS_WRITER_FSM_IPP
#define BOOST_REDIS_WRITER_FSM_IPP

#include <boost/redis/detail/connection_logger.hpp>
#include <boost/redis/detail/coroutine.hpp>
#include <boost/redis/detail/multiplexer.hpp>
#include <boost/redis/detail/writer_fsm.hpp>
#include <boost/redis/impl/is_cancellation.hpp>

#include <boost/asio/cancellation_type.hpp>
#include <boost/asio/error.hpp>
#include <boost/assert.hpp>
#include <boost/system/error_code.hpp>

namespace boost::redis::detail {

inline void log_write_success(connection_logger& logger, std::size_t bytes_written)
{
logger.log_fn(logger::level::info, [bytes_written](std::string& buff) {
buff = "Writer task: ";
buff += std::to_string(bytes_written);
buff += " bytes written.";
});
}

writer_action writer_fsm::resume(system::error_code ec, asio::cancellation_type_t cancel_state)
{
switch (resume_point_) {
BOOST_REDIS_CORO_INITIAL

for (;;) {
// Attempt to write while we have requests ready to send
while (mpx_->prepare_write() != 0u) {
// Write
BOOST_REDIS_YIELD(resume_point_, 1, writer_action_type::write)

// A failed write means that we should tear down the connection
if (ec) {
logger_->log(logger::level::err, "Writer task error: ", ec);
stored_ec_ = ec;
BOOST_REDIS_YIELD(resume_point_, 2, writer_action_type::cancel_run)
return stored_ec_;
}

// Log what we wrote
log_write_success(*logger_, mpx_->get_write_buffer().size());

// Mark requests as written
mpx_->commit_write();

// Check for cancellations
if (is_cancellation(cancel_state)) {
logger_->trace("Writer task cancelled (1).");
return system::error_code(asio::error::operation_aborted);
}
}

// No more requests ready to be written. Wait for more
BOOST_REDIS_YIELD(resume_point_, 3, writer_action_type::wait)

// Check for cancellations
if (is_cancellation(cancel_state)) {
logger_->trace("Writer task cancelled (2).");
return system::error_code(asio::error::operation_aborted);
}
}
}

// We should never reach here
BOOST_ASSERT(false);
return system::error_code();
}

} // namespace boost::redis::detail

#endif
1 change: 1 addition & 0 deletions include/boost/redis/src.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <boost/redis/impl/multiplexer.ipp>
#include <boost/redis/impl/read_buffer.ipp>
#include <boost/redis/impl/reader_fsm.ipp>
#include <boost/redis/impl/writer_fsm.ipp>
#include <boost/redis/impl/request.ipp>
#include <boost/redis/impl/resp3_handshaker.ipp>
#include <boost/redis/impl/response.ipp>
Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ make_test(test_exec_fsm)
make_test(test_log_to_file)
make_test(test_conn_logging)
make_test(test_reader_fsm)
make_test(test_connection_logger)

# Tests that require a real Redis server
make_test(test_conn_quit)
Expand Down
1 change: 1 addition & 0 deletions test/Jamfile
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ local tests =
test_log_to_file
test_conn_logging
test_reader_fsm
test_connection_logger
;

# Build and run the tests
Expand Down
2 changes: 1 addition & 1 deletion test/test_conn_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ BOOST_AUTO_TEST_CASE(correct_database)
auto conn = std::make_shared<connection>(ioc);

request req;
req.push("CLIENT", "LIST");
req.push("CLIENT", "INFO");

generic_response resp;

Expand Down
Loading
Loading