Skip to content

Commit 2b6813a

Browse files
tstennercboulay
authored andcommitted
Refactor sync tcp server; add sync_transfer_handler
1 parent cefc312 commit 2b6813a

File tree

2 files changed

+44
-25
lines changed

2 files changed

+44
-25
lines changed

src/tcp_server.cpp

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -144,11 +144,31 @@ class client_session : public std::enable_shared_from_this<client_session> {
144144
std::condition_variable completion_cond_;
145145
};
146146

147+
class sync_transfer_handler {
148+
bool transfer_is_sync_;
149+
// sockets that should receive data in sync mode
150+
std::vector<tcp_socket_p> sync_sockets_;
151+
// io context for sync mode, app is responsible for running it
152+
asio::io_context io_ctx_;
153+
public:
154+
sync_transfer_handler(): io_ctx_(1) {
155+
156+
}
157+
158+
/// schedules a native socket handle to be added the next time a push operation is done
159+
void add_socket(const tcp_socket::native_handle_type handle, tcp_socket::protocol_type protocol) {
160+
asio::post(io_ctx_, [=](){
161+
sync_sockets_.push_back(std::make_unique<tcp_socket>(io_ctx_, protocol, handle));
162+
});
163+
}
164+
void write_all_blocking(const std::vector<asio::const_buffer> &bufs);
165+
};
166+
147167
tcp_server::tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p sendbuf,
148168
factory_p factory, int chunk_size, bool allow_v4, bool allow_v6, bool do_sync)
149169
: chunk_size_(chunk_size), info_(std::move(info)), io_(std::move(io)),
150-
factory_(std::move(factory)), send_buffer_(std::move(sendbuf)), transfer_is_sync_(do_sync) {
151-
if (transfer_is_sync_) sync_transfer_io_ctx_ = std::make_unique<asio::io_context>(1);
170+
factory_(std::move(factory)), send_buffer_(std::move(sendbuf)) {
171+
if (do_sync) sync_handler = std::make_unique<sync_transfer_handler>();
152172
// assign connection-dependent fields
153173
info_->session_id(api_config::get_instance()->session_id());
154174
info_->reset_uid();
@@ -181,6 +201,11 @@ tcp_server::tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p s
181201
throw std::runtime_error("Failed to instantiate socket acceptors for the TCP server");
182202
}
183203

204+
tcp_server::~tcp_server() noexcept
205+
{
206+
// defined here so the compiler can generate the destructor for the sync_handler
207+
}
208+
184209

185210
// === externally issued asynchronous commands ===
186211

@@ -234,7 +259,12 @@ void tcp_server::accept_next_connection(tcp_acceptor_p &acceptor) {
234259

235260
// === synchronous transfer
236261

237-
void tcp_server::write_all_blocking(std::vector<asio::const_buffer> bufs) {
262+
void tcp_server::write_all_blocking(const std::vector<asio::const_buffer> &bufs)
263+
{
264+
sync_handler->write_all_blocking(bufs);
265+
}
266+
267+
void sync_transfer_handler::write_all_blocking(const std::vector<asio::const_buffer> &bufs) {
238268
bool any_session_broken = false;
239269

240270
for (auto &sock : sync_sockets_) {
@@ -247,7 +277,7 @@ void tcp_server::write_all_blocking(std::vector<asio::const_buffer> bufs) {
247277
case asio::error::connection_reset:
248278
LOG_F(WARNING, "Broken Pipe / Connection Reset detected. Closing socket.");
249279
any_session_broken = true;
250-
asio::post(*sync_transfer_io_ctx_, [sock]() {
280+
asio::post(io_ctx_, [sock]() {
251281
asio::error_code close_ec;
252282
sock->close(close_ec);
253283
});
@@ -262,8 +292,8 @@ void tcp_server::write_all_blocking(std::vector<asio::const_buffer> bufs) {
262292
}
263293
try {
264294
// prepare the io context for new work
265-
sync_transfer_io_ctx_->restart();
266-
sync_transfer_io_ctx_->run();
295+
io_ctx_.restart();
296+
io_ctx_.run();
267297

268298
if (any_session_broken) {
269299
// remove sessions whose socket was closed
@@ -577,21 +607,14 @@ void client_session::handle_send_feedheader_outcome(err_t err, std::size_t n) {
577607
// convenient for unit tests
578608
if (max_buffered_ <= 0) return;
579609

580-
if (serv->transfer_is_sync_) {
610+
if (serv->sync_handler) {
581611
LOG_F(INFO, "Using synchronous blocking transfers for new client session.");
582-
auto &sock_io_ctx = *serv->sync_transfer_io_ctx_;
583-
612+
auto protocol = sock_.local_endpoint().protocol();
584613
// move the socket into the sync_transfer_io_ctx by releasing it from this
585614
// io ctx and re-creating it with sync_transfer_io_ctx.
586615
// See https://stackoverflow.com/q/52671836/73299
587616
// Then schedule the sync_transfer_io_ctx to add it to the list of sync sockets
588-
auto protocol = sock_.local_endpoint().protocol();
589-
auto new_sock = std::make_shared<tcp_socket>(sock_io_ctx, protocol, sock_.release());
590-
591-
asio::post(sock_io_ctx, [serv, sock_p = std::move(new_sock)]() {
592-
LOG_F(1, "Moved socket to new io_ctx");
593-
serv->sync_sockets_.emplace_back(std::move(sock_p));
594-
});
617+
serv->sync_handler->add_socket(sock_.release(), protocol);
595618
serv->unregister_inflight_session(this);
596619
return;
597620
}

src/tcp_server.h

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ class tcp_server : public std::enable_shared_from_this<tcp_server> {
5353
tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p sendbuf, factory_p factory,
5454
int chunk_size, bool allow_v4, bool allow_v6, bool do_sync = false);
5555

56+
~tcp_server() noexcept;
57+
5658
/**
5759
* Begin serving TCP connections.
5860
*
@@ -73,7 +75,7 @@ class tcp_server : public std::enable_shared_from_this<tcp_server> {
7375
* Write directly to each socket. This should only be used when server initialized with
7476
* do_sync = true.
7577
*/
76-
void write_all_blocking(std::vector<asio::const_buffer> bufs);
78+
void write_all_blocking(const std::vector<asio::const_buffer>& bufs);
7779

7880
private:
7981
friend class client_session;
@@ -104,14 +106,8 @@ class tcp_server : public std::enable_shared_from_this<tcp_server> {
104106
tcp_acceptor_p acceptor_v4_, acceptor_v6_; // our server socket
105107

106108

107-
// sync mode fields
108-
109-
// Flag to indicate that new client_sessions should use synchronous blocking data transfer.
110-
bool transfer_is_sync_;
111-
// sockets that should receive data in sync mode
112-
std::vector<tcp_socket_p> sync_sockets_;
113-
// io context for sync mode, app is responsible for running it
114-
std::unique_ptr<asio::io_context> sync_transfer_io_ctx_;
109+
// optional pointer to a handler class for synchronous transfers
110+
std::unique_ptr<class sync_transfer_handler> sync_handler;
115111

116112
// registry of in-flight asessions (for cancellation)
117113
std::map<void *, std::weak_ptr<client_session>> inflight_;

0 commit comments

Comments
 (0)