Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 45 additions & 116 deletions libs/core/include/rtbot/Demultiplexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@ class Demultiplexer : public Operator {
}

// Add single data input port with type T
add_data_port<T>();
data_time_tracker_ = std::set<timestamp_t>();
add_data_port<T>();

// Add corresponding control ports (always boolean)
for (size_t i = 0; i < num_ports; ++i) {
add_control_port<BooleanData>();
control_time_tracker_[i] = std::map<timestamp_t, bool>();
add_control_port<BooleanData>();
}

// Add output ports (same type as input)
Expand All @@ -37,50 +35,7 @@ class Demultiplexer : public Operator {

std::string type_name() const override { return "Demultiplexer"; }

size_t get_num_ports() const { return control_time_tracker_.size(); }

Bytes collect() override {
Bytes bytes = Operator::collect(); // First collect base state

// Serialize data time tracker
StateSerializer::serialize_timestamp_set(bytes, data_time_tracker_);

// Serialize control time tracker
StateSerializer::serialize_port_control_map(bytes, control_time_tracker_);

return bytes;
}

void restore(Bytes::const_iterator& it) override {
// First restore base state
Operator::restore(it);

// Clear current state
data_time_tracker_.clear();
control_time_tracker_.clear();

// Restore data time tracker
StateSerializer::deserialize_timestamp_set(it, data_time_tracker_);

// Restore control time tracker
StateSerializer::deserialize_port_control_map(it, control_time_tracker_);

// Validate control port count
StateSerializer::validate_port_count(control_time_tracker_.size(), num_control_ports(), "Control");
}

void reset() override {
Operator::reset();
data_time_tracker_.clear();
control_time_tracker_.clear();
}

void receive_data(std::unique_ptr<BaseMessage> msg, size_t port_index) override {
auto time = msg->time;
Operator::receive_data(std::move(msg), port_index);

data_time_tracker_.insert(time);
}
size_t get_num_ports() const { return num_control_ports(); }

void receive_control(std::unique_ptr<BaseMessage> msg, size_t port_index) override {
if (port_index >= num_control_ports()) {
Expand All @@ -92,88 +47,62 @@ class Demultiplexer : public Operator {
throw std::runtime_error("Invalid control message type");
}

// Update control tracker
control_time_tracker_[port_index][ctrl_msg->time] = ctrl_msg->data.value;
// Update last timestamp
control_ports_[port_index].last_timestamp = msg->time;

if (get_control_queue(port_index).size() == max_size_per_port_) {
get_control_queue(port_index).pop_front();
}

// Add message to queue
get_control_queue(port_index).push_back(std::move(msg));
control_ports_with_new_data_.insert(port_index);
}

protected:
void process_data() override {
while (true) {
// Find oldest common control timestamp
auto common_control_time = TimestampTracker::find_oldest_common_time(control_time_tracker_);
if (!common_control_time) {
break;
}
protected:

// Clean up any old input data messages
auto& data_queue = get_data_queue(0);
while (!data_queue.empty()) {
auto* msg = dynamic_cast<const Message<T>*>(data_queue.front().get());
if (msg && msg->time < *common_control_time) {
data_time_tracker_.erase(msg->time);
data_queue.pop_front();
} else {
break;
}
}

// Look for matching data message
bool message_found = false;
if (!data_queue.empty()) {
auto* msg = dynamic_cast<const Message<T>*>(data_queue.front().get());
if (msg && msg->time == *common_control_time) {
// Get active control ports
std::vector<size_t> active_ports;
for (size_t i = 0; i < num_control_ports(); ++i) {
if (control_time_tracker_[i].at(*common_control_time)) {
active_ports.push_back(i);
}
}
void process_data() override {

// Route message to all active ports
for (size_t port : active_ports) {
get_output_queue(port).push_back(data_queue.front()->clone());
while(true) {

bool is_any_control_empty;
bool are_controls_sync;
do {
is_any_control_empty = false;
are_controls_sync = sync_control_inputs();
for (int i=0; i < num_control_ports(); i++) {
if (get_control_queue(i).empty()) {
is_any_control_empty = true;
break;
}
}
} while (!are_controls_sync && !is_any_control_empty );

data_time_tracker_.erase(msg->time);
data_queue.pop_front();
message_found = true;
}
}

clean_up_control_messages(*common_control_time);

if (!message_found) {
break;
}
}
}

private:
void clean_up_control_messages(timestamp_t time) {
for (auto& [port, tracker] : control_time_tracker_) {
tracker.erase(time);
}
if (!are_controls_sync) return;

for (size_t port = 0; port < num_control_ports(); ++port) {
auto& queue = get_control_queue(port);
while (!queue.empty()) {
auto* msg = dynamic_cast<const Message<BooleanData>*>(queue.front().get());
if (msg && msg->time <= time) {
queue.pop_front();
} else {
break;
auto& data_queue = get_data_queue(0);
if (data_queue.empty()) return;
auto* msg = dynamic_cast<const Message<T>*>(data_queue.front().get());
auto* ctrl_msg = dynamic_cast<const Message<BooleanData>*>(get_control_queue(0).front().get());
if (msg && ctrl_msg && msg->time == ctrl_msg->time) {
for (int i = 0; i < num_control_ports(); i++) {
ctrl_msg = dynamic_cast<const Message<BooleanData>*>(get_control_queue(i).front().get());
if (ctrl_msg->data.value) {
get_output_queue(i).push_back(data_queue.front()->clone());
}
get_control_queue(i).pop_front();
}
data_queue.pop_front();
} else if (msg && ctrl_msg && msg->time < ctrl_msg->time) {
data_queue.pop_front();
} else if (msg && ctrl_msg && ctrl_msg->time < msg->time) {
for (int i = 0; i < num_control_ports(); i++)
get_control_queue(i).pop_front();

}
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove space

}

std::set<timestamp_t> data_time_tracker_;
std::map<size_t, std::map<timestamp_t, bool>> control_time_tracker_;
};

// Factory functions for common configurations using PortType
Expand Down
4 changes: 2 additions & 2 deletions libs/core/include/rtbot/Input.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ class Input : public Operator {
// Do not throw exceptions in receive_data
void receive_data(std::unique_ptr<BaseMessage> msg, size_t port_index) override {
try {
Operator::receive_data(std::move(msg), port_index);
Operator::receive_data(std::move(msg), port_index);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not real change on this file but bad formatting

} catch (const std::exception& e) {
// Do nothing

}
}

Expand Down
Loading
Loading