Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions libs/api/include/rtbot/OperatorJson.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
#include "rtbot/std/InfiniteImpulseResponse.h"
#include "rtbot/std/Linear.h"
#include "rtbot/std/MovingAverage.h"
#include "rtbot/std/MovingSum.h"
#include "rtbot/std/PeakDetector.h"
#include "rtbot/std/Replace.h"
#include "rtbot/std/ResamplerConstant.h"
#include "rtbot/std/ResamplerHermite.h"
#include "rtbot/std/StandardDeviation.h"
Expand Down Expand Up @@ -73,6 +75,8 @@ class OperatorJson {
return make_output(id, parsed["portTypes"].get<std::vector<std::string>>());
} else if (type == "MovingAverage") {
return make_moving_average(id, parsed["window_size"].get<size_t>());
} else if (type == "MovingSum") {
return make_moving_sum(id, parsed["window_size"].get<size_t>());
} else if (type == "StandardDeviation") {
return make_std_dev(id, parsed["window_size"].get<size_t>());
} else if (type == "FiniteImpulseResponse") {
Expand Down Expand Up @@ -131,6 +135,8 @@ class OperatorJson {
return make_constant_boolean_to_number(id, parsed["value"].get<double>());
} else if (type == "LessThan") {
return make_less_than(id, parsed["value"].get<double>());
} else if (type == "LessThanOrEqualToReplace") {
return make_less_than_or_equal_to_replace(id, parsed["value"].get<double>(), parsed["replaceBy"].get<double>());
} else if (type == "EqualTo") {
return make_equal_to(id, parsed["value"].get<double>(), parsed.value("epsilon", 1e-10));
} else if (type == "NotEqualTo") {
Expand Down Expand Up @@ -264,6 +270,8 @@ class OperatorJson {
j["portTypes"] = std::dynamic_pointer_cast<Output>(op)->get_port_types();
} else if (type == "MovingAverage") {
j["window_size"] = std::dynamic_pointer_cast<MovingAverage>(op)->window_size();
} else if (type == "MovingSum") {
j["window_size"] = std::dynamic_pointer_cast<MovingSum>(op)->window_size();
} else if (type == "StandardDeviation") {
j["window_size"] = std::dynamic_pointer_cast<StandardDeviation>(op)->window_size();
} else if (type == "FiniteImpulseResponse") {
Expand Down Expand Up @@ -305,8 +313,9 @@ class OperatorJson {
j["epsilon"] = std::dynamic_pointer_cast<SyncNotEqual>(op)->get_epsilon();
} else if (type == "GreaterThan") {
j["value"] = std::dynamic_pointer_cast<GreaterThan>(op)->get_threshold();
} else if (type == "LessThan") {
j["value"] = std::dynamic_pointer_cast<LessThan>(op)->get_threshold();
} else if (type == "LessThanOrEqualToReplace") {
j["value"] = std::dynamic_pointer_cast<LessThanOrEqualToReplace>(op)->get_threshold();
j["replaceBy"] = std::dynamic_pointer_cast<LessThanOrEqualToReplace>(op)->get_replace_by();
} else if (type == "LogicalAnd" || type == "LogicalOr" || type == "LogicalXor" || type == "LogicalNand" ||
type == "LogicalNor" || type == "LogicalImplication") {
j["numPorts"] = std::dynamic_pointer_cast<BooleanSync>(op)->get_num_ports();
Expand Down Expand Up @@ -369,6 +378,7 @@ class OperatorJson {
} else {
throw std::runtime_error("Unknown operator type: " + type);
}
return j.dump();
}
};

Expand Down
277 changes: 277 additions & 0 deletions libs/api/test/test_program.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,283 @@ SCENARIO("Program handles Pipeline operators and resets", "[program][pipeline]")
}
}

