Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,11 @@ def __init__(self, protocol: Protocol) -> None:

self._registrations: dict[str, Registration] = {}

if protocol.parameters and "unregister_timeout" in protocol.parameters:
manager.unregister_timeout = protocol.parameters.get("unregister_timeout")
if protocol.parameters:
if "unregister_timeout" in protocol.parameters:
manager.unregister_timeout = protocol.parameters["unregister_timeout"]
if "topics_glob" in protocol.parameters:
self.topics_glob = protocol.parameters["topics_glob"]

def advertise(self, message: dict[str, Any]) -> None:
# Pull out the ID
Expand All @@ -124,10 +127,10 @@ def advertise(self, message: dict[str, Any]) -> None:
latch: bool = message.get("latch", False)
queue_size: int = message.get("queue_size", 100)

if Advertise.topics_glob is not None and Advertise.topics_glob:
if self.topics_glob:
self.protocol.log("debug", "Topic security glob enabled, checking topic: " + topic)
match = False
for glob in Advertise.topics_glob:
for glob in self.topics_glob:
if fnmatch.fnmatch(topic, glob):
self.protocol.log(
"debug",
Expand Down Expand Up @@ -159,10 +162,10 @@ def unadvertise(self, message: dict[str, Any]) -> None:
self.basic_type_check(message, self.unadvertise_msg_fields)
topic: str = message["topic"]

if Advertise.topics_glob is not None and Advertise.topics_glob:
if self.topics_glob:
self.protocol.log("debug", "Topic security glob enabled, checking topic: " + topic)
match = False
for glob in Advertise.topics_glob:
for glob in self.topics_glob:
if fnmatch.fnmatch(topic, glob):
self.protocol.log(
"debug",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,17 @@ def graceful_shutdown(self) -> None:


class AdvertiseAction(Capability):
actions_glob = None

advertise_action_msg_fields = ((True, "action", str), (True, "type", str))

actions_glob: list[str] | None = None

def __init__(self, protocol: Protocol) -> None:
# Call superclass constructor
Capability.__init__(self, protocol)

if protocol.parameters and "actions_glob" in protocol.parameters:
self.actions_glob = protocol.parameters["actions_glob"]

# Register the operations that this capability provides
protocol.register_operation("advertise_action", self.advertise_action)

Expand All @@ -236,13 +239,13 @@ def advertise_action(self, message: dict) -> None:
# parse the incoming message
action_name: str = message["action"]

if AdvertiseAction.actions_glob is not None and AdvertiseAction.actions_glob:
if self.actions_glob:
self.protocol.log(
"debug",
"Action security glob enabled, checking action: " + action_name,
)
match = False
for glob in AdvertiseAction.actions_glob:
for glob in self.actions_glob:
if fnmatch.fnmatch(action_name, glob):
self.protocol.log(
"debug",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,17 @@ def graceful_shutdown(self) -> None:


class AdvertiseService(Capability):
services_glob: list[str] | None = None

advertise_service_msg_fields = ((True, "service", str), (True, "type", str))

services_glob: list[str] | None = None

def __init__(self, protocol: Protocol) -> None:
# Call superclass constructor
Capability.__init__(self, protocol)

if protocol.parameters and "services_glob" in protocol.parameters:
self.services_glob = protocol.parameters["services_glob"]

# Register the operations that this capability provides
protocol.register_operation("advertise_service", self.advertise_service)

Expand All @@ -122,13 +125,13 @@ def advertise_service(self, message: dict[str, Any]) -> None:
# parse the incoming message
service_name: str = message["service"]

if AdvertiseService.services_glob is not None and AdvertiseService.services_glob:
if self.services_glob:
self.protocol.log(
"debug",
"Service security glob enabled, checking service: " + service_name,
)
match = False
for glob in AdvertiseService.services_glob:
for glob in self.services_glob:
if fnmatch.fnmatch(service_name, glob):
self.protocol.log(
"debug",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,25 +51,27 @@ class CallService(Capability):
(False, "compression", str),
)

# parameters
services_glob: list[str] | None = None
default_timeout: float = 5.0
call_services_in_new_thread: bool = True

def __init__(self, protocol: Protocol) -> None:
# Call superclass constructor
Capability.__init__(self, protocol)

self.default_timeout = (
protocol.node_handle.get_parameter("default_call_service_timeout")
.get_parameter_value()
.double_value
)
if self.protocol.parameters:
if "services_glob" in self.protocol.parameters:
self.services_glob = self.protocol.parameters["services_glob"]
if "default_call_service_timeout" in self.protocol.parameters:
self.default_timeout = self.protocol.parameters["default_call_service_timeout"]
if "call_services_in_new_thread" in self.protocol.parameters:
self.call_services_in_new_thread = self.protocol.parameters[
"call_services_in_new_thread"
]

# Register the operations that this capability provides
call_services_in_new_thread = (
protocol.node_handle.get_parameter("call_services_in_new_thread")
.get_parameter_value()
.bool_value
)
if call_services_in_new_thread:
if self.call_services_in_new_thread:
# Calls the service in a separate thread so multiple services can be processed simultaneously.
protocol.node_handle.get_logger().info("Calling services in new thread")
protocol.register_operation(
Expand All @@ -94,12 +96,12 @@ def call_service(self, message: dict[str, Any]) -> None:
args: list | dict[str, Any] = message.get("args", [])
timeout: float = message.get("timeout", self.default_timeout)

if CallService.services_glob is not None and CallService.services_glob:
if self.services_glob:
self.protocol.log(
"debug", "Service security glob enabled, checking service: " + service
)
match = False
for glob in CallService.services_glob:
for glob in self.services_glob:
if fnmatch.fnmatch(service, glob):
self.protocol.log(
"debug",
Expand Down
11 changes: 7 additions & 4 deletions rosbridge_library/src/rosbridge_library/capabilities/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,11 @@ def __init__(self, protocol: Protocol) -> None:
# Save the topics that are published on for the purposes of unregistering
self._published: dict[str, bool] = {}

if protocol.parameters and "unregister_timeout" in protocol.parameters:
manager.unregister_timeout = protocol.parameters.get("unregister_timeout")
if protocol.parameters:
if "unregister_timeout" in protocol.parameters:
manager.unregister_timeout = protocol.parameters["unregister_timeout"]
if "topics_glob" in protocol.parameters:
self.topics_glob = protocol.parameters["topics_glob"]

def publish(self, message: dict[str, Any]) -> None:
# Do basic type checking
Expand All @@ -68,10 +71,10 @@ def publish(self, message: dict[str, Any]) -> None:
latch: bool = message.get("latch", False)
queue_size: int = message.get("queue_size", 100)

if Publish.topics_glob is not None and Publish.topics_glob:
if self.topics_glob:
self.protocol.log("debug", "Topic security glob enabled, checking topic: " + topic)
match = False
for glob in Publish.topics_glob:
for glob in self.topics_glob:
if fnmatch.fnmatch(topic, glob):
self.protocol.log(
"debug",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ class SendActionGoal(Capability):
)
cancel_action_goal_msg_fields = ((True, "action", str),)

actions_glob = None
actions_glob: list[str] | None = None
send_action_goals_in_new_thread: bool = False

client_handler_list: dict[str, ActionClientHandler]

def __init__(self, protocol: Protocol) -> None:
Expand All @@ -68,13 +70,16 @@ def __init__(self, protocol: Protocol) -> None:

self.client_handler_list = {}

if protocol.parameters:
if "actions_glob" in protocol.parameters:
self.actions_glob = protocol.parameters["actions_glob"]
if "send_action_goals_in_new_thread" in protocol.parameters:
self.send_action_goals_in_new_thread = protocol.parameters[
"send_action_goals_in_new_thread"
]

# Register the operations that this capability provides
send_action_goals_in_new_thread = (
protocol.node_handle.get_parameter("send_action_goals_in_new_thread")
.get_parameter_value()
.bool_value
)
if send_action_goals_in_new_thread:
if self.send_action_goals_in_new_thread:
# Sends the action goal in a separate thread so multiple actions can be processed simultaneously.
protocol.node_handle.get_logger().info("Sending action goals in new thread")
protocol.register_operation(
Expand Down Expand Up @@ -106,10 +111,10 @@ def send_action_goal(self, message: dict) -> None:
compression: str = message.get("compression", "none")
args: list | dict[str, Any] = message.get("args", [])

if SendActionGoal.actions_glob is not None and SendActionGoal.actions_glob:
if self.actions_glob:
self.protocol.log("debug", f"Action security glob enabled, checking action: {action}")
match = False
for glob in SendActionGoal.actions_glob:
for glob in self.actions_glob:
if fnmatch.fnmatch(action, glob):
self.protocol.log(
"debug",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ def __init__(self, protocol: Protocol) -> None:
# Call superclass constructor
Capability.__init__(self, protocol)

if protocol.parameters and "topics_glob" in protocol.parameters:
self.topics_glob = protocol.parameters["topics_glob"]

# Register the operations that this capability provides
protocol.register_operation("subscribe", self.subscribe)
protocol.register_operation("unsubscribe", self.unsubscribe)
Expand All @@ -270,10 +273,10 @@ def subscribe(self, msg: dict[str, Any]) -> None:
# Make the subscription
topic: str = msg["topic"]

if Subscribe.topics_glob is not None and Subscribe.topics_glob:
if self.topics_glob is not None and self.topics_glob:
self.protocol.log("debug", "Topic security glob enabled, checking topic: " + topic)
match = False
for glob in Subscribe.topics_glob:
for glob in self.topics_glob:
if fnmatch.fnmatch(topic, glob):
self.protocol.log(
"debug",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,23 @@ def __init__(self, protocol: Protocol) -> None:
# Call superclass constructor
Capability.__init__(self, protocol)

if protocol.parameters and "actions_glob" in protocol.parameters:
self.actions_glob = protocol.parameters["actions_glob"]

# Register the operations that this capability provides
protocol.register_operation("unadvertise_action", self.unadvertise_action)

def unadvertise_action(self, message: dict[str, Any]) -> None:
# parse the message
action_name: str = message["action"]

if UnadvertiseAction.actions_glob is not None and UnadvertiseAction.actions_glob:
if self.actions_glob:
self.protocol.log(
"debug",
f"Action security glob enabled, checking action: {action_name}",
)
match = False
for glob in UnadvertiseAction.actions_glob:
for glob in self.actions_glob:
if fnmatch.fnmatch(action_name, glob):
self.protocol.log(
"debug",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,23 @@ def __init__(self, protocol: Protocol) -> None:
# Call superclass constructor
Capability.__init__(self, protocol)

if protocol.parameters and "services_glob" in protocol.parameters:
self.services_glob = protocol.parameters["services_glob"]

# Register the operations that this capability provides
protocol.register_operation("unadvertise_service", self.unadvertise_service)

def unadvertise_service(self, message: dict[str, Any]) -> None:
# parse the message
service_name: str = message["service"]

if UnadvertiseService.services_glob is not None and UnadvertiseService.services_glob:
if self.services_glob:
self.protocol.log(
"debug",
"Service security glob enabled, checking service: " + service_name,
)
match = False
for glob in UnadvertiseService.services_glob:
for glob in self.services_glob:
if fnmatch.fnmatch(service_name, glob):
self.protocol.log(
"debug",
Expand Down
36 changes: 23 additions & 13 deletions rosbridge_library/src/rosbridge_library/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,24 @@ class Protocol:
buffer = ""
old_buffer = ""
busy = False
# if this is too low, ("simple")clients network stacks will get flooded (when sending fragments of a huge message..)
# .. depends on message_size/bandwidth/performance/client_limits/...
# !! this might be related to (or even be avoided by using) throttle_rate !!
delay_between_messages = 0
# global list of non-ros advertised services
external_service_list: dict[str, AdvertisedServiceHandler]
# global list of non-ros advertised actions
external_action_list: dict[str, AdvertisedActionHandler]

max_message_size: int = 1000000
# if this is too low, ("simple")clients network stacks will get flooded (when sending fragments of a huge message..)
# .. depends on message_size/bandwidth/performance/client_limits/...
# !! this might be related to (or even be avoided by using) throttle_rate !!
delay_between_messages: float = 0.0
# Use only BSON for the whole communication if the server has been started with bson_only_mode:=True
bson_only_mode = False
bson_only_mode: bool = False

parameters = None
parameters: dict[str, Any] | None = None

def __init__(self, client_id: str, node_handle: Node) -> None:
def __init__(
self, client_id: str, node_handle: Node, parameters: dict[str, Any] | None = None
) -> None:
"""
Initialize the protocol with a client ID and a ROS2 node handle.

Expand All @@ -109,16 +113,23 @@ def __init__(self, client_id: str, node_handle: Node) -> None:
:param node_handle: A ROS2 node handle
"""
self.client_id = client_id
self.node_handle = node_handle
self.parameters = parameters

self.capabilities: list[Capability] = []
self.operations: dict[str, Callable[[dict[str, Any]], None]] = {}
self.node_handle = node_handle
self.external_service_list = {}
self.external_action_list = {}

if self.parameters:
self.fragment_size = self.parameters["max_message_size"]
self.delay_between_messages = self.parameters["delay_between_messages"]
self.bson_only_mode = self.parameters.get("bson_only_mode", False)
if "max_message_size" in self.parameters:
self.max_message_size = self.parameters["max_message_size"]
if "delay_between_messages" in self.parameters:
self.delay_between_messages = self.parameters["delay_between_messages"]
if "bson_only_mode" in self.parameters:
self.bson_only_mode = self.parameters["bson_only_mode"]

self.fragment_size = self.max_message_size

# added default message_string="" to allow recalling incoming until buffer is empty without giving a parameter
# --> allows to get rid of (..or minimize) delay between client-side sends
Expand Down Expand Up @@ -222,8 +233,7 @@ def incoming(self, message_string: str = "") -> None:
# This way, a client can change/overwrite its active values anytime by just including parameter field in any
# message sent to rosbridge. Maybe need to be improved to bind parameter values to specific operation.
if "fragment_size" in msg:
self.fragment_size = msg["fragment_size"]
# print "fragment size set to:", self.fragment_size
self.fragment_size = min(msg["fragment_size"], self.max_message_size)
if "message_intervall" in msg and is_number(msg["message_intervall"]):
self.delay_between_messages = msg["message_intervall"]
if "png" in msg:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ class RosbridgeProtocol(Protocol):
def __init__(
self, client_id: str, node_handle: Node, parameters: dict[str, Any] | None = None
) -> None:
self.parameters = parameters
Protocol.__init__(self, client_id, node_handle)
Protocol.__init__(self, client_id, node_handle, parameters)
for capability_class in self.rosbridge_capabilities:
self.add_capability(capability_class)
Loading
Loading