Skip to content

Commit c64aeb9

Browse files
committed
Implement synchronous outlet for zero-copy writes.
1 parent d63f5ce commit c64aeb9

File tree

8 files changed

+148
-38
lines changed

8 files changed

+148
-38
lines changed

examples/SendDataInChunks.cpp

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,24 +91,25 @@ struct fake_device {
9191

9292
int main(int argc, char **argv) {
9393
std::cout << "SendDataInChunks" << std::endl;
94-
std::cout << "SendDataInChunks StreamName StreamType samplerate n_channels max_buffered chunk_rate nodata" << std::endl;
94+
std::cout << "SendDataInChunks StreamName StreamType samplerate n_channels max_buffered chunk_rate nodata do_async" << std::endl;
9595
std::cout << "- max_buffered -- duration in sec (or x100 samples if samplerate is 0) to buffer for each outlet" << std::endl;
9696
std::cout << "- chunk_rate -- number of chunks pushed per second. For this example, make it a common factor of samplingrate and 1000." << std::endl;
9797
std::cout << "- nodata -- Set non-zero to cause the fake device to not copy pattern data into the buffer." << std::endl;
98+
std::cout << "- do_async -- Set to zero to use blocking send." << std::endl;
9899

99100
std::string name{argc > 1 ? argv[1] : "MyAudioStream"}, type{argc > 2 ? argv[2] : "Audio"};
100101
int samplingrate = argc > 3 ? std::stol(argv[3]) : 44100; // Here we specify srate, but typically this would come from the device.
101102
int n_channels = argc > 4 ? std::stol(argv[4]) : 2; // Here we specify n_chans, but typically this would come from the device.
102103
double max_buffered = argc > 5 ? std::stod(argv[5]) : 360.;
103104
int32_t chunk_rate = argc > 6 ? std::stol(argv[6]) : 10; // Chunks per second.
104105
bool nodata = argc > 7;
106+
bool do_async = argc > 8 ? (bool)std::stol(argv[8]) : true;
105107
int32_t chunk_samples = samplingrate > 0 ? std::max((samplingrate / chunk_rate), 1) : 100; // Samples per chunk.
106108
int32_t chunk_duration = 1000 / chunk_rate; // Milliseconds per chunk
107109

108110
try {
109111
// Prepare the LSL stream.
110-
lsl::stream_info info(name, type, n_channels, samplingrate, lsl::cf_int16);
111-
lsl::stream_outlet outlet(info, chunk_samples, max_buffered);
112+
lsl::stream_info info(name, type, n_channels, samplingrate, lsl::cf_int16, "example-SendDataInChunks");
112113
lsl::xml_element desc = info.desc();
113114
desc.append_child_value("manufacturer", "LSL");
114115
lsl::xml_element chns = desc.append_child("channels");
@@ -118,6 +119,9 @@ int main(int argc, char **argv) {
118119
chn.append_child_value("unit", "microvolts");
119120
chn.append_child_value("type", "EEG");
120121
}
122+
lsl::stream_outlet outlet(info, chunk_samples, max_buffered, do_async);
123+
info = outlet.info(); // Refresh info with whatever the outlet captured.
124+
std::cout << "Stream UID: " << info.uid() << std::endl;
121125

122126
// Create a connection to our device.
123127
fake_device my_device(n_channels, (float)samplingrate);
@@ -142,7 +146,8 @@ int main(int argc, char **argv) {
142146

143147
// send it to the outlet. push_chunk_multiplexed is one of the more complicated approaches.
144148
// other push_chunk methods are easier but slightly slower.
145-
outlet.push_chunk_multiplexed(chunk_buffer.data(), returned_samples * n_channels, 0.0, false);
149+
double ts = lsl::local_clock();
150+
outlet.push_chunk_multiplexed(chunk_buffer.data(), returned_samples * n_channels, ts, true);
146151
}
147152

148153
} catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; }

include/lsl/outlet.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,14 @@
3131
* nominal sampling rate, otherwise x100 in samples). A good default is 360, which corresponds to 6
3232
* minutes of data. Note that, for high-bandwidth data you will almost certainly want to use a lower
3333
* value here to avoid running out of RAM.
34+
* @param do_async Set to 1 to preserve previous behaviour of using asynchronous operations,
35+
* or set to 0 to use new synchronous methods which block but are overall faster due to fewer
36+
* copies.
3437
* @return A newly created lsl_outlet handle or NULL in the event that an error occurred.
3538
*/
3639
extern LIBLSL_C_API lsl_outlet lsl_create_outlet(lsl_streaminfo info, int32_t chunk_size, int32_t max_buffered);
3740
extern LIBLSL_C_API lsl_outlet lsl_create_outlet_d(
38-
lsl_streaminfo info, int32_t chunk_size, double max_buffered);
41+
lsl_streaminfo info, int32_t chunk_size, double max_buffered, int do_async);
3942

