From 89252cd7a1bfdbfe149544e4eec7e837d655fa09 Mon Sep 17 00:00:00 2001 From: Chadwick Boulay Date: Fri, 8 Oct 2021 10:52:34 -0400 Subject: [PATCH 1/5] Implement synchronous outlet for zero-copy writes. --- examples/ReceiveDataInChunks.cpp | 8 +++- examples/SendDataInChunks.cpp | 35 ++++++++++----- include/lsl/common.h | 3 ++ src/stream_outlet_impl.cpp | 50 ++++++++++++++++++--- src/stream_outlet_impl.h | 9 +++- src/tcp_server.cpp | 76 +++++++++++++++++++++++--------- src/tcp_server.h | 22 +++++++-- 7 files changed, 159 insertions(+), 44 deletions(-) diff --git a/examples/ReceiveDataInChunks.cpp b/examples/ReceiveDataInChunks.cpp index ec2f91e2..5b5c4d5f 100644 --- a/examples/ReceiveDataInChunks.cpp +++ b/examples/ReceiveDataInChunks.cpp @@ -8,14 +8,18 @@ int main(int argc, char **argv) { std::cout << "ReceiveDataInChunks" << std::endl; std::cout << "ReceiveDataInChunks StreamName max_buflen flush" << std::endl; + std::cout << "- max_buffered -- duration in msec to buffer" << std::endl; + std::cout << "- flush -- set non-zero to flush data instead of pulling; useful for testing throughput" << std::endl; try { std::string name{argc > 1 ? argv[1] : "MyAudioStream"}; - int32_t max_buflen = argc > 2 ? std::stol(argv[2]) : 360; + double max_buflen = argc > 2 ? std::stod(argv[2]) : 360.; bool flush = argc > 3; // resolve the stream of interest & make an inlet - lsl::stream_inlet inlet(lsl::resolve_stream("name", name).at(0), max_buflen); + int32_t buf_samples = (int32_t)(max_buflen * 1000); + lsl::stream_inlet inlet(lsl::resolve_stream("name", name).at(0), max_buflen, + transp_bufsize_thousandths); // Use set_postprocessing to get the timestamps in a common base clock. // Do not use if this application will record timestamps to disk -- it is better to diff --git a/examples/SendDataInChunks.cpp b/examples/SendDataInChunks.cpp index d051975c..846fe676 100644 --- a/examples/SendDataInChunks.cpp +++ b/examples/SendDataInChunks.cpp @@ -44,9 +44,11 @@ struct fake_device { pattern.reserve(pattern_samples * n_channels); for (auto sample_ix = 0; sample_ix < pattern_samples; ++sample_ix) { for (auto chan_ix = 0; chan_ix < n_channels; ++chan_ix) { + // sin(2*pi*f*t), where f cycles from 1 Hz to Nyquist: srate / 2 + double f = (chan_ix + 1) % (int)(srate / 2); pattern.emplace_back( offset_0 + chan_ix * offset_step + - magnitude * static_cast(sin(M_PI * chan_ix * sample_ix / n_channels))); + magnitude * static_cast(sin(2 * M_PI * f * sample_ix / srate))); } } last_time = std::chrono::steady_clock::now(); @@ -64,15 +66,15 @@ struct fake_device { return output; } - std::size_t get_data(std::vector &buffer) { + std::size_t get_data(std::vector &buffer, bool nodata = false) { auto now = std::chrono::steady_clock::now(); auto elapsed_nano = std::chrono::duration_cast(now - last_time).count(); int64_t elapsed_samples = std::size_t(elapsed_nano * srate * 1e-9); // truncate OK. elapsed_samples = std::min(elapsed_samples, (int64_t)(buffer.size() / n_channels)); - if (false) { + if (nodata) { // The fastest but no patterns. - memset(&buffer[0], 23, buffer.size() * sizeof buffer[0]); + // memset(&buffer[0], 23, buffer.size() * sizeof buffer[0]); } else { std::size_t end_sample = head + elapsed_samples; std::size_t nowrap_samples = std::min(pattern_samples - head, elapsed_samples); @@ -89,22 +91,26 @@ struct fake_device { int main(int argc, char **argv) { std::cout << "SendDataInChunks" << std::endl; - std::cout << "SendDataInChunks StreamName StreamType samplerate n_channels max_buffered chunk_rate" << std::endl; + std::cout << "SendDataInChunks StreamName StreamType samplerate n_channels max_buffered chunk_rate nodata use_sync" << std::endl; std::cout << "- max_buffered -- duration in sec (or x100 samples if samplerate is 0) to buffer for each outlet" << std::endl; std::cout << "- chunk_rate -- number of chunks pushed per second. For this example, make it a common factor of samplingrate and 1000." << std::endl; - + std::cout << "- nodata -- Set non-zero to cause the fake device to not copy pattern data into the buffer." << std::endl; + std::cout << "- use_sync -- Set to non-zero to use blocking send." << std::endl; + std::string name{argc > 1 ? argv[1] : "MyAudioStream"}, type{argc > 2 ? argv[2] : "Audio"}; int samplingrate = argc > 3 ? std::stol(argv[3]) : 44100; // Here we specify srate, but typically this would come from the device. int n_channels = argc > 4 ? std::stol(argv[4]) : 2; // Here we specify n_chans, but typically this would come from theh device. - int32_t max_buffered = argc > 5 ? std::stol(argv[5]) : 360; + double max_buffered = argc > 5 ? std::stod(argv[5]) : 360.; int32_t chunk_rate = argc > 6 ? std::stol(argv[6]) : 10; // Chunks per second. + bool nodata = argc > 7; + bool do_sync = argc > 8 ? (bool)std::stol(argv[8]) : true; + int32_t chunk_samples = samplingrate > 0 ? std::max((samplingrate / chunk_rate), 1) : 100; // Samples per chunk. int32_t chunk_duration = 1000 / chunk_rate; // Milliseconds per chunk try { // Prepare the LSL stream. - lsl::stream_info info(name, type, n_channels, samplingrate, lsl::cf_int16); - lsl::stream_outlet outlet(info, 0, max_buffered); + lsl::stream_info info(name, type, n_channels, samplingrate, lsl::cf_int16, "example-SendDataInChunks"); lsl::xml_element desc = info.desc(); desc.append_child_value("manufacturer", "LSL"); lsl::xml_element chns = desc.append_child("channels"); @@ -114,6 +120,11 @@ int main(int argc, char **argv) { chn.append_child_value("unit", "microvolts"); chn.append_child_value("type", "EEG"); } + int32_t buf_samples = max_buffered * samplingrate; + lsl::stream_outlet outlet(info, chunk_samples, buf_samples, + transp_bufsize_samples | (do_sync ? transp_sync_blocking: transp_default)); + info = outlet.info(); // Refresh info with whatever the outlet captured. + std::cout << "Stream UID: " << info.uid() << std::endl; // Create a connection to our device. fake_device my_device(n_channels, (float)samplingrate); @@ -121,6 +132,7 @@ int main(int argc, char **argv) { // Prepare buffer to get data from 'device'. // The buffer should be larger than you think you need. Here we make it 4x as large. std::vector chunk_buffer(4 * chunk_samples * n_channels); + std::fill(chunk_buffer.begin(), chunk_buffer.end(), 0); std::cout << "Now sending data..." << std::endl; @@ -133,11 +145,12 @@ int main(int argc, char **argv) { std::this_thread::sleep_until(next_chunk_time); // Get data from device - std::size_t returned_samples = my_device.get_data(chunk_buffer); + std::size_t returned_samples = my_device.get_data(chunk_buffer, nodata); // send it to the outlet. push_chunk_multiplexed is one of the more complicated approaches. // other push_chunk methods are easier but slightly slower. - outlet.push_chunk_multiplexed(chunk_buffer.data(), returned_samples * n_channels, 0.0, true); + double ts = lsl::local_clock(); + outlet.push_chunk_multiplexed(chunk_buffer.data(), returned_samples * n_channels, ts, true); } } catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; } diff --git a/include/lsl/common.h b/include/lsl/common.h index 3c81c798..153df274 100644 --- a/include/lsl/common.h +++ b/include/lsl/common.h @@ -161,6 +161,9 @@ typedef enum { /// The supplied max_buf should be scaled by 0.001. transp_bufsize_thousandths = 2, + /// The outlet will use synchronous (blocking) calls to asio to push data + transp_sync_blocking = 4, + // prevent compilers from assuming an instance fits in a single byte _lsl_transport_options_maxval = 0x7f000000 } lsl_transport_options_t; diff --git a/src/stream_outlet_impl.cpp b/src/stream_outlet_impl.cpp index 35ab88e0..b1986ae9 100644 --- a/src/stream_outlet_impl.cpp +++ b/src/stream_outlet_impl.cpp @@ -12,7 +12,7 @@ namespace lsl { stream_outlet_impl::stream_outlet_impl( - const stream_info_impl &info, int32_t chunk_size, int32_t max_capacity) + const stream_info_impl &info, int32_t chunk_size, int32_t max_capacity, uint32_t flags) : sample_factory_(std::make_shared(info.channel_format(), info.channel_count(), static_cast( info.nominal_srate() @@ -20,7 +20,14 @@ stream_outlet_impl::stream_outlet_impl( 1000 : api_config::get_instance()->outlet_buffer_reserve_samples()))), chunk_size_(chunk_size), info_(std::make_shared(info)), - send_buffer_(std::make_shared(max_capacity)) { + send_buffer_(std::make_shared(max_capacity)), + do_sync_(flags & transp_sync_blocking) { + + if ((info.channel_format() == cft_string) && (flags & transp_sync_blocking)) { + LOG_F(WARNING, "sync push not supported for string-formatted streams. Reverting to async."); + do_sync_ = false; + } + ensure_lsl_initialized(); const api_config *cfg = api_config::get_instance(); @@ -143,8 +150,24 @@ void stream_outlet_impl::push_numeric_raw(const void *data, double timestamp, bo if (lsl::api_config::get_instance()->force_default_timestamps()) timestamp = 0.0; sample_p smp( sample_factory_->new_sample(timestamp == 0.0 ? lsl_clock() : timestamp, pushthrough)); - smp->assign_untyped(data); - send_buffer_->push_sample(smp); + if (!do_sync_) { + smp->assign_untyped(data); // Note: Makes a copy! + send_buffer_->push_sample(smp); + } else { + if (timestamp == DEDUCED_TIMESTAMP) { + sync_buffs_.push_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1)); + } else { + sync_buffs_.push_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1)); + sync_buffs_.push_back(asio::buffer(×tamp, sizeof(timestamp))); + } + sync_buffs_.push_back(asio::buffer(data, smp->datasize())); + if (pushthrough) { + for (auto &tcp_server : tcp_servers_) + tcp_server->write_all_blocking(sync_buffs_); + sync_buffs_.clear(); + } + } + } bool stream_outlet_impl::have_consumers() { return send_buffer_->have_consumers(); } @@ -158,8 +181,23 @@ void stream_outlet_impl::enqueue(const T *data, double timestamp, bool pushthrou if (lsl::api_config::get_instance()->force_default_timestamps()) timestamp = 0.0; sample_p smp( sample_factory_->new_sample(timestamp == 0.0 ? lsl_clock() : timestamp, pushthrough)); - smp->assign_typed(data); - send_buffer_->push_sample(smp); + if (!do_sync_) { + smp->assign_typed(data); + send_buffer_->push_sample(smp); + } else { + if (timestamp == DEDUCED_TIMESTAMP) { + sync_buffs_.push_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1)); + } else { + sync_buffs_.push_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1)); + sync_buffs_.push_back(asio::buffer(×tamp, sizeof(timestamp))); + } + sync_buffs_.push_back(asio::buffer(data, smp->datasize())); + if (pushthrough) { + for (auto &tcp_server : tcp_servers_) + tcp_server->write_all_blocking(sync_buffs_); + sync_buffs_.clear(); + } + } } template void stream_outlet_impl::enqueue(const char *data, double, bool); diff --git a/src/stream_outlet_impl.h b/src/stream_outlet_impl.h index 9ec3a75f..32b76b8a 100644 --- a/src/stream_outlet_impl.h +++ b/src/stream_outlet_impl.h @@ -10,6 +10,7 @@ #include #include #include +#include using asio::ip::tcp; using asio::ip::udp; @@ -35,9 +36,11 @@ class stream_outlet_impl { * @param max_capacity The maximum number of samples buffered for unresponsive receivers. If * more samples get pushed, the oldest will be dropped. The default is sufficient to hold a bit * more than 15 minutes of data at 512Hz, while consuming not more than ca. 512MB of RAM. + * @param flags Bitwise-OR'd flags from lsl_transport_options_t */ stream_outlet_impl( - const stream_info_impl &info, int32_t chunk_size = 0, int32_t max_capacity = 512000); + const stream_info_impl &info, int32_t chunk_size = 0, int32_t max_capacity = 512000, + uint32_t flags = transp_default); /** * Destructor. @@ -317,6 +320,8 @@ class stream_outlet_impl { stream_info_impl_p info_; /// the single-producer, multiple-receiver send buffer send_buffer_p send_buffer_; + /// Flag to indicate that push_* operations should be blocking synchronous. false by default. + bool do_sync_; /// the IO service objects (two per stack: one for UDP and one for TCP) std::vector ios_; @@ -329,6 +334,8 @@ class stream_outlet_impl { std::vector responders_; /// threads that handle the I/O operations (two per stack: one for UDP and one for TCP) std::vector io_threads_; + /// buffers used in synchronous call to gather-write data directly to the socket. + std::vector sync_buffs_; }; } // namespace lsl diff --git a/src/tcp_server.cpp b/src/tcp_server.cpp index 6dd7b2ef..21404b7f 100644 --- a/src/tcp_server.cpp +++ b/src/tcp_server.cpp @@ -148,10 +148,10 @@ class client_session : public std::enable_shared_from_this { }; tcp_server::tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p sendbuf, - factory_p factory, tcp protocol, int chunk_size) + factory_p factory, tcp protocol, int chunk_size, bool do_sync) : chunk_size_(chunk_size), shutdown_(false), info_(std::move(info)), io_(std::move(io)), factory_(std::move(factory)), send_buffer_(std::move(sendbuf)), - acceptor_(std::make_shared(*io_)) { + acceptor_(std::make_shared(*io_)), transfer_is_sync_(do_sync) { // open the server connection acceptor_->open(protocol); @@ -222,36 +222,68 @@ void tcp_server::handle_accept_outcome(std::shared_ptr newsessio accept_next_connection(); } +// === synchronous transfer + +void tcp_server::write_all_blocking(std::vector buffs) { + std::lock_guard lock(inflight_mut_); + std::size_t bytes_sent; + asio::error_code ec; + for (const auto &x : inflight_ready_) { + if (x.second && x.first->is_open()) { + bytes_sent = x.first->send(buffs, 0, ec); + if (ec) { + switch(ec.value()) { + case asio::error::broken_pipe: + case asio::error::connection_reset: + LOG_F(WARNING, "Broken Pipe / Connection Reset detected. Closing socket."); + inflight_ready_[x.first] = false; + post(*io_, [x]() { + close_inflight_socket(x); + }); + // We leave it up to the client_session destructor to remove the socket. + break; + default: + LOG_F(WARNING, "Unhandled write_all_blocking error: %s.", ec.message().c_str()); + } + } + } + } +} + // === graceful cancellation of in-flight sockets === void tcp_server::register_inflight_socket(const tcp_socket_p &sock) { std::lock_guard lock(inflight_mut_); - inflight_.insert(sock); + inflight_ready_.insert({sock, false}); } void tcp_server::unregister_inflight_socket(const tcp_socket_p &sock) { std::lock_guard lock(inflight_mut_); - inflight_.erase(sock); + inflight_ready_[sock] = false; + inflight_ready_.erase(sock); } -void tcp_server::close_inflight_sockets() { - std::lock_guard lock(inflight_mut_); - for (const auto &sock : inflight_) - post(*io_, [sock]() { +void tcp_server::close_inflight_socket(std::pair x) { + try { + if (x.first->is_open()) { try { - if (sock->is_open()) { - try { - // (in some cases shutdown may fail) - sock->shutdown(sock->shutdown_both); - } catch (...) {} - sock->close(); - } - } catch (std::exception &e) { - LOG_F(WARNING, "Error during shutdown_and_close: %s", e.what()); - } - }); + // (in some cases shutdown may fail) + x.first->shutdown(x.first->shutdown_both); + } catch (...) {} + x.first->close(); + } + } catch (std::exception &e) { + LOG_F(WARNING, "Error during shutdown_and_close: %s", e.what()); + } } +void tcp_server::close_inflight_sockets() { + std::lock_guard lock(inflight_mut_); + for (const auto &x : inflight_ready_) { + inflight_ready_[x.first] = false; + post(*io_, [x]() { close_inflight_socket(x); }); + } +} // === implementation of the client_session class === @@ -511,7 +543,11 @@ void client_session::handle_send_feedheader_outcome(err_t err, std::size_t n) { feedbuf_.consume(n); // register outstanding work at the server (will be unregistered at session destruction) work_ = std::make_shared(serv_->io_->get_executor()); - // spawn a sample transfer thread + serv_->inflight_ready_[sock_] = true; + if (serv_->transfer_is_sync_) + LOG_F(WARNING, "Using synchronous blocking transfers for new client session."); + // spawn a sample transfer thread. + // TODO: only spawn thread in async, but then we need `this` to belong to something else. std::thread(&client_session::transfer_samples_thread, this, shared_from_this()).detach(); } catch (std::exception &e) { LOG_F(WARNING, "Unexpected error while handling the feedheader send outcome: %s", e.what()); diff --git a/src/tcp_server.h b/src/tcp_server.h index cbc27a49..f8216a86 100644 --- a/src/tcp_server.h +++ b/src/tcp_server.h @@ -6,7 +6,7 @@ #include #include #include -#include +#include #include using asio::ip::tcp; @@ -47,9 +47,11 @@ class tcp_server : public std::enable_shared_from_this { * @param protocol The protocol (IPv4 or IPv6) that shall be serviced by this server. * @param chunk_size The preferred chunk size, in samples. If 0, the pushthrough flag determines * the effective chunking. + * @param do_sync Set true to indicate data transfer should happen synchronously in a blocking + * call. Default false -- asynchronous transfer in a thread (copies data). */ tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p sendbuf, factory_p factory, - tcp protocol, int chunk_size); + tcp protocol, int chunk_size, bool do_sync = false); /** * Begin serving TCP connections. @@ -67,6 +69,12 @@ class tcp_server : public std::enable_shared_from_this { */ void end_serving(); + /** + * Write directly to each socket. This should only be used when server initialized with + * do_async = false. + */ + void write_all_blocking(std::vector buffs); + private: friend class client_session; /// Start accepting a new connection. @@ -82,6 +90,9 @@ class tcp_server : public std::enable_shared_from_this { /// Unregister an in-flight session socket. void unregister_inflight_socket(const tcp_socket_p &sock); + /// Post a close of a single in-flight socket + static void close_inflight_socket(std::pair x); + /// Post a close of all in-flight sockets. void close_inflight_sockets(); @@ -100,9 +111,12 @@ class tcp_server : public std::enable_shared_from_this { // acceptor socket tcp_acceptor_p acceptor_; // our server socket + // Flag to indicate that new client_sessions should use synchronous blocking data transfer. + bool transfer_is_sync_; + // registry of in-flight client sockets (for cancellation) - std::set inflight_; // registry of currently in-flight sockets - std::recursive_mutex inflight_mut_; // mutex protecting the registry from concurrent access + std::map inflight_ready_; // registry of currently in-flight sockets + std::recursive_mutex inflight_mut_; // mutex protecting the registry from concurrent access // some cached data std::string shortinfo_msg_; // pre-computed short-info server response From 6b14a1f22dc60a6a545ea9016753c238605d1542 Mon Sep 17 00:00:00 2001 From: Chadwick Boulay Date: Sat, 9 Oct 2021 11:07:58 -0400 Subject: [PATCH 2/5] gather-write directly to asio when using sync mode and pushing a sample of buffers. --- examples/CMakeLists.txt | 1 + examples/ReceiveDataInChunks.cpp | 10 +- examples/SendData.cpp | 164 ++++++++++++++++++------------- examples/SendDataInChunks.cpp | 26 +++-- include/lsl/outlet.h | 7 +- include/lsl_cpp.h | 16 ++- src/lsl_outlet_c.cpp | 30 ++++-- src/stream_outlet_impl.cpp | 57 ++++++----- src/stream_outlet_impl.h | 24 ++++- src/tcp_server.cpp | 13 ++- 10 files changed, 226 insertions(+), 122 deletions(-) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 433add67..eca4d200 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -36,6 +36,7 @@ addlslexample(ReceiveDataInChunks cpp) addlslexample(ReceiveDataSimple cpp) addlslexample(ReceiveStringMarkers cpp) addlslexample(ReceiveStringMarkersC c) +addlslexample(SendData cpp) addlslexample(SendDataC c) addlslexample(SendDataInChunks cpp) addlslexample(SendDataSimple cpp) diff --git a/examples/ReceiveDataInChunks.cpp b/examples/ReceiveDataInChunks.cpp index 5b5c4d5f..fb1ed5cf 100644 --- a/examples/ReceiveDataInChunks.cpp +++ b/examples/ReceiveDataInChunks.cpp @@ -8,18 +8,18 @@ int main(int argc, char **argv) { std::cout << "ReceiveDataInChunks" << std::endl; std::cout << "ReceiveDataInChunks StreamName max_buflen flush" << std::endl; - std::cout << "- max_buffered -- duration in msec to buffer" << std::endl; + std::cout << "- max_buffered -- duration in sec (or x100 samples if samplerate is 0) to buffer in the receiver" << std::endl; std::cout << "- flush -- set non-zero to flush data instead of pulling; useful for testing throughput" << std::endl; try { std::string name{argc > 1 ? argv[1] : "MyAudioStream"}; - double max_buflen = argc > 2 ? std::stod(argv[2]) : 360.; + double max_buffered = argc > 2 ? std::stod(argv[2]) : 360.; bool flush = argc > 3; // resolve the stream of interest & make an inlet - int32_t buf_samples = (int32_t)(max_buflen * 1000); - lsl::stream_inlet inlet(lsl::resolve_stream("name", name).at(0), max_buflen, - transp_bufsize_thousandths); + lsl::stream_info inlet_info = lsl::resolve_stream("name", name).at(0); + lsl::stream_inlet inlet(inlet_info,(int32_t)(max_buffered * 1000), + 0, true, transp_bufsize_thousandths); // Use set_postprocessing to get the timestamps in a common base clock. // Do not use if this application will record timestamps to disk -- it is better to diff --git a/examples/SendData.cpp b/examples/SendData.cpp index 744fd981..757eb957 100644 --- a/examples/SendData.cpp +++ b/examples/SendData.cpp @@ -1,69 +1,95 @@ -#include "lsl_cpp.h" -#include -#include -#include -using namespace std; - -/** - * This example program offers an 8-channel stream, float-formatted, that resembles EEG data. - * The example demonstrates also how per-channel meta-data can be specified using the .desc() field - * of the stream information object. - * - * Note that the timer used in the send loop of this program is not particularly accurate. - */ - - -const char *channels[] = {"C3", "C4", "Cz", "FPz", "POz", "CPz", "O1", "O2"}; - -int main(int argc, char *argv[]) { - string name, type; - if (argc != 3) { - cout << "This opens a stream under some user-defined name and with a user-defined content " - "type." - << endl; - cout << "Please enter the stream name and the stream type (e.g. \"BioSemi EEG\" (without " - "the quotes)):" - << endl; - cin >> name >> type; - } else { - name = argv[1]; - type = argv[2]; - } - - try { - - // make a new stream_info (100 Hz) - lsl::stream_info info(name, type, 8, 100, lsl::cf_float32, string(name) += type); - - // add some description fields - info.desc().append_child_value("manufacturer", "BioSemi"); - lsl::xml_element chns = info.desc().append_child("channels"); - for (int k = 0; k < 8; k++) - chns.append_child("channel") - .append_child_value("label", channels[k]) - .append_child_value("unit", "microvolts") - .append_child_value("type", "EEG"); - - // make a new outlet - lsl::stream_outlet outlet(info); - - // send data forever - cout << "Now sending data... " << endl; - double starttime = ((double)clock()) / CLOCKS_PER_SEC; - for (unsigned t = 0;; t++) { - - // wait a bit and create random data - while (((double)clock()) / CLOCKS_PER_SEC < starttime + t * 0.01) - ; - float sample[8]; - for (int c = 0; c < 8; c++) sample[c] = (float)((rand() % 1500) / 500.0 - 1.5); - - // send the sample - outlet.push_sample(sample); - } - - } catch (std::exception &e) { cerr << "Got an exception: " << e.what() << endl; } - cout << "Press any key to exit. " << endl; - cin.get(); - return 0; -} +#include "lsl_cpp.h" +#include +#include +#include +#include +using namespace std; + +/** + * This example program offers an 8-channel stream, float-formatted, that resembles EEG data. + * The example demonstrates also how per-channel meta-data can be specified using the .desc() field + * of the stream information object. + * + * Note that the timer used in the send loop of this program is not particularly accurate. + */ + + +const char *channels[] = {"C3", "C4", "Cz", "FPz", "POz", "CPz", "O1", "O2"}; + +int main(int argc, char *argv[]) { + string name, type; + if (argc < 3) { + cout << "This opens a stream under some user-defined name and with a user-defined content " + "type." << endl; + cout << "SendData Name Type n_channels[8] srate[100] max_buffered[360] sync[false] contig[true]" << endl; + cout << "Please enter the stream name and the stream type (e.g. \"BioSemi EEG\" (without " + "the quotes)):" + << endl; + cin >> name >> type; + } else { + name = argv[1]; + type = argv[2]; + } + int n_channels = argc > 3 ? std::stol(argv[3]) : 8; + n_channels = n_channels < 8 ? 8 : n_channels; + int samplingrate = argc > 4 ? std::stol(argv[4]) : 100; + int max_buffered = argc > 5 ? std::stol(argv[5]) : 360; + bool sync = argc > 6 ? std::stol(argv[6]) > 0 : false; + bool contig = argc > 7 ? std::stol(argv[7]) > 0 : true; + + try { + + // make a new stream_info (100 Hz) + lsl::stream_info info(name, type, n_channels, samplingrate, lsl::cf_float32, string(name) += type); + + // add some description fields + info.desc().append_child_value("manufacturer", "LSL"); + lsl::xml_element chns = info.desc().append_child("channels"); + for (int k = 0; k < n_channels; k++) + chns.append_child("channel") + .append_child_value("label", k < 8 ? channels[k] : "Chan-" + std::to_string(k+1)) + .append_child_value("unit", "microvolts") + .append_child_value("type", type); + + // make a new outlet + lsl::stream_outlet outlet(info, 0, max_buffered, sync ? transp_sync_blocking : transp_default); + + // send data forever + cout << "Now sending data... " << endl; + double starttime = ((double)clock()) / CLOCKS_PER_SEC; + + // Initialize 2 discontiguous data arrays. + vector sample(8, 0.0); + vector extra(n_channels - 8, 0.0); + if (contig) { + // If this is contiguous mode (default) then we combine the arrays. + sample.insert( + sample.end(), + make_move_iterator(extra.begin()), + make_move_iterator(extra.end())); + } + // bytes is used in !contig mode because we need to know how big each buffer is. + array bytes = {8 * sizeof(float), static_cast((n_channels - 8) * sizeof(float))}; + for (unsigned t = 0;; t++) { + + // wait a bit and create random data + while (((double)clock()) / CLOCKS_PER_SEC < starttime + t * 0.01) + ; + for (int c = 0; c < 8; c++) sample[c] = (float)((rand() % 1500) / 500.0 - 1.5); + + // send the sample + if (contig) + outlet.push_sample(sample); + else { + // Advanced: Push set of discontiguous buffers. + array bufs = {sample.data(), extra.data()}; + outlet.push_numeric_bufs(reinterpret_cast(const_cast(bufs.data())), + bytes.data(), 2, lsl::local_clock(), true); + } + } + + } catch (exception &e) { cerr << "Got an exception: " << e.what() << endl; } + cout << "Press any key to exit. " << endl; + cin.get(); + return 0; +} diff --git a/examples/SendDataInChunks.cpp b/examples/SendDataInChunks.cpp index 846fe676..0263938c 100644 --- a/examples/SendDataInChunks.cpp +++ b/examples/SendDataInChunks.cpp @@ -104,6 +104,7 @@ int main(int argc, char **argv) { int32_t chunk_rate = argc > 6 ? std::stol(argv[6]) : 10; // Chunks per second. bool nodata = argc > 7; bool do_sync = argc > 8 ? (bool)std::stol(argv[8]) : true; + bool b_contig = true && do_sync; // Set true to test gather-write operations. int32_t chunk_samples = samplingrate > 0 ? std::max((samplingrate / chunk_rate), 1) : 100; // Samples per chunk. int32_t chunk_duration = 1000 / chunk_rate; // Milliseconds per chunk @@ -118,7 +119,7 @@ int main(int argc, char **argv) { lsl::xml_element chn = chns.append_child("channel"); chn.append_child_value("label", "Chan-" + std::to_string(c)); chn.append_child_value("unit", "microvolts"); - chn.append_child_value("type", "EEG"); + chn.append_child_value("type", type); } int32_t buf_samples = max_buffered * samplingrate; lsl::stream_outlet outlet(info, chunk_samples, buf_samples, @@ -127,32 +128,41 @@ int main(int argc, char **argv) { std::cout << "Stream UID: " << info.uid() << std::endl; // Create a connection to our device. - fake_device my_device(n_channels, (float)samplingrate); + int dev_chans = b_contig ? n_channels : n_channels + 1; + fake_device my_device(dev_chans, (float)samplingrate); // Prepare buffer to get data from 'device'. // The buffer should be larger than you think you need. Here we make it 4x as large. - std::vector chunk_buffer(4 * chunk_samples * n_channels); - std::fill(chunk_buffer.begin(), chunk_buffer.end(), 0); + std::vector dev_buffer(4 * chunk_samples * dev_chans); + std::fill(dev_buffer.begin(), dev_buffer.end(), 0); std::cout << "Now sending data..." << std::endl; // Your device might have its own timer. Or you can decide how often to poll // your device, as we do here. - auto next_chunk_time = std::chrono::high_resolution_clock::now(); + auto t_start = std::chrono::high_resolution_clock::now(); + auto next_chunk_time = t_start; for (unsigned c = 0;; c++) { // wait a bit next_chunk_time += std::chrono::milliseconds(chunk_duration); std::this_thread::sleep_until(next_chunk_time); // Get data from device - std::size_t returned_samples = my_device.get_data(chunk_buffer, nodata); + std::size_t returned_samples = my_device.get_data(dev_buffer, nodata); // send it to the outlet. push_chunk_multiplexed is one of the more complicated approaches. // other push_chunk methods are easier but slightly slower. double ts = lsl::local_clock(); - outlet.push_chunk_multiplexed(chunk_buffer.data(), returned_samples * n_channels, ts, true); + if (b_contig) { + // Push a chunk of a contiguous buffer. + outlet.push_chunk_multiplexed(dev_buffer.data(), returned_samples * n_channels, ts, true); + } else { + std::cout << "Discontiguous push_chunk not yet supported." << std::endl; + std::cout << "See SendData.cpp for discontiguous push_sample, then set " << std::endl; + std::cout << "timestamps as LSL_DEDUCED_TIMESTAMP and pushtrough as false " << std::endl; + std::cout << "for all samples except the the first or last in a chunk." << std::endl; + } } - } catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; } std::cout << "Press any key to exit. " << std::endl; std::cin.get(); diff --git a/include/lsl/outlet.h b/include/lsl/outlet.h index eba0a90b..c03fc72f 100644 --- a/include/lsl/outlet.h +++ b/include/lsl/outlet.h @@ -99,7 +99,7 @@ extern LIBLSL_C_API int32_t lsl_push_sample_vtp(lsl_outlet out, const void *data * @see lsl_push_sample_ftp * @param out The lsl_outlet object through which to push the data. * @param data A pointer to values to push. The number of values pointed to must be no less than the number of channels in the sample. - * @param lengths A pointer the number of elements to push for each channel (string lengths). + * @param lengths A pointer the number of elements to push for each channel (string lengths, or number of bytes). */ extern LIBLSL_C_API int32_t lsl_push_sample_buf(lsl_outlet out, const char **data, const uint32_t *lengths); /** @copydoc lsl_push_sample_buf @@ -108,6 +108,11 @@ extern LIBLSL_C_API int32_t lsl_push_sample_buft(lsl_outlet out, const char **da /** @copydoc lsl_push_sample_buft * @param pushthrough @see lsl_push_sample_ftp */ extern LIBLSL_C_API int32_t lsl_push_sample_buftp(lsl_outlet out, const char **data, const uint32_t *lengths, double timestamp, int32_t pushthrough); +/** @copydoc lsl_push_sample_buftp + * @param nbufs Number of values pointed to in `data` and number of items in `lengths` -- doesn't assume one buffer + * per channel but each array in data must be longer than each item in lengths. + */ +extern LIBLSL_C_API int32_t lsl_push_sample_buftpn(lsl_outlet out, const char **data, const uint32_t *lengths, double timestamp, int32_t pushthrough, uint32_t nbufs); /** Push a chunk of multiplexed samples into the outlet. One timestamp per sample is provided. * diff --git a/include/lsl_cpp.h b/include/lsl_cpp.h index a0711639..faf583cf 100644 --- a/include/lsl_cpp.h +++ b/include/lsl_cpp.h @@ -499,7 +499,7 @@ class stream_outlet { } /** Push a pointer to raw numeric data as one sample into the outlet. - * This is the lowest-level function; performns no checking whatsoever. Can not be used for + * This is the lowest-level function; performs no checking whatsoever. Cannot be used for * variable-size / string-formatted channels. * @param sample A pointer to the raw sample data to push. * @param timestamp Optionally the capture time of the sample, in agreement with local_clock(); @@ -512,6 +512,20 @@ class stream_outlet { lsl_push_sample_vtp(obj.get(), (sample), timestamp, pushthrough); } + /** + * Push a pointer to an array of buffers of variable size as one sample into the outlet. + * + * @param bufs A pointer to an array of data buffers. + * @param bytes An array of sizes (number of bytes) of buffers in bufs. + * @param nbufs Total number of buffers. + * @param timestamp Optionally the capture time of the sample, in agreement with local_clock(); + * @param pushthrough Whether to push the sample through to the receivers immediately instead of + * concatenating with subsequent samples. + */ + void push_numeric_bufs(const char **bufs, uint32_t *bytes, uint32_t nbufs, double timestamp = 0.0, bool pushthrough = true) { + lsl_push_sample_buftpn(obj.get(), bufs, bytes, timestamp, pushthrough, nbufs); + } + // =================================================== // === Pushing an chunk of samples into the outlet === diff --git a/src/lsl_outlet_c.cpp b/src/lsl_outlet_c.cpp index b652c53b..45621470 100644 --- a/src/lsl_outlet_c.cpp +++ b/src/lsl_outlet_c.cpp @@ -28,7 +28,7 @@ LIBLSL_C_API lsl_outlet lsl_create_outlet_ex( buf_samples /= 1000; buf_samples = (buf_samples > 0) ? buf_samples : 1; return create_object_noexcept( - *info, chunk_size, buf_samples); + *info, chunk_size, buf_samples, flags); } LIBLSL_C_API lsl_outlet lsl_create_outlet( @@ -171,14 +171,32 @@ LIBLSL_C_API int32_t lsl_push_sample_buft( LIBLSL_C_API int32_t lsl_push_sample_buftp(lsl_outlet out, const char **data, const uint32_t *lengths, double timestamp, int32_t pushthrough) { + stream_outlet_impl *outimpl = out; + return lsl_push_sample_buftpn(out, data, lengths, timestamp, pushthrough, + (uint32_t)outimpl->info().channel_count()); +} + +LIBLSL_C_API int32_t lsl_push_sample_buftpn(lsl_outlet out, const char **data, + const uint32_t *lengths, double timestamp, int32_t pushthrough, uint32_t nbufs) { + stream_outlet_impl *outimpl = out; try { - stream_outlet_impl *outimpl = out; - std::vector tmp; - for (uint32_t k = 0; k < (uint32_t)outimpl->info().channel_count(); k++) - tmp.emplace_back(data[k], lengths[k]); - return outimpl->push_sample_noexcept(&tmp[0], timestamp, pushthrough); + if (outimpl->is_sync_blocking()) { + // Convert input to a vector of asio buffers + std::vector buffs; + for (auto buf_ix = 0; buf_ix < nbufs; buf_ix++) { + buffs.push_back(asio::buffer(data[buf_ix], lengths[buf_ix])); + } + return outimpl->push_sample_gather(buffs, timestamp, pushthrough); + } else { + std::vector tmp; + for (uint32_t k = 0; k < nbufs; k++) + tmp.emplace_back(data[k], lengths[k]); + return outimpl->push_sample_noexcept(&tmp[0], timestamp, pushthrough); + } } catch (std::exception &e) { LOG_F(WARNING, "Unexpected error during push_sample: %s", e.what()); + if (!outimpl->is_sync_blocking() && outimpl->info().channel_format() != cft_string) + LOG_F(ERROR, "lsl_push_sample_buftpn only compatible with string type or when outlet is using sync writes."); return lsl_internal_error; } } diff --git a/src/stream_outlet_impl.cpp b/src/stream_outlet_impl.cpp index b1986ae9..149898c2 100644 --- a/src/stream_outlet_impl.cpp +++ b/src/stream_outlet_impl.cpp @@ -23,7 +23,7 @@ stream_outlet_impl::stream_outlet_impl( send_buffer_(std::make_shared(max_capacity)), do_sync_(flags & transp_sync_blocking) { - if ((info.channel_format() == cft_string) && (flags & transp_sync_blocking)) { + if ((info.channel_format() == cft_string) && do_sync_) { LOG_F(WARNING, "sync push not supported for string-formatted streams. Reverting to async."); do_sync_ = false; } @@ -154,20 +154,8 @@ void stream_outlet_impl::push_numeric_raw(const void *data, double timestamp, bo smp->assign_untyped(data); // Note: Makes a copy! send_buffer_->push_sample(smp); } else { - if (timestamp == DEDUCED_TIMESTAMP) { - sync_buffs_.push_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1)); - } else { - sync_buffs_.push_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1)); - sync_buffs_.push_back(asio::buffer(×tamp, sizeof(timestamp))); - } - sync_buffs_.push_back(asio::buffer(data, smp->datasize())); - if (pushthrough) { - for (auto &tcp_server : tcp_servers_) - tcp_server->write_all_blocking(sync_buffs_); - sync_buffs_.clear(); - } + enqueue_sync(asio::buffer(data, smp->datasize()), timestamp, pushthrough); } - } bool stream_outlet_impl::have_consumers() { return send_buffer_->have_consumers(); } @@ -176,6 +164,28 @@ bool stream_outlet_impl::wait_for_consumers(double timeout) { return send_buffer_->wait_for_consumers(timeout); } +void stream_outlet_impl::push_timestamp_sync(double timestamp) { + if (timestamp == DEDUCED_TIMESTAMP) { + sync_buffs_.emplace_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1)); + } else { + sync_buffs_.emplace_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1)); + sync_buffs_.emplace_back(asio::buffer(×tamp, sizeof(timestamp))); + } +} + +void stream_outlet_impl::pushthrough_sync() { + // LOG_F(INFO, "Pushing %u buffers.", sync_buffs_.size()); + for (auto &tcp_server : tcp_servers_) + tcp_server->write_all_blocking(sync_buffs_); + sync_buffs_.clear(); +} + +void stream_outlet_impl::enqueue_sync(asio::const_buffer buff, double timestamp, bool pushthrough) { + push_timestamp_sync(timestamp); + sync_buffs_.push_back(buff); + if (pushthrough) pushthrough_sync(); +} + template void stream_outlet_impl::enqueue(const T *data, double timestamp, bool pushthrough) { if (lsl::api_config::get_instance()->force_default_timestamps()) timestamp = 0.0; @@ -185,18 +195,7 @@ void stream_outlet_impl::enqueue(const T *data, double timestamp, bool pushthrou smp->assign_typed(data); send_buffer_->push_sample(smp); } else { - if (timestamp == DEDUCED_TIMESTAMP) { - sync_buffs_.push_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1)); - } else { - sync_buffs_.push_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1)); - sync_buffs_.push_back(asio::buffer(×tamp, sizeof(timestamp))); - } - sync_buffs_.push_back(asio::buffer(data, smp->datasize())); - if (pushthrough) { - for (auto &tcp_server : tcp_servers_) - tcp_server->write_all_blocking(sync_buffs_); - sync_buffs_.clear(); - } + enqueue_sync(asio::buffer(data, smp->datasize()), timestamp, pushthrough); } } @@ -208,4 +207,10 @@ template void stream_outlet_impl::enqueue(const float *data, double, bool template void stream_outlet_impl::enqueue(const double *data, double, bool); template void stream_outlet_impl::enqueue(const std::string *data, double, bool); +void stream_outlet_impl::enqueue_sync_multi(std::vector buffs, double timestamp, bool pushthrough) { + push_timestamp_sync(timestamp); + sync_buffs_.insert( sync_buffs_.end(), buffs.begin(), buffs.end() ); + if (pushthrough) pushthrough_sync(); +} + } // namespace lsl diff --git a/src/stream_outlet_impl.h b/src/stream_outlet_impl.h index 32b76b8a..c92a3443 100644 --- a/src/stream_outlet_impl.h +++ b/src/stream_outlet_impl.h @@ -137,7 +137,10 @@ class stream_outlet_impl { void push_sample(const std::string *data, double timestamp = 0.0, bool pushthrough = true) { enqueue(data, timestamp, pushthrough); } - + lsl_error_code_t push_sample_gather(std::vector buffs, double timestamp = 0.0, bool pushthrough = true) { + enqueue_sync_multi(buffs, timestamp, pushthrough); + return lsl_no_error; + } template inline lsl_error_code_t push_sample_noexcept( @@ -295,6 +298,9 @@ class stream_outlet_impl { /// Wait until some consumer shows up. bool wait_for_consumers(double timeout = FOREVER); + /// If the outlet is intended to use synchronous blocking transfers + bool is_sync_blocking() { return do_sync_; }; + private: /// Instantiate a new server stack. void instantiate_stack(tcp tcp_protocol, udp udp_protocol); @@ -302,6 +308,22 @@ class stream_outlet_impl { /// Allocate and enqueue a new sample into the send buffer. template void enqueue(const T *data, double timestamp, bool pushthrough); + /// Append the appropriate timestamp tag and optionally timestamp onto sync_buffs_ for a single timestamp. + void push_timestamp_sync(double timestamp); + + /// push sync_buffs_ through each tcp server. + void pushthrough_sync(); + + /// Append a single timestamp and single buffer to sync_buffs and optionally pushthrough the server. + void enqueue_sync(asio::const_buffer buff, double timestamp, bool pushthrough); + + /** + * Append a single timestamp and multiple within-sample buffers to sync_buffs_. + * This is useful when a sample is discontiguous in memory. It makes no assumptions about how + * many channels are included in each buffer. + */ + void enqueue_sync_multi(std::vector buffs, double timestamp, bool pushthrough); + /** * Check whether some given number of channels matches the stream's channel_count. * Throws an error if not. diff --git a/src/tcp_server.cpp b/src/tcp_server.cpp index 21404b7f..83b7b326 100644 --- a/src/tcp_server.cpp +++ b/src/tcp_server.cpp @@ -226,12 +226,15 @@ void tcp_server::handle_accept_outcome(std::shared_ptr newsessio void tcp_server::write_all_blocking(std::vector buffs) { std::lock_guard lock(inflight_mut_); - std::size_t bytes_sent; - asio::error_code ec; + std::size_t bytes_sent = 0; for (const auto &x : inflight_ready_) { if (x.second && x.first->is_open()) { - bytes_sent = x.first->send(buffs, 0, ec); - if (ec) { + try { + // I couldn't figure out how to get the correct overload while providing + // error_code& ec to the write function. So we use try-catch instead. + bytes_sent = asio::write(*x.first, buffs); + } catch (const asio::system_error &err) { // std::exception &e + asio::error_code ec = err.code(); switch(ec.value()) { case asio::error::broken_pipe: case asio::error::connection_reset: @@ -243,7 +246,7 @@ void tcp_server::write_all_blocking(std::vector buffs) { // We leave it up to the client_session destructor to remove the socket. break; default: - LOG_F(WARNING, "Unhandled write_all_blocking error: %s.", ec.message().c_str()); + LOG_F(WARNING, "Unhandled write_all_blocking error: %s.", err.what()); } } } From 22079762a76b57aec165bedcba00dbbcfa34ca82 Mon Sep 17 00:00:00 2001 From: Chadwick Boulay Date: Wed, 3 Nov 2021 10:19:03 -0400 Subject: [PATCH 3/5] Add a bit more debug-ability to Receive examples. --- examples/ReceiveDataC.c | 15 ++++++++++++--- examples/ReceiveDataSimple.cpp | 6 +++++- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/examples/ReceiveDataC.c b/examples/ReceiveDataC.c index d1dc84f8..cc5ef39e 100644 --- a/examples/ReceiveDataC.c +++ b/examples/ReceiveDataC.c @@ -5,6 +5,8 @@ * Example program that demonstrates how to resolve a specific stream on the lab network and how to * connect to it in order to receive data. */ +const int NCHANS = 8; + int main(int argc, char *argv[]) { @@ -12,7 +14,7 @@ int main(int argc, char *argv[]) { lsl_streaminfo info; /* the streaminfo returned by the resolve call */ lsl_inlet inlet; /* a stream inlet to get samples from */ int errcode; /* error code (lsl_lost_error or timeouts) */ - float cursample[8]; /* array to hold our current sample */ + float cursample[NCHANS]; /* array to hold our current sample */ double timestamp; /* time stamp of the current sample (in sender time) */ /* resolve the stream of interest (result array: info, array capacity: 1 element, type shall be @@ -20,6 +22,11 @@ int main(int argc, char *argv[]) { printf("Now waiting for an EEG stream...\n"); lsl_resolve_byprop(&info, 1, "type", "EEG", 1, LSL_FOREVER); + /* These next two variables aren't used for anything in this example. + * They simply demonstrate how to use streaminfo getters. */ + lsl_channel_format_t fmt = lsl_get_channel_format(info); + double srate = lsl_get_nominal_srate(info); + /* make an inlet to read data from the stream (buffer max. 300 seconds of data, no preference * regarding chunking, automatic recovery enabled) */ inlet = lsl_create_inlet(info, 300, LSL_NO_PREFERENCE, 1); @@ -33,10 +40,12 @@ int main(int argc, char *argv[]) { for (t = 0; t < 100000000; t++) { /* get the next sample form the inlet (read into cursample, 8 values, wait forever if * necessary) and return the timestamp if we got something */ - timestamp = lsl_pull_sample_f(inlet, cursample, 8, LSL_FOREVER, &errcode); + timestamp = lsl_pull_sample_f(inlet, cursample, NCHANS, LSL_FOREVER, &errcode); /* print the data */ - for (k = 0; k < 8; ++k) printf("\t%.2f", cursample[k]); + printf("%.2f", timestamp); + for (k = 0; k < 8; ++k) + printf("\t%.2f", cursample[k]); printf("\n"); } diff --git a/examples/ReceiveDataSimple.cpp b/examples/ReceiveDataSimple.cpp index 47c21f4d..73db3220 100644 --- a/examples/ReceiveDataSimple.cpp +++ b/examples/ReceiveDataSimple.cpp @@ -1,5 +1,6 @@ #include #include +#include /** * This is a minimal example that demonstrates how a multi-channel stream (here 128ch) of a @@ -16,7 +17,10 @@ int main(int argc, char **argv) { // receive data & time stamps forever (not displaying them here) std::vector sample; - while (true) inlet.pull_sample(sample); + while (true) { + double timestamp = inlet.pull_sample(sample); + std::cout << timestamp << "\t" << sample[0] << "\t" << sample[1] << "..." << std::endl; + } return 0; } From 13787d371c6d7a8f86727e1d92c459c952dffc2c Mon Sep 17 00:00:00 2001 From: Chadwick Boulay Date: Wed, 3 Nov 2021 10:33:04 -0400 Subject: [PATCH 4/5] raw buffs use void**; incorporate other suggestions from tstenner. Currently not working properly! --- examples/SendData.cpp | 78 ++++++----- examples/SendDataCBlocking.c | 252 +++++++++++++++++++++++++++++++++++ include/lsl/outlet.h | 14 +- include/lsl_cpp.h | 4 +- src/lsl_outlet_c.cpp | 55 ++++++-- src/stream_outlet_impl.cpp | 12 +- src/stream_outlet_impl.h | 4 +- src/tcp_server.cpp | 67 +++++++--- src/tcp_server.h | 11 +- 9 files changed, 419 insertions(+), 78 deletions(-) create mode 100644 examples/SendDataCBlocking.c diff --git a/examples/SendData.cpp b/examples/SendData.cpp index 757eb957..8c2585df 100644 --- a/examples/SendData.cpp +++ b/examples/SendData.cpp @@ -3,7 +3,7 @@ #include #include #include -using namespace std; +#include /** * This example program offers an 8-channel stream, float-formatted, that resembles EEG data. @@ -17,15 +17,15 @@ using namespace std; const char *channels[] = {"C3", "C4", "Cz", "FPz", "POz", "CPz", "O1", "O2"}; int main(int argc, char *argv[]) { - string name, type; + std::string name, type; if (argc < 3) { - cout << "This opens a stream under some user-defined name and with a user-defined content " - "type." << endl; - cout << "SendData Name Type n_channels[8] srate[100] max_buffered[360] sync[false] contig[true]" << endl; - cout << "Please enter the stream name and the stream type (e.g. \"BioSemi EEG\" (without " + std::cout << "This opens a stream under some user-defined name and with a user-defined content " + "type." << std::endl; + std::cout << "SendData Name Type n_channels[8] srate[100] max_buffered[360] sync[false] contig[true]" << std::endl; + std::cout << "Please enter the stream name and the stream type (e.g. \"BioSemi EEG\" (without " "the quotes)):" - << endl; - cin >> name >> type; + << std::endl; + std::cin >> name >> type; } else { name = argv[1]; type = argv[2]; @@ -38,9 +38,12 @@ int main(int argc, char *argv[]) { bool contig = argc > 7 ? std::stol(argv[7]) > 0 : true; try { +// if (!sync && !contig) { +// throw std::invalid_argument( "async is incompatible with discontig push_numeric_bufs (except for strings, not used here)." ); +// } // make a new stream_info (100 Hz) - lsl::stream_info info(name, type, n_channels, samplingrate, lsl::cf_float32, string(name) += type); + lsl::stream_info info(name, type, n_channels, samplingrate, lsl::cf_float32, std::string(name) += type); // add some description fields info.desc().append_child_value("manufacturer", "LSL"); @@ -54,42 +57,53 @@ int main(int argc, char *argv[]) { // make a new outlet lsl::stream_outlet outlet(info, 0, max_buffered, sync ? transp_sync_blocking : transp_default); - // send data forever - cout << "Now sending data... " << endl; - double starttime = ((double)clock()) / CLOCKS_PER_SEC; - // Initialize 2 discontiguous data arrays. - vector sample(8, 0.0); - vector extra(n_channels - 8, 0.0); - if (contig) { - // If this is contiguous mode (default) then we combine the arrays. - sample.insert( - sample.end(), - make_move_iterator(extra.begin()), - make_move_iterator(extra.end())); - } + std::vector sample(8, 0.0); + std::vector extra(n_channels - 8, 0.0); + // If this is contiguous mode (default) then we combine the arrays. + if (contig) + sample.insert(sample.end(), extra.begin(), extra.end()); + // bytes is used in !contig mode because we need to know how big each buffer is. - array bytes = {8 * sizeof(float), static_cast((n_channels - 8) * sizeof(float))}; + std::array bytes = {8 * sizeof(float), static_cast((n_channels - 8) * sizeof(float))}; + + // Your device might have its own timer. Or you can decide how often to poll + // your device, as we do here. + int32_t sample_dur_us = 1000000 / (samplingrate > 0 ? samplingrate : 100); + auto t_start = std::chrono::high_resolution_clock::now(); + auto next_sample_time = t_start; + + // send data forever + std::cout << "Now sending data... " << std::endl; for (unsigned t = 0;; t++) { - // wait a bit and create random data - while (((double)clock()) / CLOCKS_PER_SEC < starttime + t * 0.01) - ; + // Create random data for the first 8 channels. for (int c = 0; c < 8; c++) sample[c] = (float)((rand() % 1500) / 500.0 - 1.5); + // For the remaining channels, fill them with a sample counter (wraps at 1M). + if (contig) + std::fill(sample.begin()+8, sample.end(), t % 1000000); + else + std::fill(extra.begin(), extra.end(), t % 1000000); + + // Wait until the next expected sample time. + next_sample_time += std::chrono::microseconds(sample_dur_us); + std::this_thread::sleep_until(next_sample_time); // send the sample - if (contig) + if (contig) { + std::cout << sample[0] << "\t" << sample[8] << std::endl; outlet.push_sample(sample); + } else { // Advanced: Push set of discontiguous buffers. - array bufs = {sample.data(), extra.data()}; - outlet.push_numeric_bufs(reinterpret_cast(const_cast(bufs.data())), + std::array bufs = {sample.data(), extra.data()}; + outlet.push_numeric_bufs((void **)bufs.data(), bytes.data(), 2, lsl::local_clock(), true); } } - } catch (exception &e) { cerr << "Got an exception: " << e.what() << endl; } - cout << "Press any key to exit. " << endl; - cin.get(); + } catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; } + std::cout << "Press any key to exit. " << std::endl; + std::cin.get(); return 0; } diff --git a/examples/SendDataCBlocking.c b/examples/SendDataCBlocking.c new file mode 100644 index 00000000..afdb29ae --- /dev/null +++ b/examples/SendDataCBlocking.c @@ -0,0 +1,252 @@ +#include +#include +#include +#include +#include +#include +#include + +/** + * This example program pushes a 16-bit stream with a constant value across all channels. The + * channel number and rate are configurable via command line arguments. The data are not copied! + * This requires an inconvenient application design using threads and synchronization, but the + * this kind of design is necessary when frequent copying of large data chunks is too expensive. + * The main thread takes data from the device and puts it into a transmit buffer, and the secondary + * thread pushes the data from the transmit buffer into the lsl outlet then releases the data from + * the buffer. This is a relatively new and unusual design pattern for lsl. It uses a new BLOCKING + * function `push_chunk_wait`. Presently this only supports int16 types. This function passes + * the data pointer directly to asio to write data to each consumer and waits for asio to return. + * Note: One would usually prefer to use C++ niceities and queueing libraries (e.g., + * moodycamel::concurrentqueue) when working with big data across threads. However, the high- + * throughput data demands might be come from embedded devices with compiler limitations, + * so we use C here. Any input on how to speed up this C code is greatly appreciated. + */ + +#define DEVICE_BUFFER_SIZE 4194304 + +typedef struct { + char name[100]; + char serial[100]; + double last_timestamp; + int srate; + int nchans; + int write_idx; + int read_idx; + int min_frames_per_chunk; + int16_t channel_data[DEVICE_BUFFER_SIZE / 2]; // (channels * samples) < 2 million +} fake_device; + +typedef struct { + int thread_status; + int chunk_size; + double buffer_dur; + int do_async; +} thread_params; + +// Linked-list queue +typedef struct chunk_info +{ + int16_t *buf; + int n_frames; + double timestamp; +} chunk_info; +typedef struct node +{ + chunk_info info; + struct node *next; +} node; +typedef struct queue +{ + int count; + node *front; + node *rear; +} queue; +void initialize(queue *q) +{ + q->count = 0; + q->front = NULL; + q->rear = NULL; +} +int isempty(queue *q) +{ + return (q->front == NULL); +} +void enqueue(queue *q, int16_t* data, int frames, double ts) +{ + node *tmp; + tmp = malloc(sizeof(node)); + tmp->info.buf = data; + tmp->info.n_frames = frames; + tmp->info.timestamp = ts; + tmp->next = NULL; + if(isempty(q)) + q->front = q->rear = tmp; + else + { + q->rear->next = tmp; + q->rear = tmp; + } + q->count++; +} +chunk_info dequeue(queue *q) +{ + chunk_info info = q->front->info; + node *tmp = q->front; + free(tmp); + q->front = q->front->next; + q->count--; + return(info); +} + +// Globals +fake_device *device = 0; +sem_t sem; +queue *q; + +// fetch_data -- Normally something provided by Device SDK +uint64_t fetch_data(int16_t** buffer) { + static int buf_samples = sizeof(device->channel_data) / sizeof(device->channel_data[0]); + + if (device->last_timestamp < 0) device->last_timestamp = lsl_local_clock(); + double now = lsl_local_clock(); + // Calculate how many frames/timestamps have elapsed since the last call. + uint64_t elapsed_frames = (uint64_t)((now - device->last_timestamp) * device->srate); + if (elapsed_frames < device->min_frames_per_chunk) elapsed_frames = 0; + // Cut this fetch short if it would go past the buffer. Next fetch will start at first idx. + if ((device->write_idx + elapsed_frames * device->nchans) > buf_samples) + elapsed_frames = (buf_samples - device->write_idx) / device->nchans; + // Further limit elapsed_samples to not overtake the read point (tail) +// if ((device->write_idx < device->read_idx) && +// (device->write_idx + (elapsed_frames * device->nchans) >= device->read_idx)) +// elapsed_frames = (device->read_idx - device->write_idx) / device->nchans; + if (elapsed_frames > 0) + { + // New elapsed_time after accounting for rounding to integer frames. + device->last_timestamp += (double)(elapsed_frames) / device->srate; + // I assume that the device has its own acquisition buffer and that it copies data + // to a separate data buffer for API purposes. + // We are using a model where the device SDK shares its buffer with the client application. + // This is a bit unusual but allows for fastest throughput. + *buffer = &(device->channel_data[device->write_idx]); + + // And we advance the head for the next data transfer. + device->write_idx = (device->write_idx + elapsed_frames * device->nchans) % buf_samples; + if ((buf_samples - device->write_idx) < device->nchans) + device->write_idx = 0; + } + return elapsed_frames; +} + +// transmit_thread -- responsible for popping data off the queue and pushing it to LSL +void transmit_thread(void* vargp) { + // Initialize thread-local variables + thread_params *params = (thread_params *)vargp; + + /* declare a new streaminfo */ + lsl_streaminfo info = lsl_create_streaminfo(device->name, "TestLSL", + device->nchans, device->srate, cft_int16, device->serial); + + /* add some meta-data fields to it */ + /* (for more standard fields, see https://github.com/sccn/xdf/wiki/Meta-Data) */ + lsl_xml_ptr desc = lsl_get_desc(info); + lsl_append_child_value(desc, "manufacturer", "LSL"); + lsl_xml_ptr chns = lsl_append_child(desc, "channels"); + char chanlabel[12]; + for (int c = 0; c < device->nchans; c++) { + lsl_xml_ptr chn = lsl_append_child(chns, "channel"); + snprintf(chanlabel, 20, "Chan-%d", c); + lsl_append_child_value(chn, "label", chanlabel); + lsl_append_child_value(chn, "unit", "microvolts"); + lsl_append_child_value(chn, "type", "EEG"); + } + + /* make a new outlet */ + lsl_outlet outlet = lsl_create_outlet_d(info, params->chunk_size, params->buffer_dur, params->do_async); + + printf("Now sending data...\n"); + params->thread_status = 1; + int buf_samples = sizeof(device->channel_data) / sizeof(device->channel_data[0]); + while(params->thread_status) { + sem_wait(&sem); + if (!isempty(q)) + { + chunk_info chunk = dequeue(q); + int64_t chunk_samples = chunk.n_frames * device->nchans; + lsl_push_chunk_stp(outlet, chunk.buf, chunk_samples, chunk.timestamp, 1); + device->read_idx = (device->read_idx + chunk_samples) % buf_samples; + } + } + lsl_destroy_outlet(outlet); +} + +int main(int argc, char *argv[]) { + printf("SendDataCBlocking example program. Sends int16 data with minimal copies.\n"); + printf("Usage: %s [streamname] [streamuid] [srate] [nchans] [buff_dur] [do_async]\n", argv[0]); + printf("Using lsl %d, lsl_library_info: %s\n", lsl_library_version(), lsl_library_info()); + const char *name = argc > 1 ? argv[1] : "SendDataCBlocking"; + const char *uid = argc > 2 ? argv[2] : "6s45ahas321"; + int srate = argc > 3 ? strtol(argv[3], NULL, 10) : 512; + int n_chans = argc > 4 ? strtol(argv[4], NULL, 10) : 32; + double buff_dur = argc > 5 ? strtod(argv[5], NULL) : 60.; + int do_async = argc > 6 ? strtol(argv[6], NULL, 10) : 1; + int32_t samps_per_chunk = argc > 6 ? strtol(argv[6], NULL, 10) : 30; + + // Initialize our fake device and set its parameters. This would normally be taken care of + // by the device SDK. + device = (fake_device *) malloc(sizeof(fake_device)); + memset(device, 0, sizeof(fake_device)); + strcpy(device->name, name); + device->srate = srate; + device->nchans = n_chans; + device->last_timestamp = -1.; + device->min_frames_per_chunk = device->srate / 1000; + strcpy(device->serial, uid); + // Give the device buffer data some non-zero value. + memset(device->channel_data, 23, sizeof(device->channel_data)); + // write_idx and read_idx are OK at 0. + + thread_params params; + params.buffer_dur = buff_dur; + params.chunk_size = samps_per_chunk; + params.thread_status = 0; + params.do_async = do_async; + + // Initialize q + q = malloc(sizeof(queue)); + initialize(q); + + sem_init(&sem, 0, 0); + pthread_t thread_id; + if(pthread_create(&thread_id, NULL, (void *) &transmit_thread, ¶ms)) { + fprintf(stderr, "Error creating LSL transmit thread.\n"); + return 1; + } + + int exit_condition = 0; + int16_t* shared_buff; + double last_timestamp_received = -1.; + uint64_t n_frames_received = 0; + while (1) { + if (exit_condition) break; + + // Get data from device + n_frames_received = fetch_data(&shared_buff); + if (n_frames_received > 0) + { + enqueue(q, shared_buff, n_frames_received, device->last_timestamp); + sem_post(&sem); + } + } + + if(params.thread_status) // Kill thread + { + params.thread_status=0; + if(pthread_join(thread_id, NULL)) { + fprintf(stderr, "Error terminating LSL transmit thread.\n"); + } + } + sem_destroy(&sem); + free(device); + + return 0; +} diff --git a/include/lsl/outlet.h b/include/lsl/outlet.h index c03fc72f..06923495 100644 --- a/include/lsl/outlet.h +++ b/include/lsl/outlet.h @@ -98,8 +98,10 @@ extern LIBLSL_C_API int32_t lsl_push_sample_vtp(lsl_outlet out, const void *data /** @copybrief lsl_push_sample_ftp * @see lsl_push_sample_ftp * @param out The lsl_outlet object through which to push the data. - * @param data A pointer to values to push. The number of values pointed to must be no less than the number of channels in the sample. - * @param lengths A pointer the number of elements to push for each channel (string lengths, or number of bytes). + * @param data An array of data buffers to push. The number of buffers in the array must be no less + * than the number of channels in the sample. Each entry in data must be longer than the corresponding + * entry in `lengths`. + * @param lengths An array containing the lengths of each buffer in data. Units are string lengths or number of bytes. */ extern LIBLSL_C_API int32_t lsl_push_sample_buf(lsl_outlet out, const char **data, const uint32_t *lengths); /** @copydoc lsl_push_sample_buf @@ -108,11 +110,13 @@ extern LIBLSL_C_API int32_t lsl_push_sample_buft(lsl_outlet out, const char **da /** @copydoc lsl_push_sample_buft * @param pushthrough @see lsl_push_sample_ftp */ extern LIBLSL_C_API int32_t lsl_push_sample_buftp(lsl_outlet out, const char **data, const uint32_t *lengths, double timestamp, int32_t pushthrough); + /** @copydoc lsl_push_sample_buftp - * @param nbufs Number of values pointed to in `data` and number of items in `lengths` -- doesn't assume one buffer - * per channel but each array in data must be longer than each item in lengths. + * @param data An array of data buffers to push. The number of buffers in the array must be no less than `nbufs`. + * @param bytes An array comprising the number of bytes in each buffer. The number of entries in bytes must be no less than `nbufs`. + * @param nbufs The number of values pointed to in `data` and equivalently the number of items in `bytes`. */ -extern LIBLSL_C_API int32_t lsl_push_sample_buftpn(lsl_outlet out, const char **data, const uint32_t *lengths, double timestamp, int32_t pushthrough, uint32_t nbufs); +extern LIBLSL_C_API int32_t lsl_push_sample_rawtpn(lsl_outlet out, void **data, const uint32_t *bytes, double timestamp, int32_t pushthrough, uint32_t nbufs); /** Push a chunk of multiplexed samples into the outlet. One timestamp per sample is provided. * diff --git a/include/lsl_cpp.h b/include/lsl_cpp.h index faf583cf..11e490d8 100644 --- a/include/lsl_cpp.h +++ b/include/lsl_cpp.h @@ -522,8 +522,8 @@ class stream_outlet { * @param pushthrough Whether to push the sample through to the receivers immediately instead of * concatenating with subsequent samples. */ - void push_numeric_bufs(const char **bufs, uint32_t *bytes, uint32_t nbufs, double timestamp = 0.0, bool pushthrough = true) { - lsl_push_sample_buftpn(obj.get(), bufs, bytes, timestamp, pushthrough, nbufs); + void push_numeric_bufs(void **bufs, uint32_t *bytes, uint32_t nbufs, double timestamp = 0.0, bool pushthrough = true) { + lsl_push_sample_rawtpn(obj.get(), bufs, bytes, timestamp, pushthrough, nbufs); } diff --git a/src/lsl_outlet_c.cpp b/src/lsl_outlet_c.cpp index 45621470..819f2c54 100644 --- a/src/lsl_outlet_c.cpp +++ b/src/lsl_outlet_c.cpp @@ -172,26 +172,59 @@ LIBLSL_C_API int32_t lsl_push_sample_buft( LIBLSL_C_API int32_t lsl_push_sample_buftp(lsl_outlet out, const char **data, const uint32_t *lengths, double timestamp, int32_t pushthrough) { stream_outlet_impl *outimpl = out; - return lsl_push_sample_buftpn(out, data, lengths, timestamp, pushthrough, + // As the number of bytes-per-buffer is the same as the number of chars-per-buffer, + // we can pass `lengths` through as `bytes`. + return lsl_push_sample_rawtpn(out, (void **)data, lengths, timestamp, pushthrough, (uint32_t)outimpl->info().channel_count()); } -LIBLSL_C_API int32_t lsl_push_sample_buftpn(lsl_outlet out, const char **data, - const uint32_t *lengths, double timestamp, int32_t pushthrough, uint32_t nbufs) { +LIBLSL_C_API int32_t lsl_push_sample_rawtpn(lsl_outlet out, void **data, + const uint32_t *bytes, double timestamp, int32_t pushthrough, uint32_t nbufs) { stream_outlet_impl *outimpl = out; try { if (outimpl->is_sync_blocking()) { - // Convert input to a vector of asio buffers - std::vector buffs; + // Convert input to a vector of asio buffers for a gather-write operation. + std::vector bufs; + bufs.reserve(nbufs); for (auto buf_ix = 0; buf_ix < nbufs; buf_ix++) { - buffs.push_back(asio::buffer(data[buf_ix], lengths[buf_ix])); + bufs.push_back(asio::buffer(data[buf_ix], bytes[buf_ix])); } - return outimpl->push_sample_gather(buffs, timestamp, pushthrough); + return outimpl->push_sample_gather(bufs, timestamp, pushthrough); } else { - std::vector tmp; - for (uint32_t k = 0; k < nbufs; k++) - tmp.emplace_back(data[k], lengths[k]); - return outimpl->push_sample_noexcept(&tmp[0], timestamp, pushthrough); + // Make contiguous. + if (outimpl->info().channel_format() == cft_string) { + // For strings we place in std::string vector to make sure they are properly + // terminated. + std::vector tmp; + for (uint32_t k = 0; k < nbufs; k++) + tmp.emplace_back((const char *)data[k], bytes[k]); + return outimpl->push_sample_noexcept(&tmp[0], timestamp, pushthrough); + } else { + // Otherwise we put into new memory block. + uint32_t total_bytes = 0, byte_offset = 0; + for (size_t k = 0; k < nbufs; k++) { + total_bytes += bytes[k]; + } + char * tmp = (char *)malloc(total_bytes); + for (size_t k = 0; k < nbufs; k++) { + memcpy(&tmp[byte_offset], data[k], bytes[k]); + byte_offset += bytes[k]; + } + // TODO: I tried passing void buffer but eventually fail because the convert functions + // become ambiguous. + lsl_error_code_t ec; + switch(outimpl->info().channel_format()) { + case cft_int8: ec = outimpl->push_sample_noexcept((const char *)tmp, timestamp, pushthrough); + case cft_int16: ec = outimpl->push_sample_noexcept((const int16_t *)tmp, timestamp, pushthrough); + case cft_int32: ec = outimpl->push_sample_noexcept((const int32_t *)tmp, timestamp, pushthrough); + case cft_int64: ec = outimpl->push_sample_noexcept((const int64_t *)tmp, timestamp, pushthrough); + case cft_float32: ec = outimpl->push_sample_noexcept((const float *)tmp, timestamp, pushthrough); + case cft_double64: ec = outimpl->push_sample_noexcept((const double *)tmp, timestamp, pushthrough); + case cft_undefined: ec = lsl_internal_error; + } + free(tmp); + return ec; + } } } catch (std::exception &e) { LOG_F(WARNING, "Unexpected error during push_sample: %s", e.what()); diff --git a/src/stream_outlet_impl.cpp b/src/stream_outlet_impl.cpp index 149898c2..00b6cd6e 100644 --- a/src/stream_outlet_impl.cpp +++ b/src/stream_outlet_impl.cpp @@ -23,10 +23,8 @@ stream_outlet_impl::stream_outlet_impl( send_buffer_(std::make_shared(max_capacity)), do_sync_(flags & transp_sync_blocking) { - if ((info.channel_format() == cft_string) && do_sync_) { - LOG_F(WARNING, "sync push not supported for string-formatted streams. Reverting to async."); - do_sync_ = false; - } + if ((info.channel_format() == cft_string) && do_sync_) + throw std::invalid_argument("Synchronous push not supported for string-formatted streams."); ensure_lsl_initialized(); const api_config *cfg = api_config::get_instance(); @@ -164,7 +162,7 @@ bool stream_outlet_impl::wait_for_consumers(double timeout) { return send_buffer_->wait_for_consumers(timeout); } -void stream_outlet_impl::push_timestamp_sync(double timestamp) { +void stream_outlet_impl::push_timestamp_sync(const double& timestamp) { if (timestamp == DEDUCED_TIMESTAMP) { sync_buffs_.emplace_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1)); } else { @@ -180,7 +178,7 @@ void stream_outlet_impl::pushthrough_sync() { sync_buffs_.clear(); } -void stream_outlet_impl::enqueue_sync(asio::const_buffer buff, double timestamp, bool pushthrough) { +void stream_outlet_impl::enqueue_sync(asio::const_buffer buff, const double& timestamp, bool pushthrough) { push_timestamp_sync(timestamp); sync_buffs_.push_back(buff); if (pushthrough) pushthrough_sync(); @@ -195,7 +193,7 @@ void stream_outlet_impl::enqueue(const T *data, double timestamp, bool pushthrou smp->assign_typed(data); send_buffer_->push_sample(smp); } else { - enqueue_sync(asio::buffer(data, smp->datasize()), timestamp, pushthrough); + enqueue_sync(asio::buffer(data, smp->datasize()), smp->timestamp, smp->pushthrough); } } diff --git a/src/stream_outlet_impl.h b/src/stream_outlet_impl.h index c92a3443..58d5e73d 100644 --- a/src/stream_outlet_impl.h +++ b/src/stream_outlet_impl.h @@ -309,13 +309,13 @@ class stream_outlet_impl { template void enqueue(const T *data, double timestamp, bool pushthrough); /// Append the appropriate timestamp tag and optionally timestamp onto sync_buffs_ for a single timestamp. - void push_timestamp_sync(double timestamp); + void push_timestamp_sync(const double& timestamp); /// push sync_buffs_ through each tcp server. void pushthrough_sync(); /// Append a single timestamp and single buffer to sync_buffs and optionally pushthrough the server. - void enqueue_sync(asio::const_buffer buff, double timestamp, bool pushthrough); + void enqueue_sync(asio::const_buffer buff, const double ×tamp, bool pushthrough); /** * Append a single timestamp and multiple within-sample buffers to sync_buffs_. diff --git a/src/tcp_server.cpp b/src/tcp_server.cpp index 83b7b326..90bdcdf6 100644 --- a/src/tcp_server.cpp +++ b/src/tcp_server.cpp @@ -224,24 +224,24 @@ void tcp_server::handle_accept_outcome(std::shared_ptr newsessio // === synchronous transfer -void tcp_server::write_all_blocking(std::vector buffs) { +void tcp_server::write_all_blocking(std::vector bufs) { std::lock_guard lock(inflight_mut_); - std::size_t bytes_sent = 0; - for (const auto &x : inflight_ready_) { - if (x.second && x.first->is_open()) { + // If there is exactly one consumer, use blocking-write. + if (inflight_ready_.size() == 1) { + if (inflight_ready_.begin()->second && inflight_ready_.begin()->first->is_open()) { try { - // I couldn't figure out how to get the correct overload while providing - // error_code& ec to the write function. So we use try-catch instead. - bytes_sent = asio::write(*x.first, buffs); + // CBB: I couldn't figure out how to get the correct overload while providing + // error_code& ec to the write function; using try-catch instead. + asio::write(*inflight_ready_.begin()->first, bufs); } catch (const asio::system_error &err) { // std::exception &e asio::error_code ec = err.code(); switch(ec.value()) { case asio::error::broken_pipe: case asio::error::connection_reset: LOG_F(WARNING, "Broken Pipe / Connection Reset detected. Closing socket."); - inflight_ready_[x.first] = false; - post(*io_, [x]() { - close_inflight_socket(x); + inflight_ready_.begin()->second = false; + post(*io_, [sock=inflight_ready_.begin()->first]() { + close_inflight_socket(sock); }); // We leave it up to the client_session destructor to remove the socket. break; @@ -251,6 +251,43 @@ void tcp_server::write_all_blocking(std::vector buffs) { } } } + else { // multiple consumers. Use asynchronous write. + for (const auto &x : inflight_ready_) { + if (x.second && x.first->is_open()) { + asio::async_write(*x.first, bufs, [this, self = shared_from_this()]( + const asio::error_code& ec, size_t bytes_transferred) { + { + std::lock_guard lock(sync_write_mut_); + // assign the transfer outcome + if (!ec) { + // TODO: Check bytes_transferred + } + else { + switch(ec.value()) { + case asio::error::broken_pipe: + case asio::error::connection_reset: + LOG_F(WARNING, "Broken Pipe / Connection Reset detected. Closing socket."); + inflight_ready_.begin()->second = false; + post(*io_, [sock=inflight_ready_.begin()->first]() { + close_inflight_socket(sock); + }); + // We leave it up to the client_session destructor to remove the socket. + break; + default: + LOG_F(WARNING, "Unhandled write_all_blocking error: %s.", ec.message().c_str()); + } + } + } + sync_write_cv_.notify_one(); + }); + } + } + // Using asynchronous write, but waiting for return. + { + std::unique_lock lk(sync_write_mut_); + sync_write_cv_.wait(lk, []{return true;}); + } + } } // === graceful cancellation of in-flight sockets === @@ -266,14 +303,14 @@ void tcp_server::unregister_inflight_socket(const tcp_socket_p &sock) { inflight_ready_.erase(sock); } -void tcp_server::close_inflight_socket(std::pair x) { +void tcp_server::close_inflight_socket(const tcp_socket_p &sock) { try { - if (x.first->is_open()) { + if (sock->is_open()) { try { // (in some cases shutdown may fail) - x.first->shutdown(x.first->shutdown_both); + sock->shutdown(sock->shutdown_both); } catch (...) {} - x.first->close(); + sock->close(); } } catch (std::exception &e) { LOG_F(WARNING, "Error during shutdown_and_close: %s", e.what()); @@ -284,7 +321,7 @@ void tcp_server::close_inflight_sockets() { std::lock_guard lock(inflight_mut_); for (const auto &x : inflight_ready_) { inflight_ready_[x.first] = false; - post(*io_, [x]() { close_inflight_socket(x); }); + post(*io_, [sock=x.first]() { close_inflight_socket(sock); }); } } diff --git a/src/tcp_server.h b/src/tcp_server.h index f8216a86..7e0b01bf 100644 --- a/src/tcp_server.h +++ b/src/tcp_server.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -65,15 +66,15 @@ class tcp_server : public std::enable_shared_from_this { * Initiate teardown of IO processes. * * The actual teardown will be performed by the IO thread that runs the operations of - * thisserver. + * this server. */ void end_serving(); /** * Write directly to each socket. This should only be used when server initialized with - * do_async = false. + * do_sync = true. */ - void write_all_blocking(std::vector buffs); + void write_all_blocking(std::vector bufs); private: friend class client_session; @@ -91,7 +92,7 @@ class tcp_server : public std::enable_shared_from_this { void unregister_inflight_socket(const tcp_socket_p &sock); /// Post a close of a single in-flight socket - static void close_inflight_socket(std::pair x); + static void close_inflight_socket(const tcp_socket_p &sock); /// Post a close of all in-flight sockets. void close_inflight_sockets(); @@ -117,6 +118,8 @@ class tcp_server : public std::enable_shared_from_this { // registry of in-flight client sockets (for cancellation) std::map inflight_ready_; // registry of currently in-flight sockets std::recursive_mutex inflight_mut_; // mutex protecting the registry from concurrent access + std::mutex sync_write_mut_; + std::condition_variable sync_write_cv_; // some cached data std::string shortinfo_msg_; // pre-computed short-info server response From 9c04b94ea594aca73cc65edd8693ca6ac00928ba Mon Sep 17 00:00:00 2001 From: Chadwick Boulay Date: Fri, 5 Nov 2021 11:54:09 -0400 Subject: [PATCH 5/5] Replace const int for array size with macro to make Win C99 happy. --- examples/ReceiveDataC.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/ReceiveDataC.c b/examples/ReceiveDataC.c index cc5ef39e..ceaaf912 100644 --- a/examples/ReceiveDataC.c +++ b/examples/ReceiveDataC.c @@ -5,7 +5,7 @@ * Example program that demonstrates how to resolve a specific stream on the lab network and how to * connect to it in order to receive data. */ -const int NCHANS = 8; +#define NCHANS 8 int main(int argc, char *argv[]) {