Skip to content

Commit 4a3f4d3

Browse files
committed
Add controlling topic expected frequencies and tolerances through ROS 2 parameters
Signed-off-by: Blake McHale <bmchale@nvidia.com>
1 parent 0f35afb commit 4a3f4d3

File tree

13 files changed

+1462
-28
lines changed

13 files changed

+1462
-28
lines changed

greenwave_monitor/CMakeLists.txt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,17 @@ if(BUILD_TESTING)
110110
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}
111111
)
112112

113+
# Add parameter-based topic configuration tests (in test/parameters/)
114+
# Automatically discover and add all test_*.py files in the parameters directory
115+
file(GLOB PARAM_TEST_FILES "${CMAKE_SOURCE_DIR}/test/parameters/test_*.py")
116+
foreach(TEST_FILE ${PARAM_TEST_FILES})
117+
get_filename_component(TEST_NAME ${TEST_FILE} NAME_WE)
118+
ament_add_pytest_test(${TEST_NAME} test/parameters/${TEST_NAME}.py
119+
TIMEOUT 120
120+
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}
121+
)
122+
endforeach()
123+
113124
# Add gtests
114125
ament_add_gtest(test_message_diagnostics test/test_message_diagnostics.cpp
115126
TIMEOUT 60

greenwave_monitor/examples/example.launch.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
# limitations under the License.
1414

1515
from launch import LaunchDescription
16-
from launch.actions import LogInfo
1716
from launch_ros.actions import Node
1817

1918

@@ -52,11 +51,13 @@ def generate_launch_description():
5251
name='greenwave_monitor',
5352
output='log',
5453
parameters=[
55-
{'topics': ['/imu_topic', '/image_topic', '/string_topic']}
56-
],
57-
),
58-
LogInfo(
59-
msg='Run `ros2 run r2s_gw r2s_gw` in another terminal to see the demo output '
60-
'with the r2s dashboard.'
61-
),
54+
{
55+
'topics': {
56+
'/imu_topic': {'expected_frequency': 100.0},
57+
'/image_topic': {'expected_frequency': 30.0},
58+
'/string_topic': {'expected_frequency': 1000.0}
59+
},
60+
}
61+
]
62+
)
6263
])

greenwave_monitor/greenwave_monitor/test_utils.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@
2222
from typing import List, Optional, Tuple
2323

2424
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus
25+
from greenwave_monitor.ui_adaptor import (
26+
FREQ_SUFFIX,
27+
TOL_SUFFIX,
28+
TOPIC_PARAM_PREFIX,
29+
)
2530
from greenwave_monitor_interfaces.srv import ManageTopic, SetExpectedFrequency
2631
import launch_ros
2732
import rclpy
@@ -65,19 +70,32 @@ def create_minimal_publisher(
6570

6671
def create_monitor_node(namespace: str = MONITOR_NODE_NAMESPACE,
6772
node_name: str = MONITOR_NODE_NAME,
68-
topics: List[str] = None):
73+
topics: List[str] = None,
74+
topic_configs: dict = None):
6975
"""Create a greenwave_monitor node for testing."""
7076
if topics is None:
7177
topics = ['/test_topic']
7278

79+
# Ensure topics list has at least one element (even if empty string)
80+
if not topics:
81+
topics = ['']
82+
83+
params = {'topics': topics}
84+
85+
if topic_configs:
86+
for topic, config in topic_configs.items():
87+
if 'expected_frequency' in config:
88+
params[f'{TOPIC_PARAM_PREFIX}{topic}{FREQ_SUFFIX}'] = float(
89+
config['expected_frequency'])
90+
if 'tolerance' in config:
91+
params[f'{TOPIC_PARAM_PREFIX}{topic}{TOL_SUFFIX}'] = float(config['tolerance'])
92+
7393
return launch_ros.actions.Node(
7494
package='greenwave_monitor',
7595
executable='greenwave_monitor',
7696
name=node_name,
7797
namespace=namespace,
78-
parameters=[{
79-
'topics': topics
80-
}],
98+
parameters=[params],
8199
output='screen'
82100
)
83101

