Skip to content

Commit 5f08533

Browse files
feat(rtbot): data and control queues max amount enforced
1 parent 72f551a commit 5f08533

File tree

10 files changed

+197
-17
lines changed

10 files changed

+197
-17
lines changed

libs/core/include/rtbot/Demultiplexer.h

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,16 +73,24 @@ class Demultiplexer : public Operator {
7373
Operator::reset();
7474
data_time_tracker_.clear();
7575
control_time_tracker_.clear();
76+
for (size_t i = 0; i < get_num_ports(); ++i) {
77+
control_time_tracker_[i] = std::map<timestamp_t, bool>();
78+
}
7679
}
7780

78-
void receive_data(std::unique_ptr<BaseMessage> msg, size_t port_index) override {
81+
timestamp_t receive_data(std::unique_ptr<BaseMessage> msg, size_t port_index) override {
7982
auto time = msg->time;
80-
Operator::receive_data(std::move(msg), port_index);
83+
timestamp_t time_dequeued = Operator::receive_data(std::move(msg), port_index);
8184

8285
data_time_tracker_.insert(time);
86+
if (time_dequeued >= 0) {
87+
data_time_tracker_.erase(time_dequeued);
88+
}
89+
90+
return time_dequeued;
8391
}
8492

85-
void receive_control(std::unique_ptr<BaseMessage> msg, size_t port_index) override {
93+
timestamp_t receive_control(std::unique_ptr<BaseMessage> msg, size_t port_index) override {
8694
if (port_index >= num_control_ports()) {
8795
throw std::runtime_error("Invalid control port index");
8896
}
@@ -92,12 +100,27 @@ class Demultiplexer : public Operator {
92100
throw std::runtime_error("Invalid control message type");
93101
}
94102

95-
// Update control tracker
103+
timestamp_t time_dequeued = -1;
104+
105+
if (get_control_queue(port_index).size() == max_size_per_port_) {
106+
time_dequeued = get_control_queue(port_index).front()->time;
107+
get_control_queue(port_index).pop_front();
108+
}
109+
96110
control_time_tracker_[port_index][ctrl_msg->time] = ctrl_msg->data.value;
97111

112+
if (time_dequeued >= 0) {
113+
auto it = control_time_tracker_.find(port_index);
114+
if (it != control_time_tracker_.end()) {
115+
it->second.erase(time_dequeued);
116+
}
117+
}
118+
98119
// Add message to queue
99120
get_control_queue(port_index).push_back(std::move(msg));
100121
control_ports_with_new_data_.insert(port_index);
122+
123+
return time_dequeued;
101124
}
102125

103126
protected:

libs/core/include/rtbot/Input.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,12 @@ class Input : public Operator {
9696
}
9797

9898
// Do not throw exceptions in receive_data
99-
void receive_data(std::unique_ptr<BaseMessage> msg, size_t port_index) override {
99+
timestamp_t receive_data(std::unique_ptr<BaseMessage> msg, size_t port_index) override {
100100
try {
101-
Operator::receive_data(std::move(msg), port_index);
101+
timestamp_t time_dequeued = Operator::receive_data(std::move(msg), port_index);
102+
return time_dequeued;
102103
} catch (const std::exception& e) {
103-
// Do nothing
104+
return -1;
104105
}
105106
}
106107

libs/core/include/rtbot/Join.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,12 +170,18 @@ class Join : public Operator {
170170
synchronized_data.clear();
171171
}
172172

173-
void receive_data(std::unique_ptr<BaseMessage> msg, size_t port_index) override {
173+
timestamp_t receive_data(std::unique_ptr<BaseMessage> msg, size_t port_index) override {
174174
auto time = msg->time;
175-
Operator::receive_data(std::move(msg), port_index);
175+
timestamp_t time_dequeued = Operator::receive_data(std::move(msg), port_index);
176176

177177
// Track timestamp
178178
data_time_tracker_[port_index].insert(time);
179+
180+
if (time_dequeued >= 0) {
181+
data_time_tracker_[port_index].erase(time_dequeued);
182+
}
183+
184+
return time_dequeued;
179185
}
180186

181187
protected:

libs/core/include/rtbot/Multiplexer.h

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,17 @@ class Multiplexer : public Operator {
4242
Operator::reset();
4343
data_time_tracker_.clear();
4444
control_time_tracker_.clear();
45+
46+
for (size_t i = 0; i < get_num_ports(); ++i) {
47+
data_time_tracker_[i] = std::set<timestamp_t>();
48+
}
49+
50+
for (size_t i = 0; i < get_num_ports(); ++i) {
51+
control_time_tracker_[i] = std::map<timestamp_t, bool>();
52+
}
4553
}
4654

47-
size_t get_num_ports() const { return data_time_tracker_.size(); }
55+
size_t get_num_ports() const { return data_ports_.size(); }
4856

4957
std::string type_name() const override { return "Multiplexer"; }
5058

@@ -79,15 +87,19 @@ class Multiplexer : public Operator {
7987
StateSerializer::validate_port_count(control_time_tracker_.size(), num_control_ports(), "Control");
8088
}
8189

82-
void receive_data(std::unique_ptr<BaseMessage> msg, size_t port_index) override {
90+
timestamp_t receive_data(std::unique_ptr<BaseMessage> msg, size_t port_index) override {
8391
auto time = msg->time;
84-
Operator::receive_data(std::move(msg), port_index);
92+
timestamp_t time_dequeued = Operator::receive_data(std::move(msg), port_index);
8593

86-
// Add timestamp to the tracker
8794
data_time_tracker_[port_index].insert(time);
95+
if (time_dequeued >= 0) {
96+
data_time_tracker_[port_index].erase(time_dequeued);
97+
}
98+
99+
return time_dequeued;
88100
}
89101

90-
void receive_control(std::unique_ptr<BaseMessage> msg, size_t port_index) override {
102+
timestamp_t receive_control(std::unique_ptr<BaseMessage> msg, size_t port_index) override {
91103
if (port_index >= num_control_ports()) {
92104
throw std::runtime_error("Invalid control port index");
93105
}
@@ -97,12 +109,28 @@ class Multiplexer : public Operator {
97109
throw std::runtime_error("Invalid control message type");
98110
}
99111

112+
timestamp_t time_dequeued = -1;
113+
114+
if (get_control_queue(port_index).size() == max_size_per_port_) {
115+
time_dequeued = get_control_queue(port_index).front()->time;
116+
get_control_queue(port_index).pop_front();
117+
}
118+
100119
// Update control time tracker
101120
control_time_tracker_[port_index][ctrl_msg->time] = ctrl_msg->data.value;
102121

122+
if (time_dequeued >= 0) {
123+
auto it = control_time_tracker_.find(port_index);
124+
if (it != control_time_tracker_.end()) {
125+
it->second.erase(time_dequeued);
126+
}
127+
}
128+
103129
// Add message to queue
104130
get_control_queue(port_index).push_back(std::move(msg));
105131
data_ports_with_new_data_.insert(port_index);
132+
133+
return time_dequeued;
106134
}
107135

108136
protected:

libs/core/include/rtbot/Operator.h

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ enum class PortKind { DATA, CONTROL };
3535
// Base operator class
3636
class Operator {
3737
public:
38-
Operator(std::string id) : id_(std::move(id)) {}
38+
Operator(std::string id) : id_(std::move(id)), max_size_per_port_(17280) {}
3939
virtual ~Operator() = default;
4040

4141
virtual std::string type_name() const = 0;
@@ -156,9 +156,10 @@ class Operator {
156156
size_t num_data_ports() const { return data_ports_.size(); }
157157
size_t num_control_ports() const { return control_ports_.size(); }
158158
size_t num_output_ports() const { return output_ports_.size(); }
159+
size_t max_size_per_port() const { return max_size_per_port_; }
159160

160161
// Runtime port access for data with type checking
161-
virtual void receive_data(std::unique_ptr<BaseMessage> msg, size_t port_index) {
162+
virtual timestamp_t receive_data(std::unique_ptr<BaseMessage> msg, size_t port_index) {
162163
if (port_index >= data_ports_.size()) {
163164
throw std::runtime_error("Invalid data port index at " + type_name() + "(" + id_ + ")" + ":" +
164165
std::to_string(port_index));
@@ -183,8 +184,17 @@ class Operator {
183184
RTBOT_RECORD_MESSAGE(id_, type_name(), std::move(msg->clone()));
184185
#endif
185186

187+
timestamp_t time_dequeued = -1;
188+
189+
if (data_ports_[port_index].queue.size() == max_size_per_port_) {
190+
time_dequeued = data_ports_[port_index].queue.front()->time;
191+
data_ports_[port_index].queue.pop_front();
192+
}
193+
186194
data_ports_[port_index].queue.push_back(std::move(msg));
187195
data_ports_with_new_data_.insert(port_index);
196+
197+
return time_dequeued;
188198
}
189199

190200
virtual void reset() {
@@ -247,7 +257,7 @@ class Operator {
247257
}
248258

249259
// Runtime port access for control messages with type checking
250-
virtual void receive_control(std::unique_ptr<BaseMessage> msg, size_t port_index) {
260+
virtual timestamp_t receive_control(std::unique_ptr<BaseMessage> msg, size_t port_index) {
251261
if (port_index >= control_ports_.size()) {
252262
throw std::runtime_error("Invalid control port index at " + type_name() + "(" + id_ + ")" + ":" +
253263
std::to_string(port_index));
@@ -269,8 +279,17 @@ class Operator {
269279
// Update last timestamp
270280
control_ports_[port_index].last_timestamp = msg->time;
271281

282+
timestamp_t time_dequeued = -1;
283+
284+
if (control_ports_[port_index].queue.size() == max_size_per_port_) {
285+
time_dequeued = control_ports_[port_index].queue.front()->time;
286+
control_ports_[port_index].queue.pop_front();
287+
}
288+
272289
control_ports_[port_index].queue.push_back(std::move(msg));
273290
control_ports_with_new_data_.insert(port_index);
291+
292+
return time_dequeued;
274293
}
275294

276295
std::shared_ptr<Operator> connect(std::shared_ptr<Operator> child, size_t output_port = 0,
@@ -417,6 +436,7 @@ class Operator {
417436
std::vector<Connection> connections_;
418437
std::set<size_t> data_ports_with_new_data_;
419438
std::set<size_t> control_ports_with_new_data_;
439+
std::size_t max_size_per_port_;
420440
};
421441

422442
} // namespace rtbot

libs/core/test/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,5 @@ cc_test(
1818
],
1919
"//conditions:default": [],
2020
}),
21+
timeout = "eternal", # or "long"
2122
)

libs/core/test/test_demultiplexer.cpp

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,32 @@ SCENARIO("Demultiplexer routes messages based on control signals", "[demultiplex
4747
REQUIRE(second_output.empty());
4848
}
4949
}
50+
51+
WHEN("Multiple control ports are active and messages exceeds the max_size_per_port()") {
52+
53+
for (int i = 0; i < demux->max_size_per_port() + 2; i++) {
54+
demux->receive_control(create_message<BooleanData>(i, BooleanData{true}), 0);
55+
demux->receive_control(create_message<BooleanData>(i, BooleanData{true}), 1);
56+
demux->receive_data(create_message<NumberData>(i, NumberData{i * 2.0}), 0);
57+
}
58+
demux->execute();
59+
60+
THEN("Message is routed to both ports") {
61+
const auto& first_output = demux->get_output_queue(0);
62+
const auto& second_output = demux->get_output_queue(1);
63+
64+
REQUIRE(first_output.size() == demux->max_size_per_port());
65+
REQUIRE(second_output.size() == demux->max_size_per_port());
66+
67+
auto* msg1 = dynamic_cast<const Message<NumberData>*>(first_output[0].get());
68+
auto* msg2 = dynamic_cast<const Message<NumberData>*>(second_output[0].get());
69+
70+
REQUIRE(msg1->time == 2);
71+
REQUIRE(msg1->data.value == 4.0);
72+
REQUIRE(msg2->time == 2);
73+
REQUIRE(msg2->data.value == 4.0);
74+
}
75+
}
5076
}
5177
}
5278

libs/core/test/test_input.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,21 @@ SCENARIO("Input operator handles single number port", "[input]") {
2727
}
2828
}
2929

30+
WHEN("Receiving a max_size_per_port() + 1 messages, only max_size_per_port() are forwarded") {
31+
for (int i = 0; i < input->max_size_per_port() + 1; i++) {
32+
input->receive_data(create_message<NumberData>(i, NumberData{i * 2.0}), 0);
33+
}
34+
input->execute();
35+
36+
THEN("only 11000 are forwarded") {
37+
const auto& output = input->get_output_queue(0);
38+
REQUIRE(output.size() == input->max_size_per_port());
39+
const auto* msg = dynamic_cast<const Message<NumberData>*>(output.front().get());
40+
REQUIRE(msg->time == 1);
41+
REQUIRE(msg->data.value == 2.0);
42+
}
43+
}
44+
3045
WHEN("Receiving messages with decreasing timestamps") {
3146
input->receive_data(create_message<NumberData>(2, NumberData{42.0}), 0);
3247
input->receive_data(create_message<NumberData>(1, NumberData{24.0}), 0);

libs/core/test/test_join.cpp

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,45 @@ SCENARIO("Join operator handles basic synchronization", "[join]") {
6464
REQUIRE(msg1->data.value == 24.0);
6565
}
6666
}
67+
68+
WHEN("Receiving messages above the limit max_size_per_port()") {
69+
70+
for (int i = 0; i < join->max_size_per_port() + 2; i++) {
71+
join->receive_data(create_message<NumberData>(i, NumberData{2.0 * i}), 1);
72+
join->receive_data(create_message<NumberData>(i, NumberData{3.0 * i}), 0);
73+
}
74+
75+
join->execute();
76+
77+
THEN("max_size_per_port() messages are synchronized correctly, first and second message are dropped") {
78+
auto& output0 = join->get_output_queue(0);
79+
auto& output1 = join->get_output_queue(1);
80+
81+
const auto& input0 = join->get_data_queue(0);
82+
const auto& input1 = join->get_data_queue(1);
83+
84+
85+
REQUIRE(output0.size() == join->max_size_per_port());
86+
REQUIRE(output1.size() == join->max_size_per_port());
87+
88+
REQUIRE(input0.size() == 0);
89+
REQUIRE(input1.size() == 0);
90+
91+
const Message<NumberData>* msg0;
92+
const Message<NumberData>* msg1;
93+
94+
for (int i = 0; i < join->max_size_per_port(); i++) {
95+
msg0 = dynamic_cast<const Message<NumberData>*>(output0.front().get());
96+
msg1 = dynamic_cast<const Message<NumberData>*>(output1.front().get());
97+
REQUIRE(msg0->time == i+2);
98+
REQUIRE(msg0->data.value == (i+2.0) * 3);
99+
REQUIRE(msg1->time == i+2);
100+
REQUIRE(msg1->data.value == (i+2.0) * 2);
101+
output0.pop_front();
102+
output1.pop_front();
103+
}
104+
}
105+
}
67106
}
68107
}
69108

libs/core/test/test_multiplexer.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,27 @@ SCENARIO("Multiplexer routes messages based on control signals", "[multiplexer]"
7676
}
7777
}
7878

79+
WHEN("Receiving multiple messages in sequence and exceeds max_size_per_port()") {
80+
81+
for (int i = 0; i < mult->max_size_per_port() + 5; i ++) {
82+
mult->receive_control(create_message<BooleanData>(i, BooleanData{i % 2 == 0}), 0);
83+
mult->receive_control(create_message<BooleanData>(i, BooleanData{i % 2 == 1}), 1);
84+
mult->receive_data(create_message<NumberData>(i, NumberData{i * 2.0}), 0);
85+
mult->receive_data(create_message<NumberData>(i, NumberData{i * 3.0}), 1);
86+
}
87+
mult->execute();
88+
89+
THEN("It forwards data from the correct ports in sequence, it drops 5 messages") {
90+
const auto& output = mult->get_output_queue(0);
91+
REQUIRE(output.size() == mult->max_size_per_port());
92+
93+
auto* msg1 = dynamic_cast<const Message<NumberData>*>(output[0].get());
94+
REQUIRE(msg1 != nullptr);
95+
REQUIRE(msg1->time == 5);
96+
REQUIRE(msg1->data.value == 15.0);
97+
}
98+
}
99+
79100
WHEN("Receiving control signals with no active port") {
80101
mult->receive_control(create_message<BooleanData>(1, BooleanData{false}), 0);
81102
mult->receive_control(create_message<BooleanData>(1, BooleanData{false}), 1);

0 commit comments

Comments
 (0)