4043
/**
4144
* Destroy an outlet.

include/lsl_cpp.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -400,9 +400,9 @@ class stream_outlet {
400400
: channel_count(info.channel_count()),
401401
sample_rate(info.nominal_srate()),
402402
obj(lsl_create_outlet(info.handle().get(), chunk_size, max_buffered), &lsl_destroy_outlet) {}
403-
stream_outlet(const stream_info &info, int32_t chunk_size, double max_buffered)
403+
stream_outlet(const stream_info &info, int32_t chunk_size, double max_buffered, bool do_async = true)
404404
: channel_count(info.channel_count()), sample_rate(info.nominal_srate()),
405-
obj(lsl_create_outlet_d(info.handle().get(), chunk_size, max_buffered),
405+
obj(lsl_create_outlet_d(info.handle().get(), chunk_size, max_buffered, do_async),
406406
&lsl_destroy_outlet) {}
407407

408408
// ========================================

src/lsl_outlet_c.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,17 @@ using namespace lsl;
1616

1717
// boilerplate wrapper code
1818
LIBLSL_C_API lsl_outlet lsl_create_outlet_d(
19-
lsl_streaminfo info, int32_t chunk_size, double max_buffered) {
19+
lsl_streaminfo info, int32_t chunk_size, double max_buffered, int do_async) {
2020
double buftime = info->nominal_srate();
2121
if (buftime <= 0) buftime = 100;
2222
if ((buftime * max_buffered) < 1) max_buffered = 1.5 / buftime;
2323
return create_object_noexcept<stream_outlet_impl>(
24-
*info, chunk_size, static_cast<int>(buftime * max_buffered));
24+
*info, chunk_size, static_cast<int>(buftime * max_buffered), (bool)do_async);
2525
}
2626

2727
LIBLSL_C_API lsl_outlet lsl_create_outlet(
2828
lsl_streaminfo info, int32_t chunk_size, int32_t max_buffered) {
29-
return lsl_create_outlet_d(info, chunk_size, (double)max_buffered);
29+
return lsl_create_outlet_d(info, chunk_size, (double)max_buffered, 1);
3030
}
3131

3232
LIBLSL_C_API void lsl_destroy_outlet(lsl_outlet out) {

src/stream_outlet_impl.cpp

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,18 @@
1212
namespace lsl {
1313

1414
stream_outlet_impl::stream_outlet_impl(
15-
const stream_info_impl &info, int32_t chunk_size, int32_t max_capacity)
15+
const stream_info_impl &info, int32_t chunk_size, int32_t max_capacity, bool do_async)
1616
: sample_factory_(std::make_shared<factory>(info.channel_format(), info.channel_count(),
1717
static_cast<uint32_t>(
1818
info.nominal_srate()
1919
? info.nominal_srate() * api_config::get_instance()->outlet_buffer_reserve_ms() /
2020
1000
2121
: api_config::get_instance()->outlet_buffer_reserve_samples()))),
2222
chunk_size_(chunk_size), info_(std::make_shared<stream_info_impl>(info)),
23-
send_buffer_(std::make_shared<send_buffer>(max_capacity)) {
23+
send_buffer_(std::make_shared<send_buffer>(max_capacity)),
24+
do_async_(do_async) {
25+
// TODO: assert do_async_ or info.channel_format() != cft_string,
26+
// no-copy blocking write not supported for variable-length strings.
2427
ensure_lsl_initialized();
2528
const api_config *cfg = api_config::get_instance();
2629

@@ -71,7 +74,7 @@ void stream_outlet_impl::instantiate_stack(tcp tcp_protocol, udp udp_protocol) {
7174
// create TCP data server
7275
ios_.push_back(std::make_shared<asio::io_context>());
7376
tcp_servers_.push_back(std::make_shared<tcp_server>(
74-
info_, ios_.back(), send_buffer_, sample_factory_, tcp_protocol, chunk_size_));
77+
info_, ios_.back(), send_buffer_, sample_factory_, tcp_protocol, chunk_size_, do_async_));
7578
// create UDP time server
7679
ios_.push_back(std::make_shared<asio::io_context>());
7780
udp_servers_.push_back(std::make_shared<udp_server>(info_, *ios_.back(), udp_protocol));
@@ -143,8 +146,23 @@ void stream_outlet_impl::push_numeric_raw(const void *data, double timestamp, bo
143146
if (lsl::api_config::get_instance()->force_default_timestamps()) timestamp = 0.0;
144147
sample_p smp(
145148
sample_factory_->new_sample(timestamp == 0.0 ? lsl_clock() : timestamp, pushthrough));
146-
smp->assign_untyped(data);
147-
send_buffer_->push_sample(smp);
149+
if (do_async_) {
150+
smp->assign_untyped(data);
151+
send_buffer_->push_sample(smp);
152+
} else {
153+
// TODO: Move the following into a common method
154+
if (timestamp == DEDUCED_TIMESTAMP) {
155+
sync_buffs_.push_back(lslboost::asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1));
156+
} else {
157+
sync_buffs_.push_back(lslboost::asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1));
158+
sync_buffs_.push_back(lslboost::asio::buffer(&timestamp, sizeof(timestamp)));
159+
}
160+
sync_buffs_.push_back(lslboost::asio::buffer(data, smp->datasize()));
161+
if (pushthrough) {
162+
for (auto &tcp_server : tcp_servers_)
163+
tcp_server->write_all_blocking(sync_buffs_);
164+
}
165+
}
148166
}
149167

150168
bool stream_outlet_impl::have_consumers() { return send_buffer_->have_consumers(); }
@@ -158,8 +176,24 @@ void stream_outlet_impl::enqueue(const T *data, double timestamp, bool pushthrou
158176
if (lsl::api_config::get_instance()->force_default_timestamps()) timestamp = 0.0;
159177
sample_p smp(
160178
sample_factory_->new_sample(timestamp == 0.0 ? lsl_clock() : timestamp, pushthrough));
161-
smp->assign_typed(data);
162-
send_buffer_->push_sample(smp);
179+
if (do_async_) {
180+
smp->assign_typed(data);
181+
send_buffer_->push_sample(smp);
182+
} else {
183+
// TODO: Move the following into a common method
184+
if (timestamp == DEDUCED_TIMESTAMP) {
185+
sync_buffs_.push_back(lslboost::asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1));
186+
} else {
187+
sync_buffs_.push_back(lslboost::asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1));
188+
sync_buffs_.push_back(lslboost::asio::buffer(&timestamp, sizeof(timestamp)));
189+
}
190+
sync_buffs_.push_back(lslboost::asio::buffer(data, smp->datasize()));
191+
if (pushthrough) {
192+
for (auto &tcp_server : tcp_servers_)
193+
tcp_server->write_all_blocking(sync_buffs_);
194+
sync_buffs_.clear();
195+
}
196+
}
163197
}
164198

165199
template void stream_outlet_impl::enqueue<char>(const char *data, double, bool);

src/stream_outlet_impl.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <string>
1111
#include <thread>
1212
#include <vector>
13+
#include <boost/asio/buffer.hpp>
1314

1415
using asio::ip::tcp;
1516
using asio::ip::udp;
@@ -35,9 +36,13 @@ class stream_outlet_impl {
3536
* @param max_capacity The maximum number of samples buffered for unresponsive receivers. If
3637
* more samples get pushed, the oldest will be dropped. The default is sufficient to hold a bit
3738
* more than 15 minutes of data at 512Hz, while consuming not more than ca. 512MB of RAM.
39+
* @param do_async If true (default) then send operations will happen in a secondary thread.
40+
* Set false to force synchronous-only transfers. This will slow down calls to `push_*` but
41+
* makes fewer data copies and can be overall much faster for high bandwidth data. This feature
42+
* is new and relatively untested; use at your own risk.
3843
*/
3944
stream_outlet_impl(
40-
const stream_info_impl &info, int32_t chunk_size = 0, int32_t max_capacity = 512000);
45+
const stream_info_impl &info, int32_t chunk_size = 0, int32_t max_capacity = 512000, bool do_async = true);
4146

