@@ -153,8 +153,9 @@ void MqttClient::loadParameters() {
153153 }
154154 }
155155
156- NODELET_INFO (" Bridging ROS topic '%s' to MQTT topic '%s'" ,
157- ros_topic.c_str (), ros2mqtt.mqtt .topic .c_str ());
156+ NODELET_INFO (" Bridging ROS topic '%s' to MQTT topic '%s' %s" ,
157+ ros_topic.c_str (), ros2mqtt.mqtt .topic .c_str (),
158+ ros2mqtt.stamped ? " and measuring latency" : " " );
158159 } else {
159160 NODELET_WARN (
160161 " Parameter 'bridge/ros2mqtt[%d]' is missing subparameter "
@@ -403,52 +404,46 @@ void MqttClient::ros2mqtt(const topic_tools::ShapeShifter::ConstPtr& ros_msg,
403404 mqtt_topic.c_str (), e.what ());
404405 }
405406
406- // serialize ROS message to buffer
407- uint32_t msg_length = ros::serialization::serializationLength (*ros_msg);
408- std::vector<uint8_t > msg_buffer;
409- msg_buffer.resize (msg_length);
410- ros::serialization::OStream msg_stream (msg_buffer.data (), msg_length);
411- ros::serialization::serialize (msg_stream, *ros_msg);
412-
413407 // build MQTT payload for ROS message (R) as [1, S, R] or [0, R]
414408 // where first item = 1 if timestamp (S) is included
409+ uint32_t msg_length = ros::serialization::serializationLength (*ros_msg);
415410 uint32_t payload_length = 1 + msg_length;
411+ uint32_t stamp_length = ros::serialization::serializationLength (ros::Time ());
416412 uint32_t msg_offset = 1 ;
417413 std::vector<uint8_t > payload_buffer;
414+ if (ros2mqtt.stamped ) {
415+ // allocate buffer with appropriate size to hold [1, S, R]
416+ msg_offset += stamp_length;
417+ payload_length += stamp_length;
418+ payload_buffer.resize (payload_length);
419+ payload_buffer[0 ] = 1 ;
420+ } else {
421+ // allocate buffer with appropriate size to hold [0, R]
422+ payload_buffer.resize (payload_length);
423+ payload_buffer[0 ] = 0 ;
424+ }
425+
426+ // serialize ROS message to payload [0/1, -, R]
427+ ros::serialization::OStream msg_stream (&payload_buffer[msg_offset],
428+ msg_length);
429+ ros::serialization::serialize (msg_stream, *ros_msg);
430+
431+ // inject timestamp as final step
418432 if (ros2mqtt.stamped ) {
419433
420- // serialize current timestamp
434+ // take current timestamp
421435 ros::WallTime stamp_wall = ros::WallTime::now ();
422436 ros::Time stamp (stamp_wall.sec , stamp_wall.nsec );
423437 if (stamp.isZero ())
424438 NODELET_WARN (
425439 " Injected ROS time 0 into MQTT payload on topic %s, is a ROS clock "
426440 " running?" ,
427441 ros2mqtt.mqtt .topic .c_str ());
428- uint32_t stamp_length = ros::serialization::serializationLength (stamp);
429- std::vector<uint8_t > stamp_buffer;
430- stamp_buffer.resize (stamp_length);
431- ros::serialization::OStream stamp_stream (stamp_buffer.data (), stamp_length);
432- ros::serialization::serialize (stamp_stream, stamp);
433442
434- // inject timestamp into payload
435- payload_length += stamp_length;
436- msg_offset += stamp_length;
437- payload_buffer.resize (payload_length);
438- payload_buffer[0 ] = 1 ;
439- payload_buffer.insert (payload_buffer.begin () + 1 ,
440- std::make_move_iterator (stamp_buffer.begin ()),
441- std::make_move_iterator (stamp_buffer.end ()));
442-
443- } else {
444-
445- payload_buffer.resize (payload_length);
446- payload_buffer[0 ] = 0 ;
443+ // serialize timestamp to payload [1, S, R]
444+ ros::serialization::OStream stamp_stream (&payload_buffer[1 ], stamp_length);
445+ ros::serialization::serialize (stamp_stream, stamp);
447446 }
448- // add ROS message to payload
449- payload_buffer.insert (payload_buffer.begin () + msg_offset,
450- std::make_move_iterator (msg_buffer.begin ()),
451- std::make_move_iterator (msg_buffer.end ()));
452447
453448 // send ROS message to MQTT broker
454449 mqtt_topic = ros2mqtt.mqtt .topic ;
@@ -468,7 +463,8 @@ void MqttClient::ros2mqtt(const topic_tools::ShapeShifter::ConstPtr& ros_msg,
468463}
469464
470465
471- void MqttClient::mqtt2ros (mqtt::const_message_ptr mqtt_msg) {
466+ void MqttClient::mqtt2ros (mqtt::const_message_ptr mqtt_msg,
467+ const ros::WallTime& arrival_stamp) {
472468
473469 std::string mqtt_topic = mqtt_msg->get_topic ();
474470 Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic];
@@ -486,20 +482,18 @@ void MqttClient::mqtt2ros(mqtt::const_message_ptr mqtt_msg) {
486482 // if stamped, compute latency
487483 if (stamped) {
488484
489- // copy timestamp from MQTT message to buffer
490- ros::Time stamp;
491- uint32_t stamp_length = ros::serialization::serializationLength (stamp);
492- std::vector<uint8_t > stamp_buffer;
493- stamp_buffer.resize (stamp_length);
494- std::memcpy (stamp_buffer.data (), &(payload[1 ]), stamp_length);
485+ // create ROS message buffer on top of MQTT message payload
486+ char * non_const_payload = const_cast <char *>(&payload[1 ]);
487+ uint8_t * stamp_buffer = reinterpret_cast <uint8_t *>(non_const_payload);
495488
496489 // deserialize stamp
497- ros::serialization::IStream stamp_stream (stamp_buffer.data (), stamp_length);
490+ ros::Time stamp;
491+ uint32_t stamp_length = ros::serialization::serializationLength (stamp);
492+ ros::serialization::IStream stamp_stream (stamp_buffer, stamp_length);
498493 ros::serialization::deserialize (stamp_stream, stamp);
499494
500495 // compute ROS2MQTT latency
501- ros::WallTime now_wall = ros::WallTime::now ();
502- ros::Time now (now_wall.sec , now_wall.nsec );
496+ ros::Time now (arrival_stamp.sec , arrival_stamp.nsec );
503497 if (now.isZero ())
504498 NODELET_WARN (
505499 " Cannot compute latency for MQTT topic %s when ROS time is 0, is a ROS "
@@ -521,17 +515,16 @@ void MqttClient::mqtt2ros(mqtt::const_message_ptr mqtt_msg) {
521515 msg_offset += stamp_length;
522516 }
523517
524- // copy ROS message from MQTT message to buffer
525- std::vector< uint8_t > msg_buffer ;
526- msg_buffer. resize (msg_length );
527- std::memcpy (msg_buffer. data (), &(payload[msg_offset]) , msg_length);
518+ // create ROS message buffer on top of MQTT message payload
519+ char * non_const_payload = const_cast < char *>(&payload[msg_offset]) ;
520+ uint8_t * msg_buffer = reinterpret_cast < uint8_t *>(non_const_payload );
521+ ros::serialization::OStream msg_stream (msg_buffer , msg_length);
528522
529523 // publish via ShapeShifter
530524 NODELET_DEBUG (
531525 " Sending ROS message of type '%s' from MQTT broker to ROS topic '%s' ..." ,
532526 mqtt2ros.ros .shape_shifter .getDataType ().c_str (),
533527 mqtt2ros.ros .topic .c_str ());
534- ros::serialization::OStream msg_stream (msg_buffer.data (), msg_length);
535528 mqtt2ros.ros .shape_shifter .read (msg_stream);
536529 mqtt2ros.ros .publisher .publish (mqtt2ros.ros .shape_shifter );
537530}
@@ -581,6 +574,9 @@ bool MqttClient::isConnectedService(IsConnected::Request& request,
581574
582575void MqttClient::message_arrived (mqtt::const_message_ptr mqtt_msg) {
583576
577+ // instantly take arrival timestamp
578+ ros::WallTime arrival_stamp = ros::WallTime::now ();
579+
584580 std::string mqtt_topic = mqtt_msg->get_topic ();
585581 NODELET_DEBUG (" Received MQTT message on topic '%s'" , mqtt_topic.c_str ());
586582 auto & payload = mqtt_msg->get_payload_ref ();
@@ -590,14 +586,13 @@ void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) {
590586 mqtt_topic.find (kRosMsgTypeMqttTopicPrefix ) != std::string::npos;
591587 if (msg_contains_ros_msg_type) {
592588
593- // copy ROS message type from MQTT message to buffer
594- RosMsgType ros_msg_type;
595- std::vector<uint8_t > msg_type_buffer;
596- msg_type_buffer.resize (payload_length);
597- std::memcpy (msg_type_buffer.data (), &(payload[0 ]), payload_length);
589+ // create ROS message buffer on top of MQTT message payload
590+ char * non_const_payload = const_cast <char *>(&payload[0 ]);
591+ uint8_t * msg_type_buffer = reinterpret_cast <uint8_t *>(non_const_payload);
598592
599593 // deserialize ROS message type
600- ros::serialization::IStream msg_type_stream (msg_type_buffer.data (),
594+ RosMsgType ros_msg_type;
595+ ros::serialization::IStream msg_type_stream (msg_type_buffer,
601596 payload_length);
602597 ros::serialization::deserialize (msg_type_stream, ros_msg_type);
603598
@@ -632,7 +627,7 @@ void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) {
632627
633628 // publish ROS message, if publisher initialized
634629 if (!mqtt2ros_[mqtt_topic].ros .publisher .getTopic ().empty ()) {
635- mqtt2ros (mqtt_msg);
630+ mqtt2ros (mqtt_msg, arrival_stamp );
636631 } else {
637632 NODELET_WARN (
638633 " ROS publisher for data from MQTT topic '%s' is not yet initialized: "
0 commit comments