SCENARIO("Program handles complex Pipeline operators and resets", "[program][pipeline]") {
GIVEN("A complex program with a Pipeline") {
std::string program_json = R"({
"apiVersion": "v1",
"operators": [
{
"id": "input",
"type": "Input",
"portTypes": [
"number",
"number"
]
},
{
"id": "hi_input_cutoff",
"type": "LessThanOrEqualToReplace",
"value": 0.5,
"replaceBy": 0.0
},
{
"id": "lo_input_cutoff",
"type": "LessThanOrEqualToReplace",
"value": 0.5,
"replaceBy": 0.0
},
{
"id": "hiresampler",
"type": "ResamplerConstant",
"interval": 5000
},
{
"id": "loresampler",
"type": "ResamplerConstant",
"interval": 5000
},
{
"id": "hiresampler_cutoff",
"type": "LessThanOrEqualToReplace",
"value": 0.5,
"replaceBy": 0.0
},
{
"id": "loresampler_cutoff",
"type": "LessThanOrEqualToReplace",
"value": 0.5,
"replaceBy": 0.0
},
{
"id": "power1",
"type": "Scale",
"value": 220
},
{
"id": "power2",
"type": "Scale",
"value": 220
},
{
"id": "power1_cutoff",
"type": "LessThanOrEqualToReplace",
"value": 0.5,
"replaceBy": 0.0
},
{
"id": "power2_cutoff",
"type": "LessThanOrEqualToReplace",
"value": 0.5,
"replaceBy": 0.0
},
{
"id": "total_power",
"type": "Addition"
},
{
"id": "total_power_cutoff",
"type": "LessThanOrEqualToReplace",
"value": 0.5,
"replaceBy": 0.0
},
{
"id": "hourly",
"type": "Pipeline",
"input_port_types": [
"number"
],
"output_port_types": [
"number"
],
"operators": [
{
"id": "trapezoid",
"type": "MovingAverage",
"window_size": 2
},
{
"id": "trapezoid_cutoff",
"type": "LessThanOrEqualToReplace",
"value": 0.5,
"replaceBy": 0.0
},
{
"id": "wh",
"type": "Scale",
"value": 0.00138888888
},
{
"id": "wh_cutoff",
"type": "LessThanOrEqualToReplace",
"value": 0.5,
"replaceBy": 0.0
},
{
"id": "hourly_ms",
"type": "MovingSum",
"window_size": 720
}
],
"connections": [
{
"from": "trapezoid",
"to": "trapezoid_cutoff"
},
{
"from": "trapezoid_cutoff",
"to": "wh"
},
{
"from": "wh",
"to": "wh_cutoff"
},
{
"from": "wh_cutoff",
"to": "hourly_ms"
}
],
"entryOperator": "trapezoid",
"outputMappings": {
"hourly_ms": {
"o1": "o1"
}
}
},
{
"id": "output",
"type": "Output",
"portTypes": [
"number"
]
}
],
"connections": [
{
"from": "input",
"to": "hi_input_cutoff",
"fromPort": "o1",
"toPort": "i1"
},
{
"from": "input",
"to": "lo_input_cutoff",
"fromPort": "o2",
"toPort": "i1"
},
{
"from": "hi_input_cutoff",
"to": "hiresampler"
},
{
"from": "lo_input_cutoff",
"to": "loresampler"
},
{
"from": "hiresampler",
"to": "hiresampler_cutoff"
},
{
"from": "hiresampler_cutoff",
"to": "power1"
},
{
"from": "loresampler",
"to": "loresampler_cutoff"
},
{
"from": "loresampler_cutoff",
"to": "power2"
},
{
"from": "power1",
"to": "power1_cutoff"
},
{
"from": "power2",
"to": "power2_cutoff"
},
{
"from": "power1_cutoff",
"to": "total_power",
"fromPort": "o1",
"toPort": "i1"
},
{
"from": "power2_cutoff",
"to": "total_power",
"fromPort": "o1",
"toPort": "i2"
},
{
"from": "total_power",
"to": "total_power_cutoff"
},
{
"from": "total_power_cutoff",
"to": "hourly"
},
{
"from": "hourly",
"to": "output",
"fromPort": "o1",
"toPort": "i1"
}
],
"entryOperator": "input",
"output": {
"output": [
"o1"
]
},
"title": "Power Consumption Monitor with Multiple Averages",
"description": "Calculates hourly power consumption from two current inputs"
})";

Program program(program_json);

WHEN("Processing messages") {
int t = 5000;
for (int h = 1; h <= 20; h++) {
ProgramMsgBatch final_batch;
// int iterations = 0;
if (h % 2 == 1) {
while (final_batch.size() == 0) {
program.receive({t, NumberData{30.23}}, "i1");
final_batch = program.receive({t, NumberData{10.5802}}, "i2");
t = t + 5000;
// iterations++;
}
} else {
while (final_batch.size() == 0) {
program.receive({t, NumberData{0.01}}, "i1");
final_batch = program.receive({t, NumberData{0.01}}, "i2");
t = t + 5000;
// iterations++;
}
}

if (final_batch.size() == 1 && h % 2 == 1) {
const auto* out_msg = dynamic_cast<const Message<NumberData>*>(final_batch["output"]["o1"].back().get());
REQUIRE(out_msg->data.value > 0.0);
/*std::cout << " time " << out_msg->time << std::endl;
std::cout << " value " << out_msg->data.value << std::endl;
std::cout << " iterations " << iterations << std::endl;
std::cout << " ----------------------------- " << std::endl;*/
} else if (final_batch.size() == 1 && h % 2 == 0) {
const auto* out_msg = dynamic_cast<const Message<NumberData>*>(final_batch["output"]["o1"].back().get());
REQUIRE(out_msg->data.value == 0.0);
/*std::cout << " time " << out_msg->time << std::endl;
std::cout << " value " << out_msg->data.value << std::endl;
std::cout << " iterations " << iterations << std::endl;
std::cout << " ----------------------------- " << std::endl;*/
} else {
FAIL(true);
}
}
}
}
}

SCENARIO("Program handles Pipeline serialization", "[program][pipeline]") {
GIVEN("A program with a stateful Pipeline") {
std::string program_json = R"({
Expand Down
9 changes: 4 additions & 5 deletions libs/core/include/rtbot/Pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class Pipeline : public Operator {
entry_operator_->execute();
input_queue.pop_front();
// Process output mappings
bool was_reseted = false;
bool was_reset = false;
for (const auto& [op_id, mappings] : output_mappings_) {
auto it = operators_.find(op_id);
if (it != operators_.end()) {
Expand All @@ -139,23 +139,22 @@ class Pipeline : public Operator {
const auto& source_queue = op->get_output_queue(operator_port);
// Only forward if source operator has produced output on the mapped port
if (!source_queue.empty()) {
was_reseted = false;
auto& target_queue = get_output_queue(pipeline_port);
for (const auto& msg : source_queue) {
RTBOT_LOG_DEBUG("Forwarding message ", msg->to_string(), " from ", op_id, " -> ", pipeline_port);
target_queue.push_back(msg->clone());
reset();
was_reseted = true;
was_reset = true;
break;
}
}
}
if (was_reseted) {
if (was_reset) {
break;
}
}
}
if (was_reseted) {
if (was_reset) {
break;
}
}
Expand Down
20 changes: 0 additions & 20 deletions libs/std/include/rtbot/std/Collector.h

This file was deleted.

Loading
Loading