4247
/**
4348
* Destructor.
@@ -317,6 +322,8 @@ class stream_outlet_impl {
317322
stream_info_impl_p info_;
318323
/// the single-producer, multiple-receiver send buffer
319324
send_buffer_p send_buffer_;
325+
/// Flag to indicate that push_* operations should be asynchronous. true by default.
326+
bool do_async_;
320327
/// the IO service objects (two per stack: one for UDP and one for TCP)
321328
std::vector<io_context_p> ios_;
322329

@@ -329,6 +336,8 @@ class stream_outlet_impl {
329336
std::vector<udp_server_p> responders_;
330337
/// threads that handle the I/O operations (two per stack: one for UDP and one for TCP)
331338
std::vector<thread_p> io_threads_;
339+
/// buffers used in synchronous call to gather-write data directly to the socket.
340+
std::vector<lslboost::asio::const_buffer> sync_buffs_;
332341
};
333342

334343
} // namespace lsl

src/tcp_server.cpp

Lines changed: 63 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -148,10 +148,11 @@ class client_session : public std::enable_shared_from_this<client_session> {
148148
};
149149

150150
tcp_server::tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p sendbuf,
151-
factory_p factory, tcp protocol, int chunk_size)
151+
factory_p factory, tcp protocol, int chunk_size, bool do_async)
152152
: chunk_size_(chunk_size), shutdown_(false), info_(std::move(info)), io_(std::move(io)),
153153
factory_(std::move(factory)), send_buffer_(std::move(sendbuf)),
154-
acceptor_(std::make_shared<tcp::acceptor>(*io_)) {
154+
acceptor_(std::make_shared<tcp::acceptor>(*io_)),
155+
transfer_is_async_(do_async) {
155156
// open the server connection
156157
acceptor_->open(protocol);
157158

@@ -222,34 +223,69 @@ void tcp_server::handle_accept_outcome(std::shared_ptr<client_session> newsessio
222223
accept_next_connection();
223224
}
224225

226+
// === synchronous transfer ===
227+
228+
void tcp_server::write_all_blocking(std::vector<lslboost::asio::const_buffer> buffs) {
229+
std::lock_guard<std::recursive_mutex> lock(inflight_mut_);
230+
std::size_t bytes_sent;
231+
lslboost::system::error_code ec;
232+
for (const auto &x : inflight_ready_) {
233+
if (x.second && x.first->is_open()) {
234+
bytes_sent = x.first->send(buffs, 0, ec);
235+
if (ec) {
236+
switch(ec.value()) {
237+
case lslboost::system::errc::broken_pipe:
238+
case lslboost::system::errc::connection_reset:
239+
LOG_F(WARNING, "Broken Pipe / Connection Reset detected. Closing socket.");
240+
inflight_ready_[x.first] = false;
241+
post(*io_, [x]() {
242+
close_inflight_socket(x);
243+
});
244+
// We leave it up to the client_session destructor to remove the socket.
245+
break;
246+
default:
247+
LOG_F(WARNING, "Unhandled write_all_blocking error: %s.", ec.message().c_str());
248+
}
249+
}
250+
}
251+
}
252+
}
253+
225254
// === graceful cancellation of in-flight sockets ===
226255

227256
void tcp_server::register_inflight_socket(const tcp_socket_p &sock) {
228257
std::lock_guard<std::recursive_mutex> lock(inflight_mut_);
229-
inflight_.insert(sock);
258+
inflight_ready_.insert({sock, false});
230259
}
231260

232261
void tcp_server::unregister_inflight_socket(const tcp_socket_p &sock) {
233262
std::lock_guard<std::recursive_mutex> lock(inflight_mut_);
234-
inflight_.erase(sock);
263+
inflight_ready_[sock] = false;
264+
inflight_ready_.erase(sock);
265+
}
266+
267+
void tcp_server::close_inflight_socket(std::pair<tcp_socket_p, bool> x) {
268+
try {
269+
if (x.first->is_open()) {
270+
try {
271+
// (in some cases shutdown may fail)
272+
x.first->shutdown(x.first->shutdown_both);
273+
} catch (...) {}
274+
x.first->close();
275+
}
276+
} catch (std::exception &e) {
277+
LOG_F(WARNING, "Error during shutdown_and_close: %s", e.what());
278+
}
235279
}
236280

237281
void tcp_server::close_inflight_sockets() {
238282
std::lock_guard<std::recursive_mutex> lock(inflight_mut_);
239-
for (const auto &sock : inflight_)
240-
post(*io_, [sock]() {
241-
try {
242-
if (sock->is_open()) {
243-
try {
244-
// (in some cases shutdown may fail)
245-
sock->shutdown(sock->shutdown_both);
246-
} catch (...) {}
247-
sock->close();
248-
}
249-
} catch (std::exception &e) {
250-
LOG_F(WARNING, "Error during shutdown_and_close: %s", e.what());
251-
}
283+
for (const auto &x : inflight_ready_) {
284+
inflight_ready_[x.first] = false;
285+
post(*io_, [x]() {
286+
close_inflight_socket(x);
252287
});
288+
}
253289
}
254290

255291

@@ -509,8 +545,17 @@ void client_session::handle_send_feedheader_outcome(err_t err, std::size_t n) {
509545
feedbuf_.consume(n);
510546
// register outstanding work at the server (will be unregistered at session destruction)
511547
work_ = std::make_shared<work_p::element_type>(serv_->io_->get_executor());
512-
// spawn a sample transfer thread
548+
serv_->inflight_ready_[sock_] = true;
513549
std::thread(&client_session::transfer_samples_thread, this, shared_from_this()).detach();
550+
551+
/*
552+
if (serv_->transfer_is_async_)
553+
// spawn a sample transfer thread
554+
std::thread(&client_session::transfer_samples_thread, this, shared_from_this()).detach();
555+
else
556+
LOG_F(WARNING, "Using synchronous-only transfer for new client session!");
557+
*/
558+
514559
} catch (std::exception &e) {
515560
LOG_F(WARNING, "Unexpected error while handling the feedheader send outcome: %s", e.what());
516561
}

