Skip to content

Commit 0f5c4d9

Browse files
tstennercboulay
authored andcommitted
Reduce enqueue overhead
1 parent 2b6813a commit 0f5c4d9

File tree

3 files changed

+20
-12
lines changed

3 files changed

+20
-12
lines changed

src/sample.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ class factory {
5454
/// Reclaim a sample that's no longer used.
5555
void reclaim_sample(sample *s);
5656

57+
std::size_t datasize() const { return format_sizes[fmt_] * static_cast<std::size_t>(num_chans_); }
58+
5759
private:
5860
/// Pop a sample from the freelist (multi-producer/single-consumer queue by Dmitry Vjukov)
5961
sample *pop_freelist();

src/stream_outlet_impl.cpp

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -168,38 +168,43 @@ bool stream_outlet_impl::wait_for_consumers(double timeout) {
168168
return send_buffer_->wait_for_consumers(timeout);
169169
}
170170

171-
void stream_outlet_impl::push_timestamp_sync(const double &timestamp) {
171+
void stream_outlet_impl::push_timestamp_sync(double timestamp) {
172+
static_assert(TAG_TRANSMITTED_TIMESTAMP == 2, "Unexpected TAG_TRANSMITTED_TIMESTAMP");
173+
const uint64_t ENDIAN_SAFE_TAG_TRANSMITTED = (2LL << 28) | 2LL;
172174
if (timestamp == DEDUCED_TIMESTAMP) {
173-
sync_buffs_.emplace_back(asio::buffer(&TAG_DEDUCED_TIMESTAMP, 1));
175+
sync_buffs_.emplace_back(&TAG_DEDUCED_TIMESTAMP, 1);
174176
} else {
175-
sync_buffs_.emplace_back(asio::buffer(&TAG_TRANSMITTED_TIMESTAMP, 1));
176-
sync_buffs_.emplace_back(asio::buffer(&timestamp, sizeof(timestamp)));
177+
sync_timestamps_.emplace_back(ENDIAN_SAFE_TAG_TRANSMITTED, timestamp);
178+
// add a pointer to the memory region containing |TAG_TRANSMITTED_TIMESTAMP|timestamp
179+
// one byte for the tag, 8 for the timestamp
180+
sync_buffs_.emplace_back(reinterpret_cast<const char*>(&sync_timestamps_.back()) + 7, 9);
177181
}
178182
}
179183

180184
void stream_outlet_impl::pushthrough_sync() {
181185
// LOG_F(INFO, "Pushing %u buffers.", sync_buffs_.size());
182186
tcp_server_->write_all_blocking(sync_buffs_);
183187
sync_buffs_.clear();
188+
sync_timestamps_.clear();
184189
}
185190

186191
void stream_outlet_impl::enqueue_sync(
187-
asio::const_buffer buff, const double &timestamp, bool pushthrough) {
192+
asio::const_buffer buff, double timestamp, bool pushthrough) {
188193
push_timestamp_sync(timestamp);
189194
sync_buffs_.push_back(buff);
190195
if (pushthrough) pushthrough_sync();
191196
}
192197

193198
template <class T>
194199
void stream_outlet_impl::enqueue(const T *data, double timestamp, bool pushthrough) {
195-
if (lsl::api_config::get_instance()->force_default_timestamps()) timestamp = 0.0;
196-
sample_p smp(
197-
sample_factory_->new_sample(timestamp == 0.0 ? lsl_clock() : timestamp, pushthrough));
200+
if (timestamp == 0.0 || lsl::api_config::get_instance()->force_default_timestamps()) timestamp = lsl_local_clock();
198201
if (!do_sync_) {
202+
sample_p smp(
203+
sample_factory_->new_sample(timestamp, pushthrough));
199204
smp->assign_typed(data);
200205
send_buffer_->push_sample(smp);
201206
} else {
202-
enqueue_sync(asio::buffer(data, smp->datasize()), smp->timestamp, smp->pushthrough);
207+
enqueue_sync(asio::buffer(data, sample_factory_->datasize()), timestamp, pushthrough);
203208
}
204209
}
205210

src/stream_outlet_impl.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,6 @@ class stream_outlet_impl {
256256
throw std::runtime_error("The number of buffer elements to send is not a multiple of "
257257
"the stream's channel count.");
258258
if (num_samples > 0) {
259-
if (timestamp == 0.0) timestamp = lsl_clock();
260259
if (info().nominal_srate() != IRREGULAR_RATE)
261260
timestamp = timestamp - (num_samples - 1) / info().nominal_srate();
262261
push_sample(buffer, timestamp, pushthrough && (num_samples == 1));
@@ -311,14 +310,14 @@ class stream_outlet_impl {
311310

312311
/// Append the appropriate timestamp tag and optionally timestamp onto sync_buffs_ for a single
313312
/// timestamp.
314-
void push_timestamp_sync(const double &timestamp);
313+
void push_timestamp_sync(double timestamp);
315314

316315
/// push sync_buffs_ through each tcp server.
317316
void pushthrough_sync();
318317

319318
/// Append a single timestamp and single buffer to sync_buffs and optionally pushthrough the
320319
/// server.
321-
void enqueue_sync(asio::const_buffer buff, const double &timestamp, bool pushthrough);
320+
void enqueue_sync(asio::const_buffer buff, double timestamp, bool pushthrough);
322321

323322
/**
324323
* Append a single timestamp and multiple within-sample buffers to sync_buffs_.
@@ -362,6 +361,8 @@ class stream_outlet_impl {
362361
std::vector<thread_p> io_threads_;
363362
/// buffers used in synchronous call to gather-write data directly to the socket.
364363
std::vector<asio::const_buffer> sync_buffs_;
364+
/// timestamp buffer for sync transfers
365+
std::vector<std::pair<uint64_t, double>> sync_timestamps_;
365366
};
366367

367368
} // namespace lsl

0 commit comments

Comments
 (0)