Skip to content

Removes async_append_some #283

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion include/boost/redis/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,21 @@ struct config {
*/
std::chrono::steady_clock::duration reconnect_wait_interval = std::chrono::seconds{1};

/** @brief Maximum size of a socket read, in bytes.
/** @brief Maximum size of the socket read-buffer in bytes.
*
* Sets a limit on how much data is allowed to be read into the
* read buffer. It can be used to prevent DDOS.
*/
std::size_t max_read_size = (std::numeric_limits<std::size_t>::max)();

/** @brief read_buffer_append_size
*
* The size by which the read buffer grows when more space is
* needed. This can help avoiding some memory allocations. Once the
* maximum size is reached no more memory allocations are made
* since the buffer is reused.
*/
std::size_t read_buffer_append_size = 4096;
};

} // namespace boost::redis
Expand Down
81 changes: 12 additions & 69 deletions include/boost/redis/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,56 +57,6 @@
namespace boost::redis {
namespace detail {

template <class AsyncReadStream, class DynamicBuffer>
class append_some_op {
private:
AsyncReadStream& stream_;
DynamicBuffer buf_;
std::size_t size_ = 0;
std::size_t tmp_ = 0;
asio::coroutine coro_{};

public:
append_some_op(AsyncReadStream& stream, DynamicBuffer buf, std::size_t size)
: stream_{stream}
, buf_{std::move(buf)}
, size_{size}
{ }

template <class Self>
void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0)
{
BOOST_ASIO_CORO_REENTER(coro_)
{
tmp_ = buf_.size();
buf_.grow(size_);

BOOST_ASIO_CORO_YIELD
stream_.async_read_some(buf_.data(tmp_, size_), std::move(self));
if (ec) {
self.complete(ec, 0);
return;
}

buf_.shrink(buf_.size() - tmp_ - n);
self.complete({}, n);
}
}
};

template <class AsyncReadStream, class DynamicBuffer, class CompletionToken>
auto async_append_some(
AsyncReadStream& stream,
DynamicBuffer buffer,
std::size_t size,
CompletionToken&& token)
{
return asio::async_compose<CompletionToken, void(system::error_code, std::size_t)>(
append_some_op<AsyncReadStream, DynamicBuffer>{stream, buffer, size},
token,
stream);
}

template <class Executor>
using exec_notifier_type = asio::experimental::channel<
Executor,
Expand Down Expand Up @@ -209,31 +159,18 @@ struct writer_op {

template <class Conn>
struct reader_op {
using dyn_buffer_type = asio::dynamic_string_buffer<
char,
std::char_traits<char>,
std::allocator<char>>;

// TODO: Move this to config so the user can fine tune?
static constexpr std::size_t buffer_growth_hint = 4096;

Conn* conn_;
detail::reader_fsm fsm_;

public:
reader_op(Conn& conn) noexcept
: conn_{&conn}
, fsm_{conn.mpx_}
, fsm_{conn.read_buffer_, conn.mpx_}
{ }

template <class Self>
void operator()(Self& self, system::error_code ec = {}, std::size_t n = 0)
{
using dyn_buffer_type = asio::dynamic_string_buffer<
char,
std::char_traits<char>,
std::allocator<char>>;

for (;;) {
auto act = fsm_.resume(n, ec, self.get_cancellation_state().cancelled());

Expand All @@ -245,11 +182,10 @@ struct reader_op {
continue;
case reader_fsm::action::type::needs_more:
case reader_fsm::action::type::append_some:
async_append_some(
conn_->stream_,
dyn_buffer_type{conn_->mpx_.get_read_buffer(), conn_->cfg_.max_read_size},
conn_->mpx_.get_parser().get_suggested_buffer_growth(buffer_growth_hint),
std::move(self));
{
auto const buf = conn_->read_buffer_.get_append_buffer();
conn_->stream_.async_read_some(asio::buffer(buf), std::move(self));
}
return;
case reader_fsm::action::type::notify_push_receiver:
if (conn_->receive_channel_.try_send(ec, act.push_size_)) {
Expand Down Expand Up @@ -342,6 +278,7 @@ class run_op {

// If we were successful, run all the connection tasks
if (!ec) {
conn_->read_buffer_.clear();
conn_->mpx_.reset();

// Note: Order is important here because the writer might
Expand Down Expand Up @@ -553,6 +490,11 @@ class basic_connection {
cfg_ = cfg;
health_checker_.set_config(cfg);
handshaker_.set_config(cfg);
read_buffer_.set_config({cfg.read_buffer_append_size, cfg.max_read_size});

// Reserve some memory to avoid excessive memory allocations in
// the first reads.
read_buffer_.reserve(4048u);

return asio::async_compose<CompletionToken, void(system::error_code)>(
detail::run_op<this_type>{this},
Expand Down Expand Up @@ -950,6 +892,7 @@ class basic_connection {
resp3_handshaker_type handshaker_;

config cfg_;
detail::read_buffer read_buffer_;
detail::multiplexer mpx_;
detail::connection_logger logger_;
};
Expand Down
35 changes: 13 additions & 22 deletions include/boost/redis/detail/multiplexer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
#define BOOST_REDIS_MULTIPLEXER_HPP

#include <boost/redis/adapter/adapt.hpp>
#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/config.hpp>
#include <boost/redis/operation.hpp>
#include <boost/redis/detail/read_buffer.hpp>
#include <boost/redis/resp3/node.hpp>
#include <boost/redis/resp3/parser.hpp>
#include <boost/redis/resp3/type.hpp>
#include <boost/redis/usage.hpp>

#include <boost/asio/experimental/channel.hpp>
#include <boost/system/error_code.hpp>

#include <algorithm>
#include <deque>
Expand All @@ -32,7 +32,8 @@ namespace detail {

using tribool = std::optional<bool>;

struct multiplexer {
class multiplexer {
public:
using adapter_type = std::function<void(resp3::node_view const&, system::error_code&)>;
using pipeline_adapter_type = std::function<
void(std::size_t, resp3::node_view const&, system::error_code&)>;
Expand Down Expand Up @@ -127,7 +128,8 @@ struct multiplexer {
// If the tribool contains no value more data is needed, otherwise
// if the value is true the message consumed is a push.
[[nodiscard]]
auto consume_next(system::error_code& ec) -> std::pair<tribool, std::size_t>;
auto consume_next(std::string_view data, system::error_code& ec)
-> std::pair<tribool, std::size_t>;

auto add(std::shared_ptr<elem> const& ptr) -> void;
auto reset() -> void;
Expand Down Expand Up @@ -156,18 +158,6 @@ struct multiplexer {
return std::string_view{write_buffer_};
}

[[nodiscard]]
auto get_read_buffer() noexcept -> std::string&
{
return read_buffer_;
}

[[nodiscard]]
auto get_read_buffer() const noexcept -> std::string const&
{
return read_buffer_;
}

// TODO: Change signature to receive an adapter instead of a
// response.
template <class Response>
Expand All @@ -191,17 +181,18 @@ struct multiplexer {
[[nodiscard]]
auto is_waiting_response() const noexcept -> bool;

[[nodiscard]]
auto on_finish_parsing(bool is_push) -> std::size_t;
void commit_usage(bool is_push, std::size_t size);

[[nodiscard]]
auto is_next_push() const noexcept -> bool;
auto is_next_push(std::string_view data) const noexcept -> bool;

// Releases the number of requests that have been released.
[[nodiscard]]
auto release_push_requests() -> std::size_t;

std::string read_buffer_;
[[nodiscard]]
tribool consume_next_impl(std::string_view data, system::error_code& ec);

std::string write_buffer_;
std::deque<std::shared_ptr<elem>> reqs_;
resp3::parser parser_{};
Expand Down
65 changes: 65 additions & 0 deletions include/boost/redis/detail/read_buffer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva ([email protected])
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/

#ifndef BOOST_REDIS_READ_BUFFER_HPP
#define BOOST_REDIS_READ_BUFFER_HPP

#include <boost/core/span.hpp>
#include <boost/system/error_code.hpp>

#include <cstddef>
#include <string_view>
#include <utility>
#include <vector>

namespace boost::redis::detail {

class read_buffer {
public:
using span_type = span<char>;

// See config.hpp for the meaning of these parameters.
struct config {
std::size_t read_buffer_append_size = 4096u;
std::size_t max_read_size = static_cast<std::size_t>(-1);
};

[[nodiscard]]
auto prepare_append() -> system::error_code;

[[nodiscard]]
auto get_append_buffer() noexcept -> span_type;

void commit_append(std::size_t read_size);

[[nodiscard]]
auto get_committed_buffer() const noexcept -> std::string_view;

[[nodiscard]]
auto get_committed_size() const noexcept -> std::size_t;

void clear();

// Consume committed data.
auto consume_committed(std::size_t size) -> std::size_t;

void reserve(std::size_t n);

friend bool operator==(read_buffer const& lhs, read_buffer const& rhs);

friend bool operator!=(read_buffer const& lhs, read_buffer const& rhs);

void set_config(config const& cfg) noexcept { cfg_ = cfg; };

private:
config cfg_ = config{};
std::vector<char> buffer_;
std::size_t append_buf_begin_ = 0;
};

} // namespace boost::redis::detail

#endif // BOOST_REDIS_READ_BUFFER_HPP
8 changes: 5 additions & 3 deletions include/boost/redis/detail/reader_fsm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

#ifndef BOOST_REDIS_READER_FSM_HPP
#define BOOST_REDIS_READER_FSM_HPP

#include <boost/redis/detail/multiplexer.hpp>

#include <boost/asio/cancellation_type.hpp>
Expand All @@ -16,6 +15,8 @@

namespace boost::redis::detail {

class read_buffer;

class reader_fsm {
public:
struct action {
Expand All @@ -30,11 +31,11 @@ class reader_fsm {
};

type type_ = type::setup_cancellation;
std::size_t push_size_ = 0;
std::size_t push_size_ = 0u;
system::error_code ec_ = {};
};

explicit reader_fsm(multiplexer& mpx) noexcept;
explicit reader_fsm(read_buffer& rbuf, multiplexer& mpx) noexcept;

action resume(
std::size_t bytes_read,
Expand All @@ -43,6 +44,7 @@ class reader_fsm {

private:
int resume_point_{0};
read_buffer* read_buffer_ = nullptr;
action action_after_resume_;
action::type next_read_type_ = action::type::append_some;
multiplexer* mpx_ = nullptr;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As written, this yields to a dangling pointer after a connection is moved. I suggest wrapping every member in basic_connection into a state object, and placing this into the heap via a unique_ptr so they're guaranteed to have stable addresses.

Expand Down
3 changes: 3 additions & 0 deletions include/boost/redis/error.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ enum class error

/// The configuration specified UNIX sockets with SSL, which is not supported.
unix_sockets_ssl_unsupported,

/// Reading data from the socket would exceed the maximum size allowed of the read buffer.
exceeds_maximum_read_buffer_size,
};

/**
Expand Down
3 changes: 3 additions & 0 deletions include/boost/redis/impl/error.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ struct error_category_impl : system::error_category {
"supported by the system.";
case error::unix_sockets_ssl_unsupported:
return "The configuration specified UNIX sockets with SSL, which is not supported.";
case error::exceeds_maximum_read_buffer_size:
return "Reading data from the socket would exceed the maximum size allowed of the read "
"buffer.";
default: BOOST_ASSERT(false); return "Boost.Redis error.";
}
}
Expand Down
Loading