Skip to content

Commit f6e1660

Browse files
committed
f [skip-ci]
1 parent a79bb3c commit f6e1660

21 files changed

+389
-287
lines changed

example/cpp20_chat_room.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ using boost::asio::redirect_error;
3232
using boost::asio::use_awaitable;
3333
using boost::redis::config;
3434
using boost::redis::connection;
35-
using boost::redis::generic_flat_response;
35+
using boost::redis::generic_response;
3636
using boost::redis::ignore;
3737
using boost::redis::request;
3838
using boost::system::error_code;
@@ -46,7 +46,7 @@ auto receiver(std::shared_ptr<connection> conn) -> awaitable<void>
4646
request req;
4747
req.push("SUBSCRIBE", "channel");
4848

49-
generic_flat_response resp;
49+
generic_response resp;
5050
conn->set_receive_response(resp);
5151

5252
while (conn->will_reconnect()) {

example/cpp20_streams.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
namespace net = boost::asio;
2525
using boost::redis::config;
26-
using boost::redis::generic_flat_response;
26+
using boost::redis::generic_response;
2727
using boost::redis::operation;
2828
using boost::redis::request;
2929
using boost::redis::connection;
@@ -33,7 +33,7 @@ auto stream_reader(std::shared_ptr<connection> conn) -> net::awaitable<void>
3333
{
3434
std::string redisStreamKey_;
3535
request req;
36-
generic_flat_response resp;
36+
generic_response resp;
3737

3838
std::string stream_id{"$"};
3939
std::string const field = "myfield";
@@ -51,7 +51,7 @@ auto stream_reader(std::shared_ptr<connection> conn) -> net::awaitable<void>
5151
// The following approach was taken in order to be able to
5252
// deal with the responses, as generated by redis in the case
5353
// that there are multiple stream 'records' within a single
54-
// generic_flat_response. The nesting and number of values in
54+
// generic_response. The nesting and number of values in
5555
// resp.value() are different, depending on the contents
5656
// of the stream in redis. Uncomment the above commented-out
5757
// code for examples while running the XADD command.

example/cpp20_subscriber.cpp

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -60,24 +60,23 @@ auto receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>
6060
// Loop while reconnection is enabled
6161
while (conn->will_reconnect()) {
6262
// Reconnect to the channels.
63-
co_await conn->async_exec(req, ignore);
63+
co_await conn->async_exec(req);
6464

65-
// Loop reading Redis pushs messages.
65+
// Loop reading Redis push messages.
6666
for (error_code ec;;) {
67-
// First tries to read any buffered pushes.
68-
conn->receive(ec);
69-
if (ec == error::sync_receive_push_failed) {
70-
ec = {};
71-
co_await conn->async_receive(asio::redirect_error(asio::use_awaitable, ec));
72-
}
73-
67+
// Wait for a push
68+
co_await conn->async_receive(asio::redirect_error(ec));
7469
if (ec)
7570
break; // Connection lost, break so we can reconnect to channels.
7671

77-
std::cout << resp.value().at(1).value << " " << resp.value().at(2).value << " "
78-
<< resp.value().at(3).value << std::endl;
72+
// The response must be consumed without suspending the
73+
// coroutine i.e. without the use of async operations.
74+
for (auto const& elem: resp.value().get_view())
75+
std::cout << elem.value.data << "\n";
76+
77+
std::cout << std::endl;
7978

80-
consume_one(resp);
79+
resp.value().clear();
8180
}
8281
}
8382
}

include/boost/redis/adapter/any_adapter.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ class any_adapter {
5353
static auto create_impl(T& resp) -> impl_t
5454
{
5555
using namespace boost::redis::adapter;
56-
5756
return [adapter2 = boost_redis_adapt(resp)](
5857
any_adapter::parse_event ev,
5958
resp3::node_view const& nd,

include/boost/redis/adapter/detail/adapters.hpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,14 @@
1212
#include <boost/redis/resp3/node.hpp>
1313
#include <boost/redis/resp3/serialization.hpp>
1414
#include <boost/redis/resp3/type.hpp>
15-
#include <boost/redis/response.hpp>
15+
#include <boost/redis/generic_flat_response_value.hpp>
1616

1717
#include <boost/assert.hpp>
1818

1919
#include <array>
2020
#include <charconv>
2121
#include <deque>
2222
#include <forward_list>
23-
#include <iostream>
2423
#include <list>
2524
#include <map>
2625
#include <optional>
@@ -138,6 +137,8 @@ void boost_redis_from_bulk(T& t, resp3::basic_node<String> const& node, system::
138137
from_bulk_impl<T>::apply(t, node, ec);
139138
}
140139

140+
//================================================
141+
141142
template <class Result>
142143
class general_aggregate {
143144
private:
@@ -177,20 +178,20 @@ class general_aggregate {
177178
};
178179

179180
template <>
180-
class general_aggregate<result<flat_response_value>> {
181+
class general_aggregate<result<generic_flat_response_value>> {
181182
private:
182-
result<flat_response_value>* result_;
183+
result<generic_flat_response_value>* result_ = nullptr;
183184

184185
public:
185-
explicit general_aggregate(result<flat_response_value>* c = nullptr)
186+
explicit general_aggregate(result<generic_flat_response_value>* c = nullptr)
186187
: result_(c)
187188
{ }
188189

189190
void on_init() { }
190191
void on_done()
191192
{
192193
if (result_->has_value()) {
193-
result_->value().set_view();
194+
result_->value().notify_done();
194195
}
195196
}
196197

@@ -206,7 +207,7 @@ class general_aggregate<result<flat_response_value>> {
206207
std::string{std::cbegin(nd.value), std::cend(nd.value)}
207208
};
208209
break;
209-
default: result_->value().add_node(nd);
210+
default: result_->value().push(nd);
210211
}
211212
}
212213
};

include/boost/redis/adapter/detail/result_traits.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ struct result_traits<result<std::vector<resp3::basic_node<String>, Allocator>>>
6666
template <>
6767
struct result_traits<generic_flat_response> {
6868
using response_type = generic_flat_response;
69-
using adapter_type = adapter::detail::general_aggregate<response_type>;
69+
using adapter_type = general_aggregate<response_type>;
7070
static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }
7171
};
7272

include/boost/redis/connection.hpp

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -859,6 +859,31 @@ class basic_connection {
859859
return impl_->receive_channel_.async_receive(std::forward<CompletionToken>(token));
860860
}
861861

862+
template <class CompletionToken = asio::default_completion_token_t<executor_type>>
863+
auto async_receive2(CompletionToken&& token = {})
864+
{
865+
return
866+
impl_->receive_channel_.async_receive(
867+
asio::deferred(
868+
[&conn = *this](system::error_code ec, std::size_t)
869+
{
870+
if (!ec) {
871+
auto f = [](system::error_code, std::size_t) {
872+
// There is no point in checking for errors
873+
// here since async_receive just completed
874+
// without errors.
875+
};
876+
877+
// Drain the channel.
878+
while (conn.impl_->receive_channel_.try_receive(f));
879+
}
880+
881+
return asio::deferred.values(ec);
882+
}
883+
)
884+
)(std::forward<CompletionToken>(token));
885+
}
886+
862887
/** @brief Receives server pushes synchronously without blocking.
863888
*
864889
* Receives a server push synchronously by calling `try_receive` on
@@ -1264,6 +1289,13 @@ class connection {
12641289
return impl_.async_receive(std::forward<CompletionToken>(token));
12651290
}
12661291

1292+
/// @copydoc basic_connection::async_receive2
1293+
template <class CompletionToken = asio::deferred_t>
1294+
auto async_receive2(CompletionToken&& token = {})
1295+
{
1296+
return impl_.async_receive2(std::forward<CompletionToken>(token));
1297+
}
1298+
12671299
/// @copydoc basic_connection::receive
12681300
std::size_t receive(system::error_code& ec) { return impl_.receive(ec); }
12691301

include/boost/redis/detail/writer_fsm.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//
22
// Copyright (c) 2025 Marcelo Zimbres Silva ([email protected]),
3-
// Ruben Perez Hidalgo (rubenperez038 at gmail dot com)
3+
// Nikolai Vladimirov (TODO)
44
//
55
// Distributed under the Boost Software License, Version 1.0. (See accompanying
66
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
//
2+
// Copyright (c) 2025 Marcelo Zimbres Silva ([email protected]),
3+
// Nikolai Vladimirov (TODO)
4+
//
5+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
6+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7+
//
8+
9+
#ifndef BOOST_REDIS_FLAT_RESPONSE_HPP
10+
#define BOOST_REDIS_FLAT_RESPONSE_HPP
11+
12+
#include <boost/redis/adapter/result.hpp>
13+
#include <boost/redis/resp3/node.hpp>
14+
15+
#include <boost/system/error_code.hpp>
16+
17+
#include <string>
18+
#include <tuple>
19+
#include <vector>
20+
21+
namespace boost::redis {
22+
23+
// Similar to the generic_response but its data is stored flat
24+
// internally.
25+
struct generic_flat_response_value {
26+
public:
27+
/** @brief Reserve capacity
28+
*
29+
* Reserve memory for incoming data.
30+
*
31+
* @param bytes Number of bytes to reserve for data.
32+
* @param nodes Number of nodes to reserver.
33+
*/
34+
void reserve(std::size_t bytes, std::size_t nodes);
35+
36+
/** @brief Clear both the data and the node buffers
37+
*
38+
* @Note: A `boost::redis:.generic_flat_response` can contain the
39+
* response to multiple Redis commands and server pushes. Calling
40+
* this function will erase everything contianed in it. To clear
41+
* only one response/push use `boost::redis::consume_one`.
42+
*/
43+
void clear();
44+
45+
/// Returns the size of the data buffer
46+
auto data_size() const noexcept -> std::size_t
47+
{ return data_.size(); }
48+
49+
/** @brief Return the RESP3 response
50+
*
51+
* The data member in each `boost::redis::offset_string` are all
52+
* set and therefore safe to use.
53+
*/
54+
auto get_view() const -> resp3::offset_response const&
55+
{ return view_; }
56+
57+
/** @brief Return the RESP3 response
58+
*
59+
* The data member in each `boost::redis::offset_string` are all
60+
* set and therefore safe to use.
61+
*/
62+
auto get_view() -> resp3::offset_response&
63+
{ return view_; }
64+
65+
/// Push a new node to the response
66+
void push(resp3::node_view const& nd);
67+
68+
/** @brief Returns the number of times reallocations
69+
*
70+
* Each call to the push might result in a memory reallocation.
71+
* This number function returns how many reallocations were
72+
* detected and can be useful to determine how much memory to
73+
* reserve upfront.
74+
*/
75+
auto get_reallocs() const noexcept
76+
{ return reallocs_; }
77+
78+
/** @brief Notify the object that all nodes were pushed.
79+
*
80+
* This function is called automativally by the library.
81+
*/
82+
void notify_done();
83+
84+
/// Returns the number of complete RESP3 messages contained in this object.
85+
std::size_t get_total_msgs() const noexcept
86+
{ return total_msgs_; }
87+
88+
private:
89+
void add_node_impl(resp3::node_view const& nd);
90+
91+
std::string data_;
92+
resp3::offset_response view_;
93+
std::size_t pos_ = 0u;
94+
std::size_t reallocs_ = 0u;
95+
96+
// The number of messages contained in this object.
97+
std::size_t total_msgs_ = 0u;
98+
};
99+
100+
} // namespace boost::redis
101+
102+
#endif // BOOST_REDIS_FLAT_RESPONSE_HPP
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
//
2+
// Copyright (c) 2025 Marcelo Zimbres Silva ([email protected]),
3+
// Nikolai Vladimirov (TODO)
4+
//
5+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
6+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7+
//
8+
9+
#include <boost/redis/error.hpp>
10+
#include <boost/redis/generic_flat_response_value.hpp>
11+
12+
#include <boost/assert.hpp>
13+
14+
namespace boost::redis {
15+
16+
void generic_flat_response_value::reserve(std::size_t bytes, std::size_t nodes)
17+
{
18+
data_.reserve(bytes);
19+
view_.reserve(nodes);
20+
}
21+
22+
void generic_flat_response_value::clear()
23+
{
24+
pos_ = 0u;
25+
total_msgs_ = 0u;
26+
data_.clear();
27+
view_.clear();
28+
}
29+
30+
void generic_flat_response_value::notify_done()
31+
{
32+
total_msgs_ += 1;
33+
34+
for (; pos_ < view_.size(); ++pos_) {
35+
auto& v = view_.at(pos_).value;
36+
v.data = std::string_view{data_.data() + v.offset, v.size};
37+
}
38+
}
39+
40+
void generic_flat_response_value::push(resp3::node_view const& nd)
41+
{
42+
auto data_before = data_.data();
43+
add_node_impl(nd);
44+
auto data_after = data_.data();
45+
46+
if (data_after != data_before) {
47+
pos_ = 0;
48+
reallocs_ += 1;
49+
}
50+
}
51+
52+
void generic_flat_response_value::add_node_impl(resp3::node_view const& nd)
53+
{
54+
resp3::offset_node ond;
55+
ond.data_type = nd.data_type;
56+
ond.aggregate_size = nd.aggregate_size;
57+
ond.depth = nd.depth;
58+
ond.value.offset = data_.size();
59+
ond.value.size = nd.value.size();
60+
61+
// This must come after setting the offset above.
62+
data_.append(nd.value.data(), nd.value.size());
63+
64+
view_.push_back(std::move(ond));
65+
}
66+
} // namespace boost::redis

0 commit comments

Comments
 (0)