From 97d71d1d6b5fded834486482d68bda8776dc57f7 Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Fri, 4 Jul 2025 22:19:19 +0200 Subject: [PATCH 1/2] Removes async_append_some. --- include/boost/redis/config.hpp | 10 ++- include/boost/redis/connection.hpp | 80 +++----------------- include/boost/redis/detail/multiplexer.hpp | 35 ++++----- include/boost/redis/detail/read_buffer.hpp | 57 ++++++++++++++ include/boost/redis/detail/reader_fsm.hpp | 18 +++++ include/boost/redis/error.hpp | 3 + include/boost/redis/impl/error.ipp | 2 + include/boost/redis/impl/multiplexer.ipp | 67 +++++++++-------- include/boost/redis/impl/read_buffer.ipp | 87 ++++++++++++++++++++++ include/boost/redis/impl/reader_fsm.ipp | 30 ++++++-- include/boost/redis/resp3/impl/parser.ipp | 11 --- include/boost/redis/resp3/parser.hpp | 2 - include/boost/redis/src.hpp | 1 + test/common.cpp | 8 +- test/common.hpp | 3 + test/test_exec_fsm.cpp | 16 ++-- test/test_low_level.cpp | 1 + test/test_low_level_sync_sans_io.cpp | 86 +++++++++++++++++---- test/test_reader_fsm.cpp | 73 +++++++++++++----- 19 files changed, 405 insertions(+), 185 deletions(-) create mode 100644 include/boost/redis/detail/read_buffer.hpp create mode 100644 include/boost/redis/impl/read_buffer.ipp diff --git a/include/boost/redis/config.hpp b/include/boost/redis/config.hpp index ed8b419f..06c81002 100644 --- a/include/boost/redis/config.hpp +++ b/include/boost/redis/config.hpp @@ -88,12 +88,20 @@ struct config { */ std::chrono::steady_clock::duration reconnect_wait_interval = std::chrono::seconds{1}; - /** @brief Maximum size of a socket read, in bytes. + /** @brief Maximum size of the read-buffer in bytes. * * Sets a limit on how much data is allowed to be read into the * read buffer. It can be used to prevent DDOS. */ std::size_t max_read_size = (std::numeric_limits::max)(); + + /** @brief read_buffer_append_size + * + * The size by which the read buffer grows when more space is + * needed. There is no need to set this too high because memory is + * reused and the growth will tend to zero. + */ + std::size_t read_buffer_append_size = 4096; }; } // namespace boost::redis diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index 17c151a5..ac078665 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -57,56 +57,6 @@ namespace boost::redis { namespace detail { -template -class append_some_op { -private: - AsyncReadStream& stream_; - DynamicBuffer buf_; - std::size_t size_ = 0; - std::size_t tmp_ = 0; - asio::coroutine coro_{}; - -public: - append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size) - : stream_{stream} - , buf_{std::move(buf)} - , size_{size} - { } - - template - void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0) - { - BOOST_ASIO_CORO_REENTER(coro_) - { - tmp_ = buf_.size(); - buf_.grow(size_); - - BOOST_ASIO_CORO_YIELD - stream_.async_read_some(buf_.data(tmp_, size_), std::move(self)); - if (ec) { - self.complete(ec, 0); - return; - } - - buf_.shrink(buf_.size() - tmp_ - n); - self.complete({}, n); - } - } -}; - -template -auto async_append_some( - AsyncReadStream& stream, - DynamicBuffer buffer, - std::size_t size, - CompletionToken&& token) -{ - return asio::async_compose( - append_some_op{stream, buffer, size}, - token, - stream); -} - template using exec_notifier_type = asio::experimental::channel< Executor, @@ -209,33 +159,18 @@ struct writer_op { template struct reader_op { - using dyn_buffer_type = asio::dynamic_string_buffer< - char, - std::char_traits, - std::allocator>; - - // TODO: Move this to config so the user can fine tune? - static constexpr std::size_t buffer_growth_hint = 4096; - Conn* conn_; - detail::reader_fsm fsm_; public: reader_op(Conn& conn) noexcept : conn_{&conn} - , fsm_{conn.mpx_} { } template void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0) { - using dyn_buffer_type = asio::dynamic_string_buffer< - char, - std::char_traits, - std::allocator>; - for (;;) { - auto act = fsm_.resume(n, ec, self.get_cancellation_state().cancelled()); + auto act = conn_->read_fsm_.resume(n, ec, self.get_cancellation_state().cancelled()); conn_->logger_.on_fsm_resume(act); @@ -245,11 +180,10 @@ struct reader_op { continue; case reader_fsm::action::type::needs_more: case reader_fsm::action::type::append_some: - async_append_some( - conn_->stream_, - dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size}, - conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint), - std::move(self)); + { + auto const buf = conn_->read_fsm_.get_append_buffer(); + conn_->stream_.async_read_some(asio::buffer(buf), std::move(self)); + } return; case reader_fsm::action::type::notify_push_receiver: if (conn_->receive_channel_.try_send(ec, act.push_size_)) { @@ -343,6 +277,7 @@ class run_op { // If we were successful, run all the connection tasks if (!ec) { conn_->mpx_.reset(); + conn_->read_fsm_.reset(); // Note: Order is important here because the writer might // trigger an async_write before the async_hello thereby @@ -450,6 +385,7 @@ class basic_connection { , reconnect_timer_{ex} , receive_channel_{ex, 256} , health_checker_{ex} + , read_fsm_{mpx_} , logger_{std::move(lgr)} { set_receive_response(ignore); @@ -553,6 +489,7 @@ class basic_connection { cfg_ = cfg; health_checker_.set_config(cfg); handshaker_.set_config(cfg); + read_fsm_.set_config({cfg_.read_buffer_append_size, cfg_.max_read_size}); return asio::async_compose( detail::run_op{this}, @@ -951,6 +888,7 @@ class basic_connection { config cfg_; detail::multiplexer mpx_; + detail::reader_fsm read_fsm_; detail::connection_logger logger_; }; diff --git a/include/boost/redis/detail/multiplexer.hpp b/include/boost/redis/detail/multiplexer.hpp index 8b01409e..fde726f3 100644 --- a/include/boost/redis/detail/multiplexer.hpp +++ b/include/boost/redis/detail/multiplexer.hpp @@ -8,13 +8,13 @@ #define BOOST_REDIS_MULTIPLEXER_HPP #include -#include -#include -#include +#include +#include +#include #include #include -#include +#include #include #include @@ -32,7 +32,8 @@ namespace detail { using tribool = std::optional; -struct multiplexer { +class multiplexer { +public: using adapter_type = std::function; using pipeline_adapter_type = std::function< void(std::size_t, resp3::node_view const&, system::error_code&)>; @@ -127,7 +128,8 @@ struct multiplexer { // If the tribool contains no value more data is needed, otherwise // if the value is true the message consumed is a push. [[nodiscard]] - auto consume_next(system::error_code& ec) -> std::pair; + auto consume_next(std::string_view data, system::error_code& ec) + -> std::pair; auto add(std::shared_ptr const& ptr) -> void; auto reset() -> void; @@ -156,18 +158,6 @@ struct multiplexer { return std::string_view{write_buffer_}; } - [[nodiscard]] - auto get_read_buffer() noexcept -> std::string& - { - return read_buffer_; - } - - [[nodiscard]] - auto get_read_buffer() const noexcept -> std::string const& - { - return read_buffer_; - } - // TODO: Change signature to receive an adapter instead of a // response. template @@ -191,17 +181,18 @@ struct multiplexer { [[nodiscard]] auto is_waiting_response() const noexcept -> bool; - [[nodiscard]] - auto on_finish_parsing(bool is_push) -> std::size_t; + void commit_usage(bool is_push, std::size_t size); [[nodiscard]] - auto is_next_push() const noexcept -> bool; + auto is_next_push(std::string_view data) const noexcept -> bool; // Releases the number of requests that have been released. [[nodiscard]] auto release_push_requests() -> std::size_t; - std::string read_buffer_; + [[nodiscard]] + tribool consume_next_impl(std::string_view data, system::error_code& ec); + std::string write_buffer_; std::deque> reqs_; resp3::parser parser_{}; diff --git a/include/boost/redis/detail/read_buffer.hpp b/include/boost/redis/detail/read_buffer.hpp new file mode 100644 index 00000000..cad248ec --- /dev/null +++ b/include/boost/redis/detail/read_buffer.hpp @@ -0,0 +1,57 @@ +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#ifndef BOOST_REDIS_READ_BUFFER_HPP +#define BOOST_REDIS_READ_BUFFER_HPP + +#include + +#include +#include +#include +#include + +namespace boost::redis::detail { + +class read_buffer { +public: + using span_type = span; + + [[nodiscard]] + system::error_code prepare_append(std::size_t append_size, std::size_t max_buffer_size); + + void commit_append(std::size_t read_size); + + [[nodiscard]] + auto get_append_buffer() noexcept -> span_type; + + [[nodiscard]] + auto get_committed_buffer() const noexcept -> std::string_view; + + [[nodiscard]] + auto get_committed_size() const noexcept -> std::size_t; + + void clear(); + + // Consume committed data. + auto consume_committed(std::size_t size) -> std::size_t; + + void reserve(std::size_t n); + + friend + bool operator==(read_buffer const& lhs, read_buffer const& rhs); + + friend + bool operator!=(read_buffer const& lhs, read_buffer const& rhs); + +private: + std::vector buffer_; + std::size_t append_buf_begin_ = 0; +}; + +} // namespace boost::redis::detail + +#endif // BOOST_REDIS_READ_BUFFER_HPP diff --git a/include/boost/redis/detail/reader_fsm.hpp b/include/boost/redis/detail/reader_fsm.hpp index 575ee97f..09763366 100644 --- a/include/boost/redis/detail/reader_fsm.hpp +++ b/include/boost/redis/detail/reader_fsm.hpp @@ -18,6 +18,12 @@ namespace boost::redis::detail { class reader_fsm { public: + // See config.hpp for the meaning of these parameters. + struct config { + std::size_t read_buffer_append_size = 4096; + std::size_t max_read_size = -1; + }; + struct action { enum class type { @@ -41,8 +47,20 @@ class reader_fsm { system::error_code ec, asio::cancellation_type_t /*cancel_state*/); + void set_config(config const& cfg) noexcept { cfg_ = cfg; }; + + void reset(); + + [[nodiscard]] + auto get_append_buffer() noexcept + { + return read_buffer_.get_append_buffer(); + } + private: int resume_point_{0}; + read_buffer read_buffer_; + config cfg_; action action_after_resume_; action::type next_read_type_ = action::type::append_some; multiplexer* mpx_ = nullptr; diff --git a/include/boost/redis/error.hpp b/include/boost/redis/error.hpp index 6ce83555..8008265f 100644 --- a/include/boost/redis/error.hpp +++ b/include/boost/redis/error.hpp @@ -88,6 +88,9 @@ enum class error /// The configuration specified UNIX sockets with SSL, which is not supported. unix_sockets_ssl_unsupported, + + /// The size of the read buffer would exceed it maximum configured value. + exceeds_maximum_read_buffer_size, }; /** diff --git a/include/boost/redis/impl/error.ipp b/include/boost/redis/impl/error.ipp index f1f61960..2c2d0b63 100644 --- a/include/boost/redis/impl/error.ipp +++ b/include/boost/redis/impl/error.ipp @@ -50,6 +50,8 @@ struct error_category_impl : system::error_category { "supported by the system."; case error::unix_sockets_ssl_unsupported: return "The configuration specified UNIX sockets with SSL, which is not supported."; + case error::exceeds_maximum_read_buffer_size: + return "The size of the read buffer would exceed it maximum configured value"; default: BOOST_ASSERT(false); return "Boost.Redis error."; } } diff --git a/include/boost/redis/impl/multiplexer.ipp b/include/boost/redis/impl/multiplexer.ipp index 021a4e9f..a4007c2e 100644 --- a/include/boost/redis/impl/multiplexer.ipp +++ b/include/boost/redis/impl/multiplexer.ipp @@ -81,7 +81,7 @@ void multiplexer::add(std::shared_ptr const& info) } } -std::pair multiplexer::consume_next(system::error_code& ec) +tribool multiplexer::consume_next_impl(std::string_view data, system::error_code& ec) { // We arrive here in two states: // @@ -93,18 +93,16 @@ std::pair multiplexer::consume_next(system::error_code& ec // 2. On a new message, in which case we have to determine // whether the next messag is a push or a response. // + + BOOST_ASSERT(!data.empty()); if (!on_push_) // Prepare for new message. - on_push_ = is_next_push(); + on_push_ = is_next_push(data); if (on_push_) { - if (!resp3::parse(parser_, read_buffer_, receive_adapter_, ec)) - return std::make_pair(std::nullopt, 0); - - if (ec) - return std::make_pair(std::make_optional(true), 0); + if (!resp3::parse(parser_, data, receive_adapter_, ec)) + return std::nullopt; - auto const size = on_finish_parsing(true); - return std::make_pair(std::make_optional(true), size); + return std::make_optional(true); } BOOST_ASSERT_MSG( @@ -114,13 +112,13 @@ std::pair multiplexer::consume_next(system::error_code& ec BOOST_ASSERT(reqs_.front() != nullptr); BOOST_ASSERT(reqs_.front()->get_remaining_responses() != 0); - if (!resp3::parse(parser_, read_buffer_, reqs_.front()->get_adapter(), ec)) - return std::make_pair(std::nullopt, 0); + if (!resp3::parse(parser_, data, reqs_.front()->get_adapter(), ec)) + return std::nullopt; if (ec) { reqs_.front()->notify_error(ec); reqs_.pop_front(); - return std::make_pair(std::make_optional(false), 0); + return std::make_optional(false); } reqs_.front()->commit_response(parser_.get_consumed()); @@ -130,14 +128,31 @@ std::pair multiplexer::consume_next(system::error_code& ec reqs_.pop_front(); } - auto const size = on_finish_parsing(false); - return std::make_pair(std::make_optional(false), size); + return std::make_optional(false); +} + +std::pair multiplexer::consume_next( + std::string_view data, + system::error_code& ec) +{ + auto const ret = consume_next_impl(data, ec); + auto const consumed = parser_.get_consumed(); + if (ec) { + return std::make_pair(ret, consumed); + } + + if (ret.has_value()) { + parser_.reset(); + commit_usage(ret.value(), consumed); + return std::make_pair(ret, consumed); + } + + return std::make_pair(std::nullopt, consumed); } void multiplexer::reset() { write_buffer_.clear(); - read_buffer_.clear(); parser_.reset(); on_push_ = false; cancel_run_called_ = false; @@ -222,35 +237,29 @@ auto multiplexer::cancel_on_conn_lost() -> std::size_t return ret; } -std::size_t multiplexer::on_finish_parsing(bool is_push) +void multiplexer::commit_usage(bool is_push, std::size_t size) { if (is_push) { usage_.pushes_received += 1; - usage_.push_bytes_received += parser_.get_consumed(); + usage_.push_bytes_received += size; + on_push_ = false; } else { usage_.responses_received += 1; - usage_.response_bytes_received += parser_.get_consumed(); + usage_.response_bytes_received += size; } - - on_push_ = false; - read_buffer_.erase(0, parser_.get_consumed()); - auto const size = parser_.get_consumed(); - parser_.reset(); - return size; } -bool multiplexer::is_next_push() const noexcept +bool multiplexer::is_next_push(std::string_view data) const noexcept { - BOOST_ASSERT(!read_buffer_.empty()); - // Useful links to understand the heuristics below. // // - https://github.com/redis/redis/issues/11784 // - https://github.com/redis/redis/issues/6426 // - https://github.com/boostorg/redis/issues/170 - // The message's resp3 type is a push. - if (resp3::to_type(read_buffer_.front()) == resp3::type::push) + // Test if the message resp3 type is a push. + BOOST_ASSERT(!data.empty()); + if (resp3::to_type(data.front()) == resp3::type::push) return true; // This is non-push type and the requests queue is empty. I have diff --git a/include/boost/redis/impl/read_buffer.ipp b/include/boost/redis/impl/read_buffer.ipp new file mode 100644 index 00000000..0c18f857 --- /dev/null +++ b/include/boost/redis/impl/read_buffer.ipp @@ -0,0 +1,87 @@ +/* Copyright (c) 2018-2025 Marcelo Zimbres Silva (mzimbres@gmail.com) + * + * Distributed under the Boost Software License, Version 1.0. (See + * accompanying file LICENSE.txt) + */ + +#include + +#include +#include + +#include + +namespace boost::redis::detail { + +system::error_code +read_buffer::prepare_append(std::size_t append_size, std::size_t max_buffer_size) +{ + BOOST_ASSERT(append_buf_begin_ == buffer_.size()); + + auto const new_size = append_buf_begin_ + append_size; + + if (new_size > max_buffer_size) { + return error::exceeds_maximum_read_buffer_size; + } + + buffer_.resize(new_size); + return {}; +} + +void read_buffer::commit_append(std::size_t read_size) +{ + BOOST_ASSERT(buffer_.size() >= (append_buf_begin_ + read_size)); + buffer_.resize(append_buf_begin_ + read_size); + append_buf_begin_ = buffer_.size(); +} + +auto read_buffer::get_append_buffer() noexcept -> span_type +{ + auto const size = buffer_.size(); + return make_span(buffer_.data() + append_buf_begin_, size - append_buf_begin_); +} + +auto read_buffer::get_committed_buffer() const noexcept -> std::string_view +{ + BOOST_ASSERT(!buffer_.empty()); + return {buffer_.data(), append_buf_begin_}; +} + +auto read_buffer::get_committed_size() const noexcept -> std::size_t { return append_buf_begin_; } + +void read_buffer::clear() +{ + buffer_.clear(); + append_buf_begin_ = 0; +} + +std::size_t read_buffer::consume_committed(std::size_t size) +{ + // Consumes only committed data. + if (size > append_buf_begin_) + size = append_buf_begin_; + + buffer_.erase(buffer_.begin(), buffer_.begin() + size); + BOOST_ASSERT(append_buf_begin_ >= size); + append_buf_begin_ -= size; + return size; +} + +void read_buffer::reserve(std::size_t n) +{ + buffer_.reserve(n); +} + +bool operator==(read_buffer const& lhs, read_buffer const& rhs) +{ + return + lhs.buffer_ == rhs.buffer_ && + lhs.append_buf_begin_ == rhs.append_buf_begin_; +} + +bool operator!=(read_buffer const& lhs, read_buffer const& rhs) +{ + return !(lhs == rhs); +} + +} // namespace boost::redis::detail diff --git a/include/boost/redis/impl/reader_fsm.ipp b/include/boost/redis/impl/reader_fsm.ipp index b248f315..73b03772 100644 --- a/include/boost/redis/impl/reader_fsm.ipp +++ b/include/boost/redis/impl/reader_fsm.ipp @@ -24,22 +24,32 @@ reader_fsm::action reader_fsm::resume( BOOST_REDIS_YIELD(resume_point_, 1, action::type::setup_cancellation) for (;;) { - BOOST_REDIS_YIELD(resume_point_, 2, next_read_type_) + ec = read_buffer_.prepare_append(cfg_.read_buffer_append_size, cfg_.max_read_size); + if (ec) { + action_after_resume_ = {action::type::done, 0, ec}; + BOOST_REDIS_YIELD(resume_point_, 2, action::type::cancel_run) + return action_after_resume_; + } + + BOOST_REDIS_YIELD(resume_point_, 3, next_read_type_) + read_buffer_.commit_append(bytes_read); if (ec) { // TODO: If an error occurred but data was read (i.e. // bytes_read != 0) we should try to process that data and // deliver it to the user before calling cancel_run. action_after_resume_ = {action::type::done, bytes_read, ec}; - BOOST_REDIS_YIELD(resume_point_, 3, action::type::cancel_run) + BOOST_REDIS_YIELD(resume_point_, 4, action::type::cancel_run) return action_after_resume_; } next_read_type_ = action::type::append_some; - while (!mpx_->get_read_buffer().empty()) { - res_ = mpx_->consume_next(ec); + while (read_buffer_.get_committed_size() != 0) { + res_ = mpx_->consume_next(read_buffer_.get_committed_buffer(), ec); if (ec) { + // TODO: Perhaps log what has not been consumed to aid + // debugging. action_after_resume_ = {action::type::done, res_.second, ec}; - BOOST_REDIS_YIELD(resume_point_, 4, action::type::cancel_run) + BOOST_REDIS_YIELD(resume_point_, 5, action::type::cancel_run) return action_after_resume_; } @@ -48,6 +58,8 @@ reader_fsm::action reader_fsm::resume( break; } + read_buffer_.consume_committed(res_.second); + if (res_.first.value()) { BOOST_REDIS_YIELD(resume_point_, 6, action::type::notify_push_receiver, res_.second) if (ec) { @@ -71,4 +83,12 @@ reader_fsm::action reader_fsm::resume( return {action::type::done, 0, system::error_code()}; } +void reader_fsm::reset() +{ + resume_point_ = 0; + next_read_type_ = action::type::append_some; + res_ = {std::make_pair(std::nullopt, 0)}; + read_buffer_.clear(); +} + } // namespace boost::redis::detail diff --git a/include/boost/redis/resp3/impl/parser.ipp b/include/boost/redis/resp3/impl/parser.ipp index 0d126a57..c53920d5 100644 --- a/include/boost/redis/resp3/impl/parser.ipp +++ b/include/boost/redis/resp3/impl/parser.ipp @@ -34,17 +34,6 @@ void parser::reset() sizes_[0] = 2; // The sentinel must be more than 1. } -std::size_t parser::get_suggested_buffer_growth(std::size_t hint) const noexcept -{ - if (!bulk_expected()) - return hint; - - if (hint < bulk_length_ + 2) - return bulk_length_ + 2; - - return hint; -} - std::size_t parser::get_consumed() const noexcept { return consumed_; } bool parser::done() const noexcept diff --git a/include/boost/redis/resp3/parser.hpp b/include/boost/redis/resp3/parser.hpp index 4a95a984..52018074 100644 --- a/include/boost/redis/resp3/parser.hpp +++ b/include/boost/redis/resp3/parser.hpp @@ -67,8 +67,6 @@ class parser { [[nodiscard]] auto done() const noexcept -> bool; - auto get_suggested_buffer_growth(std::size_t hint) const noexcept -> std::size_t; - auto get_consumed() const noexcept -> std::size_t; auto consume(std::string_view view, system::error_code& ec) noexcept -> result; diff --git a/include/boost/redis/src.hpp b/include/boost/redis/src.hpp index f3a2ac4b..144b3a4a 100644 --- a/include/boost/redis/src.hpp +++ b/include/boost/redis/src.hpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include diff --git a/test/common.cpp b/test/common.cpp index ceb6b35f..3e9b174b 100644 --- a/test/common.cpp +++ b/test/common.cpp @@ -50,7 +50,6 @@ boost::redis::config make_test_config() { boost::redis::config cfg; cfg.addr.host = get_server_hostname(); - cfg.max_read_size = 1000000; return cfg; } @@ -69,3 +68,10 @@ void run_coroutine_test(net::awaitable op, std::chrono::steady_clock::dura throw std::runtime_error("Coroutine test did not finish"); } #endif // BOOST_ASIO_HAS_CO_AWAIT + +void append_read_data(boost::redis::detail::reader_fsm& fsm, std::string_view data) +{ + auto const buffer = fsm.get_append_buffer(); + BOOST_ASSERT(data.size() <= buffer.size()); + std::copy(data.begin(), data.end(), buffer.begin()); +} diff --git a/test/common.hpp b/test/common.hpp index b322dc54..090eef95 100644 --- a/test/common.hpp +++ b/test/common.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -34,3 +35,5 @@ void run( boost::redis::config cfg = make_test_config(), boost::system::error_code ec = boost::asio::error::operation_aborted, boost::redis::operation op = boost::redis::operation::receive); + +void append_read_data(boost::redis::detail::reader_fsm& fsm, std::string_view data); diff --git a/test/test_exec_fsm.cpp b/test/test_exec_fsm.cpp index 32b9a72b..7f3711c5 100644 --- a/test/test_exec_fsm.cpp +++ b/test/test_exec_fsm.cpp @@ -15,6 +15,8 @@ #include #include +#include "common.hpp" + #include #include #include @@ -117,8 +119,7 @@ void test_success() BOOST_TEST_EQ(mpx.commit_write(), 0u); // all requests expect a response // Simulate a successful read - mpx.get_read_buffer() = "$5\r\nhello\r\n"; - auto req_status = mpx.consume_next(ec); + auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec); BOOST_TEST_EQ(ec, error_code()); BOOST_TEST_EQ(req_status.first.value(), false); // it wasn't a push BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed @@ -159,10 +160,9 @@ void test_parse_error() // The second field should be a number (rather than the empty string). // Note that although part of the buffer was consumed, the multiplexer // currently throws this information away. - mpx.get_read_buffer() = "*2\r\n$5\r\nhello\r\n:\r\n"; - auto req_status = mpx.consume_next(ec); + auto req_status = mpx.consume_next("*2\r\n$5\r\nhello\r\n:\r\n", ec); BOOST_TEST_EQ(ec, error::empty_field); - BOOST_TEST_EQ(req_status.second, 0u); + BOOST_TEST_EQ(req_status.second, 15u); BOOST_TEST_EQ(input.done_calls, 1u); // This will awaken the exec operation, and should complete the operation @@ -218,8 +218,7 @@ void test_not_connected() BOOST_TEST_EQ(mpx.commit_write(), 0u); // all requests expect a response // Simulate a successful read - mpx.get_read_buffer() = "$5\r\nhello\r\n"; - auto req_status = mpx.consume_next(ec); + auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec); BOOST_TEST_EQ(ec, error_code()); BOOST_TEST_EQ(req_status.first.value(), false); // it wasn't a push BOOST_TEST_EQ(req_status.second, 11u); // the entire buffer was consumed @@ -342,8 +341,7 @@ void test_cancel_notwaiting_notterminal() BOOST_TEST_EQ_MSG(act, exec_action_type::wait_for_response, tc.name); // Simulate a successful read - mpx.get_read_buffer() = "$5\r\nhello\r\n"; - auto req_status = mpx.consume_next(ec); + auto req_status = mpx.consume_next("$5\r\nhello\r\n", ec); BOOST_TEST_EQ_MSG(ec, error_code(), tc.name); BOOST_TEST_EQ_MSG(req_status.first.value(), false, tc.name); // it wasn't a push BOOST_TEST_EQ_MSG(req_status.second, 11u, tc.name); // the entire buffer was consumed diff --git a/test/test_low_level.cpp b/test/test_low_level.cpp index edd7cc4b..c50d2b0f 100644 --- a/test/test_low_level.cpp +++ b/test/test_low_level.cpp @@ -528,6 +528,7 @@ BOOST_AUTO_TEST_CASE(cover_error) check_error("boost.redis", boost::redis::error::sync_receive_push_failed); check_error("boost.redis", boost::redis::error::incompatible_node_depth); check_error("boost.redis", boost::redis::error::resp3_hello); + check_error("boost.redis", boost::redis::error::exceeds_maximum_read_buffer_size); } std::string get_type_as_str(boost::redis::resp3::type t) diff --git a/test/test_low_level_sync_sans_io.cpp b/test/test_low_level_sync_sans_io.cpp index 24bf7c67..7ee2e5b0 100644 --- a/test/test_low_level_sync_sans_io.cpp +++ b/test/test_low_level_sync_sans_io.cpp @@ -11,9 +11,12 @@ #include #include #include +#include #define BOOST_TEST_MODULE conn_quit #include +#include "common.hpp" + #include #include @@ -30,6 +33,7 @@ using boost::redis::generic_response; using boost::redis::resp3::node; using boost::redis::resp3::to_string; using boost::redis::any_adapter; +using boost::system::error_code; BOOST_AUTO_TEST_CASE(low_level_sync_sans_io) { @@ -258,10 +262,8 @@ BOOST_AUTO_TEST_CASE(multiplexer_push) generic_response resp; mpx.set_receive_response(resp); - mpx.get_read_buffer() = ">2\r\n+one\r\n+two\r\n"; - boost::system::error_code ec; - auto const ret = mpx.consume_next(ec); + auto const ret = mpx.consume_next(">2\r\n+one\r\n+two\r\n", ec); BOOST_TEST(ret.first.value()); BOOST_CHECK_EQUAL(ret.second, 16u); @@ -282,16 +284,17 @@ BOOST_AUTO_TEST_CASE(multiplexer_push_needs_more) generic_response resp; mpx.set_receive_response(resp); + std::string msg; // Only part of the message. - mpx.get_read_buffer() = ">2\r\n+one\r"; + msg += ">2\r\n+one\r"; boost::system::error_code ec; - auto ret = mpx.consume_next(ec); + auto ret = mpx.consume_next(msg, ec); BOOST_TEST(!ret.first.has_value()); - mpx.get_read_buffer().append("\n+two\r\n"); - ret = mpx.consume_next(ec); + msg += "\n+two\r\n"; + ret = mpx.consume_next(msg, ec); BOOST_TEST(ret.first.value()); BOOST_CHECK_EQUAL(ret.second, 16u); @@ -378,20 +381,14 @@ BOOST_AUTO_TEST_CASE(multiplexer_pipeline) BOOST_TEST(item2.done); BOOST_TEST(!item3.done); - // Simulates a socket read by putting some data in the read buffer. - mpx.get_read_buffer().append("+one\r\n"); - // Consumes the next message in the read buffer. boost::system::error_code ec; - auto const ret = mpx.consume_next(ec); + auto const ret = mpx.consume_next("+one\r\n", ec); // The read operation should have been successfull. BOOST_TEST(ret.first.has_value()); BOOST_TEST(ret.second != 0u); - // The read buffer should also be empty now - BOOST_TEST(mpx.get_read_buffer().empty()); - // The last request still did not get a response. BOOST_TEST(item1.done); BOOST_TEST(item2.done); @@ -399,3 +396,64 @@ BOOST_AUTO_TEST_CASE(multiplexer_pipeline) // TODO: Check the first request was removed from the queue. } + +BOOST_AUTO_TEST_CASE(read_buffer_prepare_error) +{ + using boost::redis::detail::read_buffer; + + read_buffer buf; + + // Usual case, max size is bigger then requested size. + auto ec = buf.prepare_append(10, 10); + BOOST_TEST(!ec); + buf.commit_append(10); + + // Corner case, max size is equal to the requested size. + ec = buf.prepare_append(10, 20); + BOOST_TEST(!ec); + buf.commit_append(10); + buf.consume_committed(20); + + auto const tmp = buf; + + // Error case, max size is smaller to the requested size. + ec = buf.prepare_append(10, 9); + BOOST_TEST(ec == error_code{boost::redis::error::exceeds_maximum_read_buffer_size}); + + // Check that an error call has no side effects. + auto const res = buf == tmp; + BOOST_TEST(res); +} + +BOOST_AUTO_TEST_CASE(read_buffer_prepare_consume_only_committed_data) +{ + using boost::redis::detail::read_buffer; + + read_buffer buf; + + auto ec = buf.prepare_append(10, 10); + BOOST_TEST(!ec); + + // No data has been committed yet so nothing can be consummed. + BOOST_CHECK_EQUAL(buf.consume_committed(5), 0u); + + buf.commit_append(10); + + // All five bytes can be consumed. + BOOST_CHECK_EQUAL(buf.consume_committed(5), 5u); + + // Only the remaining five bytes can be consumed + BOOST_CHECK_EQUAL(buf.consume_committed(7), 5u); +} + +BOOST_AUTO_TEST_CASE(read_buffer_check_buffer_size) +{ + using boost::redis::detail::read_buffer; + + read_buffer buf; + + auto ec = buf.prepare_append(10, 10); + BOOST_TEST(!ec); + + BOOST_CHECK_EQUAL(buf.get_append_buffer().size(), 10u); +} diff --git a/test/test_reader_fsm.cpp b/test/test_reader_fsm.cpp index 4aa0c430..d1a33133 100644 --- a/test/test_reader_fsm.cpp +++ b/test/test_reader_fsm.cpp @@ -12,6 +12,8 @@ #include #include +#include "common.hpp" + namespace net = boost::asio; namespace redis = boost::redis; using boost::system::error_code; @@ -51,13 +53,15 @@ void test_push() BOOST_TEST_EQ(act.type_, action::type::append_some); // The fsm is asking for data. - mpx.get_read_buffer().append(">1\r\n+msg1\r\n"); - mpx.get_read_buffer().append(">1\r\n+msg2 \r\n"); - mpx.get_read_buffer().append(">1\r\n+msg3 \r\n"); - auto const bytes_read = mpx.get_read_buffer().size(); + std::string const payload = + ">1\r\n+msg1\r\n" + ">1\r\n+msg2 \r\n" + ">1\r\n+msg3 \r\n"; + + append_read_data(fsm, payload); // Deliver the 1st push - act = fsm.resume(bytes_read, ec, cancellation_type_t::none); + act = fsm.resume(payload.size(), ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver); BOOST_TEST_EQ(act.push_size_, 11u); BOOST_TEST_EQ(act.ec_, error_code()); @@ -100,20 +104,20 @@ void test_read_needs_more() std::string const msg[] = {">3\r", "\n+msg1\r\n+ms", "g2\r\n+msg3\r\n"}; // Passes the first part to the fsm. - mpx.get_read_buffer().append(msg[0]); + append_read_data(fsm, msg[0]); act = fsm.resume(msg[0].size(), ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::needs_more); BOOST_TEST_EQ(act.ec_, error_code()); // Passes the second part to the fsm. - mpx.get_read_buffer().append(msg[1]); + append_read_data(fsm, msg[1]); act = fsm.resume(msg[1].size(), ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::needs_more); BOOST_TEST_EQ(act.ec_, error_code()); // Passes the third and last part to the fsm, next it should ask us // to deliver the message. - mpx.get_read_buffer().append(msg[2]); + append_read_data(fsm, msg[2]); act = fsm.resume(msg[2].size(), ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver); BOOST_TEST_EQ(act.push_size_, msg[0].size() + msg[1].size() + msg[2].size()); @@ -141,16 +145,16 @@ void test_read_error() BOOST_TEST_EQ(act.type_, action::type::append_some); // The fsm is asking for data. - mpx.get_read_buffer().append(">1\r\n+msg1\r\n"); - auto const bytes_read = mpx.get_read_buffer().size(); + std::string const payload = ">1\r\n+msg1\r\n"; + append_read_data(fsm, payload); // Deliver the data - act = fsm.resume(bytes_read, {net::error::operation_aborted}, cancellation_type_t::none); + act = fsm.resume(payload.size(), {net::error::operation_aborted}, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::cancel_run); BOOST_TEST_EQ(act.ec_, error_code()); // Finish - act = fsm.resume(bytes_read, ec, cancellation_type_t::none); + act = fsm.resume(0, ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::done); BOOST_TEST_EQ(act.ec_, error_code{net::error::operation_aborted}); } @@ -171,16 +175,16 @@ void test_parse_error() BOOST_TEST_EQ(act.type_, action::type::append_some); // The fsm is asking for data. - mpx.get_read_buffer().append(">a\r\n"); - auto const bytes_read = mpx.get_read_buffer().size(); + std::string const payload = ">a\r\n"; + append_read_data(fsm, payload); // Deliver the data - act = fsm.resume(bytes_read, {}, cancellation_type_t::none); + act = fsm.resume(payload.size(), {}, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::cancel_run); BOOST_TEST_EQ(act.ec_, error_code()); // Finish - act = fsm.resume(bytes_read, {}, cancellation_type_t::none); + act = fsm.resume(0, {}, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::done); BOOST_TEST_EQ(act.ec_, error_code{redis::error::not_a_number}); } @@ -201,16 +205,16 @@ void test_push_deliver_error() BOOST_TEST_EQ(act.type_, action::type::append_some); // The fsm is asking for data. - mpx.get_read_buffer().append(">1\r\n+msg1\r\n"); - auto const bytes_read = mpx.get_read_buffer().size(); + std::string const payload = ">1\r\n+msg1\r\n"; + append_read_data(fsm, payload); // Deliver the data - act = fsm.resume(bytes_read, {}, cancellation_type_t::none); + act = fsm.resume(payload.size(), {}, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver); BOOST_TEST_EQ(act.ec_, error_code()); // Resumes from notifying a push with an error. - act = fsm.resume(bytes_read, net::error::operation_aborted, cancellation_type_t::none); + act = fsm.resume(0, net::error::operation_aborted, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::cancel_run); // Finish @@ -219,10 +223,39 @@ void test_push_deliver_error() BOOST_TEST_EQ(act.ec_, error_code{net::error::operation_aborted}); } +void test_max_read_buffer_size() +{ + multiplexer mpx; + generic_response resp; + mpx.set_receive_response(resp); + reader_fsm fsm{mpx}; + fsm.set_config({5, 7}); + error_code ec; + action act; + + // Initiate + act = fsm.resume(0, ec, cancellation_type_t::none); + BOOST_TEST_EQ(act.type_, action::type::setup_cancellation); + act = fsm.resume(0, ec, cancellation_type_t::none); + BOOST_TEST_EQ(act.type_, action::type::append_some); + + // Passes the first part to the fsm. + std::string const part1 = ">3\r\n"; + append_read_data(fsm, part1); + act = fsm.resume(part1.size(), {}, cancellation_type_t::none); + BOOST_TEST_EQ(act.type_, action::type::cancel_run); + BOOST_TEST_EQ(act.ec_, error_code()); + + act = fsm.resume({}, {}, cancellation_type_t::none); + BOOST_TEST_EQ(act.type_, action::type::done); + BOOST_TEST_EQ(act.ec_, redis::error::exceeds_maximum_read_buffer_size); +} + } // namespace int main() { + test_max_read_buffer_size(); test_push_deliver_error(); test_read_needs_more(); test_push(); From 8ee2213efe930a8e9c7f052bee2c91b4d5761faf Mon Sep 17 00:00:00 2001 From: Marcelo Zimbres Date: Fri, 18 Jul 2025 00:04:16 +0200 Subject: [PATCH 2/2] Code review changes. --- include/boost/redis/config.hpp | 7 ++-- include/boost/redis/connection.hpp | 17 ++++++---- include/boost/redis/detail/multiplexer.hpp | 2 +- include/boost/redis/detail/read_buffer.hpp | 24 +++++++++----- include/boost/redis/detail/reader_fsm.hpp | 26 +++------------ include/boost/redis/error.hpp | 2 +- include/boost/redis/impl/error.ipp | 3 +- include/boost/redis/impl/read_buffer.ipp | 24 +++++--------- include/boost/redis/impl/reader_fsm.ipp | 24 +++++--------- test/common.cpp | 4 +-- test/common.hpp | 2 +- test/test_low_level_sync_sans_io.cpp | 17 ++++++---- test/test_reader_fsm.cpp | 38 +++++++++++++--------- 13 files changed, 94 insertions(+), 96 deletions(-) diff --git a/include/boost/redis/config.hpp b/include/boost/redis/config.hpp index 06c81002..b564bdfc 100644 --- a/include/boost/redis/config.hpp +++ b/include/boost/redis/config.hpp @@ -88,7 +88,7 @@ struct config { */ std::chrono::steady_clock::duration reconnect_wait_interval = std::chrono::seconds{1}; - /** @brief Maximum size of the read-buffer in bytes. + /** @brief Maximum size of the socket read-buffer in bytes. * * Sets a limit on how much data is allowed to be read into the * read buffer. It can be used to prevent DDOS. @@ -98,8 +98,9 @@ struct config { /** @brief read_buffer_append_size * * The size by which the read buffer grows when more space is - * needed. There is no need to set this too high because memory is - * reused and the growth will tend to zero. + * needed. This can help avoiding some memory allocations. Once the + * maximum size is reached no more memory allocations are made + * since the buffer is reused. */ std::size_t read_buffer_append_size = 4096; }; diff --git a/include/boost/redis/connection.hpp b/include/boost/redis/connection.hpp index ac078665..e23bf674 100644 --- a/include/boost/redis/connection.hpp +++ b/include/boost/redis/connection.hpp @@ -160,17 +160,19 @@ struct writer_op { template struct reader_op { Conn* conn_; + detail::reader_fsm fsm_; public: reader_op(Conn& conn) noexcept : conn_{&conn} + , fsm_{conn.read_buffer_, conn.mpx_} { } template void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0) { for (;;) { - auto act = conn_->read_fsm_.resume(n, ec, self.get_cancellation_state().cancelled()); + auto act = fsm_.resume(n, ec, self.get_cancellation_state().cancelled()); conn_->logger_.on_fsm_resume(act); @@ -181,7 +183,7 @@ struct reader_op { case reader_fsm::action::type::needs_more: case reader_fsm::action::type::append_some: { - auto const buf = conn_->read_fsm_.get_append_buffer(); + auto const buf = conn_->read_buffer_.get_append_buffer(); conn_->stream_.async_read_some(asio::buffer(buf), std::move(self)); } return; @@ -276,8 +278,8 @@ class run_op { // If we were successful, run all the connection tasks if (!ec) { + conn_->read_buffer_.clear(); conn_->mpx_.reset(); - conn_->read_fsm_.reset(); // Note: Order is important here because the writer might // trigger an async_write before the async_hello thereby @@ -385,7 +387,6 @@ class basic_connection { , reconnect_timer_{ex} , receive_channel_{ex, 256} , health_checker_{ex} - , read_fsm_{mpx_} , logger_{std::move(lgr)} { set_receive_response(ignore); @@ -489,7 +490,11 @@ class basic_connection { cfg_ = cfg; health_checker_.set_config(cfg); handshaker_.set_config(cfg); - read_fsm_.set_config({cfg_.read_buffer_append_size, cfg_.max_read_size}); + read_buffer_.set_config({cfg.read_buffer_append_size, cfg.max_read_size}); + + // Reserve some memory to avoid excessive memory allocations in + // the first reads. + read_buffer_.reserve(4048u); return asio::async_compose( detail::run_op{this}, @@ -887,8 +892,8 @@ class basic_connection { resp3_handshaker_type handshaker_; config cfg_; + detail::read_buffer read_buffer_; detail::multiplexer mpx_; - detail::reader_fsm read_fsm_; detail::connection_logger logger_; }; diff --git a/include/boost/redis/detail/multiplexer.hpp b/include/boost/redis/detail/multiplexer.hpp index fde726f3..8ee643d6 100644 --- a/include/boost/redis/detail/multiplexer.hpp +++ b/include/boost/redis/detail/multiplexer.hpp @@ -14,7 +14,7 @@ #include #include -#include +#include #include #include diff --git a/include/boost/redis/detail/read_buffer.hpp b/include/boost/redis/detail/read_buffer.hpp index cad248ec..9f275aee 100644 --- a/include/boost/redis/detail/read_buffer.hpp +++ b/include/boost/redis/detail/read_buffer.hpp @@ -8,11 +8,12 @@ #define BOOST_REDIS_READ_BUFFER_HPP #include +#include #include -#include #include #include +#include namespace boost::redis::detail { @@ -20,14 +21,20 @@ class read_buffer { public: using span_type = span; - [[nodiscard]] - system::error_code prepare_append(std::size_t append_size, std::size_t max_buffer_size); + // See config.hpp for the meaning of these parameters. + struct config { + std::size_t read_buffer_append_size = 4096u; + std::size_t max_read_size = static_cast(-1); + }; - void commit_append(std::size_t read_size); + [[nodiscard]] + auto prepare_append() -> system::error_code; [[nodiscard]] auto get_append_buffer() noexcept -> span_type; + void commit_append(std::size_t read_size); + [[nodiscard]] auto get_committed_buffer() const noexcept -> std::string_view; @@ -41,13 +48,14 @@ class read_buffer { void reserve(std::size_t n); - friend - bool operator==(read_buffer const& lhs, read_buffer const& rhs); + friend bool operator==(read_buffer const& lhs, read_buffer const& rhs); + + friend bool operator!=(read_buffer const& lhs, read_buffer const& rhs); - friend - bool operator!=(read_buffer const& lhs, read_buffer const& rhs); + void set_config(config const& cfg) noexcept { cfg_ = cfg; }; private: + config cfg_ = config{}; std::vector buffer_; std::size_t append_buf_begin_ = 0; }; diff --git a/include/boost/redis/detail/reader_fsm.hpp b/include/boost/redis/detail/reader_fsm.hpp index 09763366..f217dbb9 100644 --- a/include/boost/redis/detail/reader_fsm.hpp +++ b/include/boost/redis/detail/reader_fsm.hpp @@ -6,7 +6,6 @@ #ifndef BOOST_REDIS_READER_FSM_HPP #define BOOST_REDIS_READER_FSM_HPP - #include #include @@ -16,14 +15,10 @@ namespace boost::redis::detail { +class read_buffer; + class reader_fsm { public: - // See config.hpp for the meaning of these parameters. - struct config { - std::size_t read_buffer_append_size = 4096; - std::size_t max_read_size = -1; - }; - struct action { enum class type { @@ -36,31 +31,20 @@ class reader_fsm { }; type type_ = type::setup_cancellation; - std::size_t push_size_ = 0; + std::size_t push_size_ = 0u; system::error_code ec_ = {}; }; - explicit reader_fsm(multiplexer& mpx) noexcept; + explicit reader_fsm(read_buffer& rbuf, multiplexer& mpx) noexcept; action resume( std::size_t bytes_read, system::error_code ec, asio::cancellation_type_t /*cancel_state*/); - void set_config(config const& cfg) noexcept { cfg_ = cfg; }; - - void reset(); - - [[nodiscard]] - auto get_append_buffer() noexcept - { - return read_buffer_.get_append_buffer(); - } - private: int resume_point_{0}; - read_buffer read_buffer_; - config cfg_; + read_buffer* read_buffer_ = nullptr; action action_after_resume_; action::type next_read_type_ = action::type::append_some; multiplexer* mpx_ = nullptr; diff --git a/include/boost/redis/error.hpp b/include/boost/redis/error.hpp index 8008265f..a2ae878c 100644 --- a/include/boost/redis/error.hpp +++ b/include/boost/redis/error.hpp @@ -89,7 +89,7 @@ enum class error /// The configuration specified UNIX sockets with SSL, which is not supported. unix_sockets_ssl_unsupported, - /// The size of the read buffer would exceed it maximum configured value. + /// Reading data from the socket would exceed the maximum size allowed of the read buffer. exceeds_maximum_read_buffer_size, }; diff --git a/include/boost/redis/impl/error.ipp b/include/boost/redis/impl/error.ipp index 2c2d0b63..696aa26c 100644 --- a/include/boost/redis/impl/error.ipp +++ b/include/boost/redis/impl/error.ipp @@ -51,7 +51,8 @@ struct error_category_impl : system::error_category { case error::unix_sockets_ssl_unsupported: return "The configuration specified UNIX sockets with SSL, which is not supported."; case error::exceeds_maximum_read_buffer_size: - return "The size of the read buffer would exceed it maximum configured value"; + return "Reading data from the socket would exceed the maximum size allowed of the read " + "buffer."; default: BOOST_ASSERT(false); return "Boost.Redis error."; } } diff --git a/include/boost/redis/impl/read_buffer.ipp b/include/boost/redis/impl/read_buffer.ipp index 0c18f857..c5eac2d9 100644 --- a/include/boost/redis/impl/read_buffer.ipp +++ b/include/boost/redis/impl/read_buffer.ipp @@ -13,14 +13,13 @@ namespace boost::redis::detail { -system::error_code -read_buffer::prepare_append(std::size_t append_size, std::size_t max_buffer_size) +system::error_code read_buffer::prepare_append() { BOOST_ASSERT(append_buf_begin_ == buffer_.size()); - auto const new_size = append_buf_begin_ + append_size; + auto const new_size = append_buf_begin_ + cfg_.read_buffer_append_size; - if (new_size > max_buffer_size) { + if (new_size > cfg_.max_read_size) { return error::exceeds_maximum_read_buffer_size; } @@ -57,7 +56,8 @@ void read_buffer::clear() std::size_t read_buffer::consume_committed(std::size_t size) { - // Consumes only committed data. + // For convenience, if the requested size is larger than the + // committed buffer we cap it to the maximum. if (size > append_buf_begin_) size = append_buf_begin_; @@ -67,21 +67,13 @@ std::size_t read_buffer::consume_committed(std::size_t size) return size; } -void read_buffer::reserve(std::size_t n) -{ - buffer_.reserve(n); -} +void read_buffer::reserve(std::size_t n) { buffer_.reserve(n); } bool operator==(read_buffer const& lhs, read_buffer const& rhs) { - return - lhs.buffer_ == rhs.buffer_ && - lhs.append_buf_begin_ == rhs.append_buf_begin_; + return lhs.buffer_ == rhs.buffer_ && lhs.append_buf_begin_ == rhs.append_buf_begin_; } -bool operator!=(read_buffer const& lhs, read_buffer const& rhs) -{ - return !(lhs == rhs); -} +bool operator!=(read_buffer const& lhs, read_buffer const& rhs) { return !(lhs == rhs); } } // namespace boost::redis::detail diff --git a/include/boost/redis/impl/reader_fsm.ipp b/include/boost/redis/impl/reader_fsm.ipp index 73b03772..a9f87965 100644 --- a/include/boost/redis/impl/reader_fsm.ipp +++ b/include/boost/redis/impl/reader_fsm.ipp @@ -6,12 +6,14 @@ #include #include +#include #include namespace boost::redis::detail { -reader_fsm::reader_fsm(multiplexer& mpx) noexcept -: mpx_{&mpx} +reader_fsm::reader_fsm(read_buffer& rbuf, multiplexer& mpx) noexcept +: read_buffer_{&rbuf} +, mpx_{&mpx} { } reader_fsm::action reader_fsm::resume( @@ -24,7 +26,7 @@ reader_fsm::action reader_fsm::resume( BOOST_REDIS_YIELD(resume_point_, 1, action::type::setup_cancellation) for (;;) { - ec = read_buffer_.prepare_append(cfg_.read_buffer_append_size, cfg_.max_read_size); + ec = read_buffer_->prepare_append(); if (ec) { action_after_resume_ = {action::type::done, 0, ec}; BOOST_REDIS_YIELD(resume_point_, 2, action::type::cancel_run) @@ -32,7 +34,7 @@ reader_fsm::action reader_fsm::resume( } BOOST_REDIS_YIELD(resume_point_, 3, next_read_type_) - read_buffer_.commit_append(bytes_read); + read_buffer_->commit_append(bytes_read); if (ec) { // TODO: If an error occurred but data was read (i.e. // bytes_read != 0) we should try to process that data and @@ -43,8 +45,8 @@ reader_fsm::action reader_fsm::resume( } next_read_type_ = action::type::append_some; - while (read_buffer_.get_committed_size() != 0) { - res_ = mpx_->consume_next(read_buffer_.get_committed_buffer(), ec); + while (read_buffer_->get_committed_size() != 0) { + res_ = mpx_->consume_next(read_buffer_->get_committed_buffer(), ec); if (ec) { // TODO: Perhaps log what has not been consumed to aid // debugging. @@ -58,7 +60,7 @@ reader_fsm::action reader_fsm::resume( break; } - read_buffer_.consume_committed(res_.second); + read_buffer_->consume_committed(res_.second); if (res_.first.value()) { BOOST_REDIS_YIELD(resume_point_, 6, action::type::notify_push_receiver, res_.second) @@ -83,12 +85,4 @@ reader_fsm::action reader_fsm::resume( return {action::type::done, 0, system::error_code()}; } -void reader_fsm::reset() -{ - resume_point_ = 0; - next_read_type_ = action::type::append_some; - res_ = {std::make_pair(std::nullopt, 0)}; - read_buffer_.clear(); -} - } // namespace boost::redis::detail diff --git a/test/common.cpp b/test/common.cpp index 3e9b174b..64aa22a6 100644 --- a/test/common.cpp +++ b/test/common.cpp @@ -69,9 +69,9 @@ void run_coroutine_test(net::awaitable op, std::chrono::steady_clock::dura } #endif // BOOST_ASIO_HAS_CO_AWAIT -void append_read_data(boost::redis::detail::reader_fsm& fsm, std::string_view data) +void append_read_data(boost::redis::detail::read_buffer& rbuf, std::string_view data) { - auto const buffer = fsm.get_append_buffer(); + auto const buffer = rbuf.get_append_buffer(); BOOST_ASSERT(data.size() <= buffer.size()); std::copy(data.begin(), data.end(), buffer.begin()); } diff --git a/test/common.hpp b/test/common.hpp index 090eef95..2370ca9f 100644 --- a/test/common.hpp +++ b/test/common.hpp @@ -36,4 +36,4 @@ void run( boost::system::error_code ec = boost::asio::error::operation_aborted, boost::redis::operation op = boost::redis::operation::receive); -void append_read_data(boost::redis::detail::reader_fsm& fsm, std::string_view data); +void append_read_data(boost::redis::detail::read_buffer& rbuf, std::string_view data); diff --git a/test/test_low_level_sync_sans_io.cpp b/test/test_low_level_sync_sans_io.cpp index 7ee2e5b0..73e1cdda 100644 --- a/test/test_low_level_sync_sans_io.cpp +++ b/test/test_low_level_sync_sans_io.cpp @@ -7,11 +7,11 @@ #include #include #include +#include #include #include #include #include -#include #define BOOST_TEST_MODULE conn_quit #include @@ -404,12 +404,14 @@ BOOST_AUTO_TEST_CASE(read_buffer_prepare_error) read_buffer buf; // Usual case, max size is bigger then requested size. - auto ec = buf.prepare_append(10, 10); + buf.set_config({10, 10}); + auto ec = buf.prepare_append(); BOOST_TEST(!ec); buf.commit_append(10); // Corner case, max size is equal to the requested size. - ec = buf.prepare_append(10, 20); + buf.set_config({10, 20}); + ec = buf.prepare_append(); BOOST_TEST(!ec); buf.commit_append(10); buf.consume_committed(20); @@ -417,7 +419,8 @@ BOOST_AUTO_TEST_CASE(read_buffer_prepare_error) auto const tmp = buf; // Error case, max size is smaller to the requested size. - ec = buf.prepare_append(10, 9); + buf.set_config({10, 9}); + ec = buf.prepare_append(); BOOST_TEST(ec == error_code{boost::redis::error::exceeds_maximum_read_buffer_size}); // Check that an error call has no side effects. @@ -431,7 +434,8 @@ BOOST_AUTO_TEST_CASE(read_buffer_prepare_consume_only_committed_data) read_buffer buf; - auto ec = buf.prepare_append(10, 10); + buf.set_config({10, 10}); + auto ec = buf.prepare_append(); BOOST_TEST(!ec); // No data has been committed yet so nothing can be consummed. @@ -452,7 +456,8 @@ BOOST_AUTO_TEST_CASE(read_buffer_check_buffer_size) read_buffer buf; - auto ec = buf.prepare_append(10, 10); + buf.set_config({10, 10}); + auto ec = buf.prepare_append(); BOOST_TEST(!ec); BOOST_CHECK_EQUAL(buf.get_append_buffer().size(), 10u); diff --git a/test/test_reader_fsm.cpp b/test/test_reader_fsm.cpp index d1a33133..58a25f0e 100644 --- a/test/test_reader_fsm.cpp +++ b/test/test_reader_fsm.cpp @@ -5,6 +5,7 @@ // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // +#include #include #include @@ -20,6 +21,7 @@ using boost::system::error_code; using net::cancellation_type_t; using redis::detail::reader_fsm; using redis::detail::multiplexer; +using redis::detail::read_buffer; using redis::generic_response; using action = redis::detail::reader_fsm::action; @@ -39,10 +41,11 @@ namespace { void test_push() { + read_buffer rbuf; multiplexer mpx; generic_response resp; mpx.set_receive_response(resp); - reader_fsm fsm{mpx}; + reader_fsm fsm{rbuf, mpx}; error_code ec; action act; @@ -58,7 +61,7 @@ void test_push() ">1\r\n+msg2 \r\n" ">1\r\n+msg3 \r\n"; - append_read_data(fsm, payload); + append_read_data(rbuf, payload); // Deliver the 1st push act = fsm.resume(payload.size(), ec, cancellation_type_t::none); @@ -86,10 +89,11 @@ void test_push() void test_read_needs_more() { + read_buffer rbuf; multiplexer mpx; generic_response resp; mpx.set_receive_response(resp); - reader_fsm fsm{mpx}; + reader_fsm fsm{rbuf, mpx}; error_code ec; action act; @@ -104,20 +108,20 @@ void test_read_needs_more() std::string const msg[] = {">3\r", "\n+msg1\r\n+ms", "g2\r\n+msg3\r\n"}; // Passes the first part to the fsm. - append_read_data(fsm, msg[0]); + append_read_data(rbuf, msg[0]); act = fsm.resume(msg[0].size(), ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::needs_more); BOOST_TEST_EQ(act.ec_, error_code()); // Passes the second part to the fsm. - append_read_data(fsm, msg[1]); + append_read_data(rbuf, msg[1]); act = fsm.resume(msg[1].size(), ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::needs_more); BOOST_TEST_EQ(act.ec_, error_code()); // Passes the third and last part to the fsm, next it should ask us // to deliver the message. - append_read_data(fsm, msg[2]); + append_read_data(rbuf, msg[2]); act = fsm.resume(msg[2].size(), ec, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::notify_push_receiver); BOOST_TEST_EQ(act.push_size_, msg[0].size() + msg[1].size() + msg[2].size()); @@ -131,10 +135,11 @@ void test_read_needs_more() void test_read_error() { + read_buffer rbuf; multiplexer mpx; generic_response resp; mpx.set_receive_response(resp); - reader_fsm fsm{mpx}; + reader_fsm fsm{rbuf, mpx}; error_code ec; action act; @@ -146,7 +151,7 @@ void test_read_error() // The fsm is asking for data. std::string const payload = ">1\r\n+msg1\r\n"; - append_read_data(fsm, payload); + append_read_data(rbuf, payload); // Deliver the data act = fsm.resume(payload.size(), {net::error::operation_aborted}, cancellation_type_t::none); @@ -161,10 +166,11 @@ void test_read_error() void test_parse_error() { + read_buffer rbuf; multiplexer mpx; generic_response resp; mpx.set_receive_response(resp); - reader_fsm fsm{mpx}; + reader_fsm fsm{rbuf, mpx}; error_code ec; action act; @@ -176,7 +182,7 @@ void test_parse_error() // The fsm is asking for data. std::string const payload = ">a\r\n"; - append_read_data(fsm, payload); + append_read_data(rbuf, payload); // Deliver the data act = fsm.resume(payload.size(), {}, cancellation_type_t::none); @@ -191,10 +197,11 @@ void test_parse_error() void test_push_deliver_error() { + read_buffer rbuf; multiplexer mpx; generic_response resp; mpx.set_receive_response(resp); - reader_fsm fsm{mpx}; + reader_fsm fsm{rbuf, mpx}; error_code ec; action act; @@ -206,7 +213,7 @@ void test_push_deliver_error() // The fsm is asking for data. std::string const payload = ">1\r\n+msg1\r\n"; - append_read_data(fsm, payload); + append_read_data(rbuf, payload); // Deliver the data act = fsm.resume(payload.size(), {}, cancellation_type_t::none); @@ -225,11 +232,12 @@ void test_push_deliver_error() void test_max_read_buffer_size() { + read_buffer rbuf; + rbuf.set_config({5, 7}); multiplexer mpx; generic_response resp; mpx.set_receive_response(resp); - reader_fsm fsm{mpx}; - fsm.set_config({5, 7}); + reader_fsm fsm{rbuf, mpx}; error_code ec; action act; @@ -241,7 +249,7 @@ void test_max_read_buffer_size() // Passes the first part to the fsm. std::string const part1 = ">3\r\n"; - append_read_data(fsm, part1); + append_read_data(rbuf, part1); act = fsm.resume(part1.size(), {}, cancellation_type_t::none); BOOST_TEST_EQ(act.type_, action::type::cancel_run); BOOST_TEST_EQ(act.ec_, error_code());