greenwave_monitor/greenwave_monitor/ui_adaptor.py

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,58 @@
3636
"""
3737

3838
from dataclasses import dataclass
39+
from enum import Enum
3940
import threading
4041
import time
4142
from typing import Dict
4243

4344
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus
4445
from greenwave_monitor_interfaces.srv import ManageTopic, SetExpectedFrequency
46+
from rcl_interfaces.msg import ParameterEvent, ParameterType, ParameterValue
47+
from rcl_interfaces.srv import GetParameters, ListParameters
4548
import rclpy
4649
from rclpy.node import Node
4750

51+
# Parameter name constants
52+
TOPIC_PARAM_PREFIX = 'topics.'
53+
FREQ_SUFFIX = '.expected_frequency'
54+
TOL_SUFFIX = '.tolerance'
55+
DEFAULT_TOLERANCE_PERCENT = 5.0
56+
57+
58+
class TopicParamField(Enum):
59+
"""Type of topic parameter field."""
60+
61+
NONE = 0
62+
FREQUENCY = 1
63+
TOLERANCE = 2
64+
65+
66+
def parse_topic_param_name(param_name: str) -> tuple[str, TopicParamField]:
67+
"""Parse parameter name to extract topic name and field type."""
68+
if not param_name.startswith(TOPIC_PARAM_PREFIX):
69+
return ('', TopicParamField.NONE)
70+
71+
topic_and_field = param_name[len(TOPIC_PARAM_PREFIX):]
72+
73+
if topic_and_field.endswith(FREQ_SUFFIX):
74+
topic_name = topic_and_field[:-len(FREQ_SUFFIX)]
75+
return (topic_name, TopicParamField.FREQUENCY)
76+
elif topic_and_field.endswith(TOL_SUFFIX):
77+
topic_name = topic_and_field[:-len(TOL_SUFFIX)]
78+
return (topic_name, TopicParamField.TOLERANCE)
79+
80+
return ('', TopicParamField.NONE)
81+
82+
83+
def param_value_to_float(value: ParameterValue) -> float | None:
84+
"""Convert a ParameterValue to float if it's a numeric type."""
85+
if value.type == ParameterType.PARAMETER_DOUBLE:
86+
return value.double_value
87+
elif value.type == ParameterType.PARAMETER_INTEGER:
88+
return float(value.integer_value)
89+
return None
90+
4891

4992
@dataclass
5093
class UiDiagnosticData:
@@ -119,6 +162,13 @@ def _setup_ros_components(self):
119162
100
120163
)
121164

165+
self.param_events_subscription = self.node.create_subscription(
166+
ParameterEvent,
167+
'/parameter_events',
168+
self._on_parameter_event,
169+
10
170+
)
171+
122172
manage_service_name = f'{self.monitor_node_name}/manage_topic'
123173
set_freq_service_name = f'{self.monitor_node_name}/set_expected_frequency'
124174

@@ -134,6 +184,108 @@ def _setup_ros_components(self):
134184
set_freq_service_name
135185
)
136186