src/tcp_server.h

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,11 @@ class tcp_server : public std::enable_shared_from_this<tcp_server> {
4747
* @param factory A sample factory that is shared with other server objects.
4848
* @param protocol The protocol (IPv4 or IPv6) that shall be serviced by this server.
4949
* @param chunk_size The preferred chunk size, in samples. If 0, the pushthrough flag determines
50+
* the effective chunking.
5051
* @param do_async Flag to indicate the data transfer should happen asynchronously in a thread.
5152
*/
5253
tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p sendbuf, factory_p factory,
53-
tcp protocol, int chunk_size);
54+
tcp protocol, int chunk_size, bool do_async = true);
5455

5556
/**
5657
* Begin serving TCP connections.
@@ -68,6 +69,12 @@ class tcp_server : public std::enable_shared_from_this<tcp_server> {
6869
*/
6970
void end_serving();
7071

72+
/**
73+
* Write directly to each socket. This should only be used when the server was initialized with
74+
* do_async = false.
75+
*/
76+
void write_all_blocking(std::vector<lslboost::asio::const_buffer> buffs);
77+
7178
private:
7279
friend class client_session;
7380
/// Start accepting a new connection.
@@ -83,6 +90,9 @@ class tcp_server : public std::enable_shared_from_this<tcp_server> {
8390
/// Unregister an in-flight session socket.
8491
void unregister_inflight_socket(const tcp_socket_p &sock);
8592

93+
/// Shut down and close a single in-flight socket; callers post this onto the io_context.
94+
static void close_inflight_socket(std::pair<tcp_socket_p, bool> x);
95+
8696
/// Post a close of all in-flight sockets.
8797
void close_inflight_sockets();
8898

@@ -101,8 +111,12 @@ class tcp_server : public std::enable_shared_from_this<tcp_server> {
101111
// acceptor socket
102112
tcp_acceptor_p acceptor_; // our server socket
103113

114+
// Flag to indicate that new client_sessions should set up an async data transfer thread.
115+
bool transfer_is_async_;
116+
104117
// registry of in-flight client sockets (for cancellation)
105-
std::set<tcp_socket_p> inflight_; // registry of currently in-flight sockets
118+
// std::set<tcp_socket_p> inflight_; // registry of currently in-flight sockets
119+
std::map<tcp_socket_p, bool> inflight_ready_;
106120
std::recursive_mutex inflight_mut_; // mutex protecting the registry from concurrent access
107121

108122
// some cached data

0 commit comments

Comments
 (0)