187+
# Parameter service clients for querying initial state
188+
self.list_params_client = self.node.create_client(
189+
ListParameters,
190+
f'{self.monitor_node_name}/list_parameters'
191+
)
192+
self.get_params_client = self.node.create_client(
193+
GetParameters,
194+
f'{self.monitor_node_name}/get_parameters'
195+
)
196+
197+
# Query initial parameters after a short delay to let services come up
198+
self._initial_params_timer = self.node.create_timer(
199+
0.1, self._fetch_initial_parameters_callback)
200+
201+
def _fetch_initial_parameters_callback(self):
202+
"""Timer callback to fetch initial parameters (retries until services ready)."""
203+
# Check if services are available (non-blocking)
204+
if not self.list_params_client.service_is_ready():
205+
return # Timer will retry
206+
207+
if not self.get_params_client.service_is_ready():
208+
return # Timer will retry
209+
210+
# Cancel timer now that services are ready
211+
if self._initial_params_timer is not None:
212+
self._initial_params_timer.cancel()
213+
self._initial_params_timer = None
214+
215+
# List all parameters (prefixes filter is unreliable with nested names)
216+
list_request = ListParameters.Request()
217+
list_request.prefixes = ['topics']
218+
list_request.depth = 10
219+
220+
list_future = self.list_params_client.call_async(list_request)
221+
list_future.add_done_callback(self._on_list_parameters_response)
222+
223+
def _on_list_parameters_response(self, future):
224+
"""Handle response from list_parameters service."""
225+
try:
226+
if future.result() is None:
227+
return
228+
229+
all_param_names = future.result().result.names
230+
231+
# Filter to only topic parameters (prefixes filter is unreliable)
232+
param_names = [n for n in all_param_names if n.startswith(TOPIC_PARAM_PREFIX)]
233+
234+
if not param_names:
235+
return
236+
237+
# Store for use in get callback
238+
self._pending_param_names = param_names
239+
240+
# Get values for topic parameters only
241+
get_request = GetParameters.Request()
242+
get_request.names = param_names
243+
244+
get_future = self.get_params_client.call_async(get_request)
245+
get_future.add_done_callback(self._on_get_parameters_response)
246+
except Exception as e:
247+
self.node.get_logger().debug(f'Error listing parameters: {e}')
248+
249+
def _on_get_parameters_response(self, future):
250+
"""Handle response from get_parameters service."""
251+
try:
252+
if future.result() is None:
253+
return
254+
255+
param_names = getattr(self, '_pending_param_names', [])
256+
values = future.result().values
257+
258+
# Parse parameters into expected_frequencies
259+
topic_configs: Dict[str, Dict[str, float]] = {}
260+
261+
for name, value in zip(param_names, values):
262+
numeric_value = param_value_to_float(value)
263+
if numeric_value is None:
264+
continue
265+
266+
topic_name, field = parse_topic_param_name(name)
267+
if not topic_name or field == TopicParamField.NONE:
268+
continue
269+
270+
if topic_name not in topic_configs:
271+
topic_configs[topic_name] = {}
272+
273+
if field == TopicParamField.FREQUENCY:
274+
topic_configs[topic_name]['freq'] = numeric_value
275+
elif field == TopicParamField.TOLERANCE:
276+
topic_configs[topic_name]['tol'] = numeric_value
277+
278+
# Update expected_frequencies with valid configs
279+
with self.data_lock:
280+
for topic_name, config in topic_configs.items():
281+
freq = config.get('freq', 0.0)
282+
tol = config.get('tol', DEFAULT_TOLERANCE_PERCENT)
283+
if freq > 0:
284+
self.expected_frequencies[topic_name] = (freq, tol)
285+
286+
except Exception as e:
287+
self.node.get_logger().debug(f'Error fetching parameters: {e}')
288+
137289
def _extract_topic_name(self, diagnostic_name: str) -> str:
138290
"""
139291
Extract topic name from diagnostic status name.
@@ -168,6 +320,35 @@ def _on_diagnostics(self, msg: DiagnosticArray):
168320
topic_name = self._extract_topic_name(status.name)
169321
self.ui_diagnostics[topic_name] = ui_data
170322

323+
def _on_parameter_event(self, msg: ParameterEvent):
324+
"""Process parameter change events from the monitor node."""
325+
# Only process events from the monitor node
326+
if self.monitor_node_name not in msg.node:
327+
return
328+
329+
# Process changed and new parameters
330+
for param in msg.changed_parameters + msg.new_parameters:
331+
value = param_value_to_float(param.value)
332+
if value is None:
333+
continue
334+
335+
topic_name, field = parse_topic_param_name(param.name)
336+
if not topic_name or field == TopicParamField.NONE:
337+
continue
338+
339+
with self.data_lock:
340+
current = self.expected_frequencies.get(topic_name, (0.0, 0.0))
341+
342+
if field == TopicParamField.FREQUENCY:
343+
if value > 0:
344+
self.expected_frequencies[topic_name] = (value, current[1])
345+
elif topic_name in self.expected_frequencies:
346+
del self.expected_frequencies[topic_name]
347+
348+
elif field == TopicParamField.TOLERANCE:
349+
if current[0] > 0: # Only update if frequency is set
350+
self.expected_frequencies[topic_name] = (current[0], value)
351+
171352
def toggle_topic_monitoring(self, topic_name: str):
172353
"""Toggle monitoring for a topic."""
173354
if not self.manage_topic_client.wait_for_service(timeout_sec=1.0):

greenwave_monitor/include/greenwave_monitor.hpp

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <vector>
2727

2828
#include "rclcpp/rclcpp.hpp"
29+
#include "rcl_interfaces/msg/set_parameters_result.hpp"
2930
#include "std_msgs/msg/string.hpp"
3031
#include "diagnostic_msgs/msg/diagnostic_array.hpp"
3132
#include "diagnostic_msgs/msg/diagnostic_status.hpp"
@@ -40,6 +41,12 @@ class GreenwaveMonitor : public rclcpp::Node
4041
explicit GreenwaveMonitor(const rclcpp::NodeOptions & options);
4142

4243
private:
44+
struct TopicConfig
45+
{
46+
std::optional<double> expected_frequency;
47+
std::optional<double> tolerance;
48+
};
49+
4350
std::optional<std::string> find_topic_type_with_retry(
4451
const std::string & topic, const int max_retries, const int retry_period_s);
4552

@@ -57,6 +64,29 @@ class GreenwaveMonitor : public rclcpp::Node
5764
const std::shared_ptr<greenwave_monitor_interfaces::srv::SetExpectedFrequency::Request> request,
5865
std::shared_ptr<greenwave_monitor_interfaces::srv::SetExpectedFrequency::Response> response);
5966

67+
bool set_topic_expected_frequency(
68+
const std::string & topic_name,
69+
double expected_hz,
70+
double tolerance_percent,
71+
bool add_topic_if_missing,
72+
std::string & message,
73+
bool update_parameters = true);
74+
75+
rcl_interfaces::msg::SetParametersResult on_parameter_change(
76+
const std::vector<rclcpp::Parameter> & parameters);
77+
78+
void apply_topic_config_if_complete(const std::string & topic_name);
79+
80+
void load_topic_parameters_from_overrides();
81+
82+
std::optional<double> get_numeric_parameter(const std::string & param_name);
83+
84+
void try_undeclare_parameter(const std::string & param_name);
85+
86+
void declare_or_set_parameter(const std::string & param_name, double value);
87+
88+
void undeclare_topic_parameters(const std::string & topic_name);
89+
6090
bool add_topic(const std::string & topic, std::string & message);
6191

6292
bool remove_topic(const std::string & topic, std::string & message);
@@ -76,4 +106,7 @@ class GreenwaveMonitor : public rclcpp::Node
76106
manage_topic_service_;
77107
rclcpp::Service<greenwave_monitor_interfaces::srv::SetExpectedFrequency>::SharedPtr
78108
set_expected_frequency_service_;
109+
110+
std::map<std::string, TopicConfig> pending_topic_configs_;
111+
rclcpp::node_interfaces::OnSetParametersCallbackHandle::SharedPtr param_callback_handle_;
79112
};

0 commit comments

Comments
 (0)