diff --git a/.gitignore b/.gitignore index 83d560ed..17e5da2d 100644 --- a/.gitignore +++ b/.gitignore @@ -39,6 +39,7 @@ qtcreator-* /planning/src *~ +*.orig # Emacs .#* @@ -46,3 +47,5 @@ qtcreator-* # Catkin custom files CATKIN_IGNORE test_generated + +.idea/* \ No newline at end of file diff --git a/atf_core/src/atf_core/bagfile_helper.py b/atf_core/src/atf_core/bagfile_helper.py index 404aaa46..ad9125b2 100644 --- a/atf_core/src/atf_core/bagfile_helper.py +++ b/atf_core/src/atf_core/bagfile_helper.py @@ -1,6 +1,6 @@ #!/usr/bin/env python class BagfileWriter: - def __init__(self, bagfile, write_lock): + def __init__(self,test_config, bagfile, write_lock): """ Class for writing data to a rosbag file. :param bagfile: The bagfile to which the data should be written. diff --git a/atf_core/src/atf_core/recorder.py b/atf_core/src/atf_core/recorder.py index 4c99c66f..c000b603 100644 --- a/atf_core/src/atf_core/recorder.py +++ b/atf_core/src/atf_core/recorder.py @@ -34,14 +34,14 @@ def __init__(self, config, testblock_list): # create bag file writer handle self.lock_write = Lock() self.bag = rosbag.Bag(config["bagfile_output"] + self.config["test_name"] + ".bag", 'w') - self.bag_file_writer = atf_core.BagfileWriter(self.bag, self.lock_write) + self.bag_file_writer = atf_core.BagfileWriter(self.config, self.bag, self.lock_write) # Init metric recorder self.recorder_plugin_list = [] if len(recorder_config) > 0: for value in recorder_config.values(): #print "value=", value - self.recorder_plugin_list.append(getattr(atf_recorder_plugins, value)(self.lock_write, + self.recorder_plugin_list.append(getattr(atf_recorder_plugins, value)(self.config, self.lock_write, self.bag_file_writer)) self.topic_pipeline = [] @@ -128,8 +128,9 @@ def create_subscriber_callback(self, event): if topic not in self.subscriber: try: msg_class, _, _ = rostopic.get_topic_class(topic) - rospy.Subscriber(topic, msg_class, self.global_topic_callback, callback_args=topic) + rospy.Subscriber(topic, msg_class, self.global_topic_callback, callback_args=topic, queue_size=10) self.subscriber.append(topic) + print "new subscriber for: ", topic, "message class:", msg_class, " subscribers: ", self.subscriber except Exception: pass diff --git a/atf_metrics/config/metrics.yaml b/atf_metrics/config/metrics.yaml index 0eb8afb6..9128d860 100644 --- a/atf_metrics/config/metrics.yaml +++ b/atf_metrics/config/metrics.yaml @@ -2,13 +2,23 @@ time: handler: CalculateTimeParamHandler path_length: handler: CalculatePathLengthParamHandler -resources: - handler: CalculateResourcesParamHandler +distance: + handler: CalculateDistanceParamHandler +resource_cpu: + handler: CalculateResourcesCpuParamHandler +resource_mem: + handler: CalculateResourcesMemParamHandler +resource_io: + handler: CalculateResourcesIOParamHandler +resource_network: + handler: CalculateResourcesNetworkParamHandler obstacle_distance: handler: CalculateDistanceToObstaclesParamHandler publish_rate: handler: CalculatePublishRateParamHandler interface: - handler: CalculateInterfaceParamHandler + handler: CalculateInterfaceParamHandler +localization: + handler: CheckLocalizationParamHandler # example: # handler: ExampleParamHandler \ No newline at end of file diff --git a/atf_metrics/src/atf_metrics/__init__.py b/atf_metrics/src/atf_metrics/__init__.py index ad617882..08555650 100644 --- a/atf_metrics/src/atf_metrics/__init__.py +++ b/atf_metrics/src/atf_metrics/__init__.py @@ -1,6 +1,9 @@ from atf_metrics.calculate_path_length import CalculatePathLength, CalculatePathLengthParamHandler from atf_metrics.calculate_time import CalculateTime, CalculateTimeParamHandler -from atf_metrics.calculate_resources import CalculateResources, CalculateResourcesParamHandler +from atf_metrics.calculate_resources_cpu import CalculateResourcesCpu, CalculateResourcesCpuParamHandler +from atf_metrics.calculate_resources_mem import CalculateResourcesMem, CalculateResourcesMemParamHandler +from atf_metrics.calculate_resources_io import CalculateResourcesIO, CalculateResourcesIOParamHandler +from atf_metrics.calculate_resources_network import CalculateResourcesNetwork, CalculateResourcesNetworkParamHandler from atf_metrics.calculate_distance_to_obstacles import CalculateDistanceToObstacles, CalculateDistanceToObstaclesParamHandler from atf_metrics.calculate_publish_rate import CalculatePublishRate, CalculatePublishRateParamHandler from atf_metrics.calculate_interface import CalculateInterface, CalculateInterfaceParamHandler diff --git a/atf_metrics/src/atf_metrics/calculate_distance.py b/atf_metrics/src/atf_metrics/calculate_distance.py new file mode 100644 index 00000000..b70a84cf --- /dev/null +++ b/atf_metrics/src/atf_metrics/calculate_distance.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python +import rospy +import tf +import math + +class CalculateDistanceParamHandler: + def __init__(self): + """ + Class for returning the corresponding metric class with the given parameter. + """ + pass + + def parse_parameter(self, testblock_name, params): + """ + Method that returns the metric method with the given parameter. + :param params: Parameter + """ + metrics = [] + if type(params) is not list: + rospy.logerr("metric config not a list") + return False + + for metric in params: + # check for optional parameters + try: + groundtruth = metric["groundtruth"] + groundtruth_epsilon = metric["groundtruth_epsilon"] + except (TypeError, KeyError): + rospy.logwarn("No groundtruth parameters given, skipping groundtruth evaluation for metric 'distance' in testblock '%s'", testblock_name) + groundtruth = None + groundtruth_epsilon = None + metrics.append(CalculateDistance(metric["root_frame"], metric["measured_frame"], groundtruth, groundtruth_epsilon)) + return metrics + +class CalculateDistance: + def __init__(self, root_frame, measured_frame, groundtruth, groundtruth_epsilon): + """ + Class for calculating the distance to a given root frame. + The tf data is sent over the tf topic given in the robot_config.yaml. + :param root_frame: name of the first frame + :type root_frame: string + :param measured_frame: name of the second frame. The distance will be measured in relation to the root_frame. + :type measured_frame: string + """ + + self.active = False + self.root_frame = root_frame + self.measured_frame = measured_frame + self.distance = 0.0 + self.distances = 0.0 + self.dist_count = 0 + self.tf_sampling_freq = 20.0 # Hz + self.groundtruth = groundtruth + self.groundtruth_epsilon = groundtruth_epsilon + self.finished = False + + self.listener = tf.TransformListener() + + rospy.Timer(rospy.Duration.from_sec(1 / self.tf_sampling_freq), self.record_tf) + + def start(self, timestamp): + self.active = True + + def stop(self, timestamp): + self.active = False + self.finished = True + + def pause(self, timestamp): + self.active = False + self.first_value = True + + def purge(self, timestamp): + pass + + def record_tf(self, event): + if self.active: + try: + self.listener.waitForTransform(self.root_frame, + self.measured_frame, + rospy.Time(0), + rospy.Duration.from_sec(1 / (2*self.tf_sampling_freq))) + (trans, rot) = self.listener.lookupTransform(self.root_frame, self.measured_frame, rospy.Time(0)) + + except (tf.Exception, tf.LookupException, tf.ConnectivityException): + #rospy.logwarn(e) + pass + else: + self.distances = self.distances + math.sqrt(trans[0]**2 + trans[1]**2) + self.dist_count = self.dist_count + 1 + + + def get_result(self): + groundtruth_result = None + details = {"root_frame": self.root_frame, "measured_frame": self.measured_frame} + if self.finished: + #print "Distances: "+str(self.distances)+" Dist Count: "+str(self.dist_count) + data = round(self.distances/self.dist_count, 3) + if self.groundtruth != None and self.groundtruth_epsilon != None: + if math.fabs(self.groundtruth - data) <= self.groundtruth_epsilon: + groundtruth_result = True + else: + groundtruth_result = False + return "distance", data, groundtruth_result, self.groundtruth, self.groundtruth_epsilon, details + else: + return False diff --git a/atf_metrics/src/atf_metrics/calculate_resources.py b/atf_metrics/src/atf_metrics/calculate_resources.py deleted file mode 100644 index 2cb7a02e..00000000 --- a/atf_metrics/src/atf_metrics/calculate_resources.py +++ /dev/null @@ -1,123 +0,0 @@ -#!/usr/bin/env python -import numpy -import rospy - -from atf_msgs.msg import Resources, IO, Network - - -class CalculateResourcesParamHandler: - def __init__(self): - """ - Class for returning the corresponding metric class with the given parameter. - """ - self.params = [] - - def parse_parameter(self, params): - """ - Method that returns the metric method with the given parameter. - :param params: Parameter - """ - self.params = params - - metrics = CalculateResources(self.params) - - return metrics - - -class CalculateResources: - def __init__(self, resources): - """ - Class for calculating the average resource workload and writing the current resource data. - The resource data is sent over the topic "/testing/Resources". - :param resources: a dictionary containing the names of the resources and a list with the names of the nodes. - Example: {"cpu":[move_group], "mem": [move_group]} - :type resources: dict - """ - - self.active = False - self.resources = resources - self.node_data = {} - self.size_io = len(IO.__slots__) - self.size_network = len(Network.__slots__) - self.finished = False - - # Sort resources after nodes - for resource in self.resources: - for node in self.resources[resource]: - if node not in self.node_data: - self.node_data[node] = {resource: {"data": [], "average": [], "min": [], "max": []}} - elif resource not in self.node_data[node]: - self.node_data[node].update({resource: {"data": [], "average": [], "min": [], "max": []}}) - - rospy.Subscriber("/atf/resources", Resources, self.process_resource_data, queue_size=1) - - def start(self): - self.active = True - - def stop(self): - self.active = False - self.finished = True - - def pause(self): - self.active = False - - @staticmethod - def purge(): - pass - - def process_resource_data(self, msg): - if self.active: - for node in msg.nodes: - try: - for resource in self.node_data[node.node_name]: - if resource == "cpu": - self.node_data[node.node_name][resource]["data"].append(round(node.cpu, 2)) - elif resource == "mem": - self.node_data[node.node_name][resource]["data"].append(round(node.memory, 2)) - elif resource == "io": - if len(self.node_data[node.node_name][resource]["data"]) == 0: - for i in xrange(0, self.size_io): - self.node_data[node.node_name][resource]["data"].append([]) - self.node_data[node.node_name][resource]["data"][0].append(round(node.io.read_count, 2)) - self.node_data[node.node_name][resource]["data"][1].append(round(node.io.write_count, 2)) - self.node_data[node.node_name][resource]["data"][2].append(round(node.io.read_bytes, 2)) - self.node_data[node.node_name][resource]["data"][3].append(round(node.io.write_bytes, 2)) - elif resource == "network": - if len(self.node_data[node.node_name][resource]["data"]) == 0: - for i in xrange(0, self.size_network): - self.node_data[node.node_name][resource]["data"].append([]) - self.node_data[node.node_name][resource]["data"][0].append(round(node.network.bytes_sent, - 2)) - self.node_data[node.node_name][resource]["data"][1].append(round(node.network.bytes_recv, - 2)) - self.node_data[node.node_name][resource]["data"][2].append(round(node.network.packets_sent, - 2)) - self.node_data[node.node_name][resource]["data"][3].append(round(node.network.packets_recv, - 2)) - self.node_data[node.node_name][resource]["data"][4].append(round(node.network.errin, 2)) - self.node_data[node.node_name][resource]["data"][5].append(round(node.network.errout, 2)) - self.node_data[node.node_name][resource]["data"][6].append(round(node.network.dropin, 2)) - self.node_data[node.node_name][resource]["data"][7].append(round(node.network.dropout, 2)) - except KeyError: - pass - - def get_result(self): - if self.finished: - for node in self.node_data: - for res in self.node_data[node]: - if len(self.node_data[node][res]["data"]) != 0: - if res == "io" or res == "network": - for values in self.node_data[node][res]["data"]: - self.node_data[node][res]["average"].append(float(round(numpy.mean(values), 2))) - self.node_data[node][res]["min"].append(float(round(min(values), 2))) - self.node_data[node][res]["max"].append(float(round(max(values), 2))) - else: - self.node_data[node][res]["average"] = float(round(numpy.mean(self.node_data[node][res] - ["data"]), 2)) - self.node_data[node][res]["min"] = float(round(min(self.node_data[node][res]["data"]), 2)) - self.node_data[node][res]["max"] = float(round(max(self.node_data[node][res]["data"]), 2)) - del self.node_data[node][res]["data"] - - return "resources", self.node_data - else: - return False diff --git a/atf_metrics/src/atf_metrics/calculate_resources_cpu.py b/atf_metrics/src/atf_metrics/calculate_resources_cpu.py new file mode 100644 index 00000000..b1d69efd --- /dev/null +++ b/atf_metrics/src/atf_metrics/calculate_resources_cpu.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python +import numpy +import rospy +import math + +from atf_msgs.msg import Resources, IO, Network + + +class CalculateResourcesCpuParamHandler: + def __init__(self): + """ + Class for returning the corresponding metric class with the given parameter. + """ + self.params = [] + + def parse_parameter(self, testblock_name, params): + """ + Method that returns the metric method with the given parameter. + :param params: Parameter + """ + if not isinstance(params, list): + rospy.logerr("metric config not a list") + return False + metrics = [] + print "params:", params + for metric in params: + # check for optional parameters + try: + groundtruth = metric["groundtruth"] + groundtruth_epsilon = metric["groundtruth_epsilon"] + print "groundtruth", groundtruth, "groundtruth_epsilon", groundtruth_epsilon + if 'groundtruth' in metric: + del metric['groundtruth'] + if 'groundtruth_epsilon' in metric: + del metric['groundtruth_epsilon'] + except (TypeError, KeyError): + rospy.logwarn( + "No groundtruth parameters given, skipping groundtruth evaluation for metric 'resources' in testblock '%s'", + testblock_name) + groundtruth = None + groundtruth_epsilon = None + print "metric:", metric + metrics.append(CalculateResourcesCpu(metric["nodes"], groundtruth, groundtruth_epsilon)) + return metrics + + +class CalculateResourcesCpu: + def __init__(self, nodes, groundtruth, groundtruth_epsilon): + """ + Class for calculating the average cpu resource workload and writing the current resource data. + The resource data is sent over the topic "/testing/Resources". + :param nodes: a list with the names of the nodes. + """ + + self.active = False + self.resource = "cpu" + self.groundtruth = groundtruth + self.groundtruth_epsilon = groundtruth_epsilon + self.node_data = {} + self.size_io = len(IO.__slots__) + self.size_network = len(Network.__slots__) + self.finished = False + + # Sort resources after nodes + for node in nodes: + if node not in self.node_data: + self.node_data[node] = {self.resource: {"data": [], "average": [], "min": [], "max": []}} + rospy.Subscriber("/atf/resources", Resources, self.process_resource_data, queue_size=1) + + def start(self, timestamp): + self.active = True + + def stop(self, timestamp): + self.active = False + self.finished = True + + def pause(self, timestamp): + self.active = False + + def purge(self, timestamp): + pass + + def process_resource_data(self, msg): + if self.active: + for node in msg.nodes: + try: + for resource in self.node_data[node.node_name]: + #print "nodes:", msg.nodes, "\n node data:", self.node_data, "\n resource", resource + if resource == "cpu": + self.node_data[node.node_name][resource]["data"].append(round(node.cpu, 2)) + except KeyError: + pass + + def get_result(self): + groundtruth_result = None + details = {"sum of nodes":[]} + average_sum = 0.0 + + if self.finished: + for node in self.node_data: + #print " node:", node + for res in self.node_data[node]: + #print "res", res + if len(self.node_data[node][res]["data"]) != 0: + self.node_data[node][res]["average"] = float(round(numpy.mean(self.node_data[node][res] + ["data"]), 2)) + self.node_data[node][res]["min"] = float(round(min(self.node_data[node][res]["data"]), 2)) + self.node_data[node][res]["max"] = float(round(max(self.node_data[node][res]["data"]), 2)) + average_sum += float(round(numpy.mean(self.node_data[node][res]["data"]), 2)) + print "average sum:", average_sum + del self.node_data[node][res]["data"] + + details["sum of nodes"].append(node) + + #print "groundtruthes:", self.groundtruth, self.groundtruth_epsilon, "\n average:", self.node_data[node][res]["average"] + if self.groundtruth != None and self.groundtruth_epsilon != None: + for node in self.node_data: + + #print "average sum:check", average_sum + if math.fabs(self.groundtruth - average_sum) <= self.groundtruth_epsilon: + groundtruth_result = True + else: + groundtruth_result = False + + return "resources_cpu", round(average_sum, 3), groundtruth_result, self.groundtruth, self.groundtruth_epsilon, details + else: + return False diff --git a/atf_metrics/src/atf_metrics/calculate_resources_io.py b/atf_metrics/src/atf_metrics/calculate_resources_io.py new file mode 100644 index 00000000..c8e9aa33 --- /dev/null +++ b/atf_metrics/src/atf_metrics/calculate_resources_io.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python +import numpy +import rospy +import math + +from atf_msgs.msg import Resources, IO, Network +from operator import add + + +class CalculateResourcesIOParamHandler: + def __init__(self): + """ + Class for returning the corresponding metric class with the given parameter. + """ + self.params = [] + + def parse_parameter(self, testblock_name, params): + """ + Method that returns the metric method with the given parameter. + :param params: Parameter + """ + if not isinstance(params, list): + rospy.logerr("metric config not a list") + return False + metrics = [] + print "params:", params + for metric in params: + # check for optional parameters + try: + groundtruth = metric["groundtruth"] + groundtruth_epsilon = metric["groundtruth_epsilon"] + print "groundtruth", groundtruth, "groundtruth_epsilon", groundtruth_epsilon + if 'groundtruth' in metric: + del metric['groundtruth'] + if 'groundtruth_epsilon' in metric: + del metric['groundtruth_epsilon'] + except (TypeError, KeyError): + rospy.logwarn( + "No groundtruth parameters given, skipping groundtruth evaluation for metric 'resources' in testblock '%s'", + testblock_name) + groundtruth = None + groundtruth_epsilon = None + print "metric:", metric + metrics.append(CalculateResourcesIO(metric["nodes"], groundtruth, groundtruth_epsilon)) + return metrics + + +class CalculateResourcesIO: + def __init__(self, nodes, groundtruth, groundtruth_epsilon): + """ + Class for calculating the average disk IO resource workload and writing the current resource data. + The resource data is sent over the topic "/testing/Resources". + :param nodes: a list with the names of the nodes. + """ + + self.active = False + self.resource = "io" + self.groundtruth = groundtruth + self.groundtruth_epsilon = groundtruth_epsilon + self.node_data = {} + self.size_io = len(IO.__slots__) + self.size_network = len(Network.__slots__) + self.finished = False + + # Sort resources after nodes + for node in nodes: + if node not in self.node_data: + self.node_data[node] = {self.resource: {"data": [], "average": [], "min": [], "max": []}} + rospy.Subscriber("/atf/resources", Resources, self.process_resource_data, queue_size=1) + + def start(self, timestamp): + self.active = True + + def stop(self, timestamp): + self.active = False + self.finished = True + + def pause(self, timestamp): + self.active = False + + def purge(self, timestamp): + pass + + def process_resource_data(self, msg): + if self.active: + for node in msg.nodes: + try: + for resource in self.node_data[node.node_name]: + #print "nodes:", msg.nodes, "\n node data:", self.node_data, "\n resource", resource + if resource == "io": + if len(self.node_data[node.node_name][resource]["data"]) == 0: + for i in xrange(0, self.size_io): + self.node_data[node.node_name][resource]["data"].append([]) + self.node_data[node.node_name][resource]["data"][0].append(round(node.io.read_count, 2)) + self.node_data[node.node_name][resource]["data"][1].append(round(node.io.write_count, 2)) + self.node_data[node.node_name][resource]["data"][2].append(round(node.io.read_bytes, 2)) + self.node_data[node.node_name][resource]["data"][3].append(round(node.io.write_bytes, 2)) + except KeyError: + pass + + def get_result(self): + groundtruth_result = None + details = {"sum of read and write bytes from nodes":[]} + average_sum = 0.0 + + if self.finished: + for node in self.node_data: + #print " node:", node + for res in self.node_data[node]: + #print "res", res + if len(self.node_data[node][res]["data"]) != 0: + self.node_data[node][res]["average"] = float(round(numpy.mean( + map(add, self.node_data[node][res]["data"][2], self.node_data[node][res]["data"][3])), 2)) + self.node_data[node][res]["min"] = float(round( + min(map(add, self.node_data[node][res]["data"][2], self.node_data[node][res]["data"][3])), + 2)) + self.node_data[node][res]["max"] = float(round( + max(map(add, self.node_data[node][res]["data"][2], self.node_data[node][res]["data"][3])), + 2)) + average_sum += float(round(numpy.mean(self.node_data[node][res]["average"]), 2)) + print "average sum:", average_sum + del self.node_data[node][res]["data"] + + details["sum of read and write bytes from nodes"].append(node) + if self.groundtruth != None and self.groundtruth_epsilon != None: + for node in self.node_data: + + #print "average sum:check", average_sum + if math.fabs(self.groundtruth - average_sum) <= self.groundtruth_epsilon: + groundtruth_result = True + else: + groundtruth_result = False + + return "resources_io", round(average_sum, 3), groundtruth_result, self.groundtruth, self.groundtruth_epsilon, details + else: + return False diff --git a/atf_metrics/src/atf_metrics/calculate_resources_mem.py b/atf_metrics/src/atf_metrics/calculate_resources_mem.py new file mode 100644 index 00000000..dde4f4fe --- /dev/null +++ b/atf_metrics/src/atf_metrics/calculate_resources_mem.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python +import numpy +import rospy +import math + +from atf_msgs.msg import Resources, IO, Network + + +class CalculateResourcesMemParamHandler: + def __init__(self): + """ + Class for returning the corresponding metric class with the given parameter. + """ + self.params = [] + + def parse_parameter(self, testblock_name, params): + """ + Method that returns the metric method with the given parameter. + :param params: Parameter + """ + if not isinstance(params, list): + rospy.logerr("metric config not a list") + return False + metrics = [] + print "params:", params + for metric in params: + # check for optional parameters + try: + groundtruth = metric["groundtruth"] + groundtruth_epsilon = metric["groundtruth_epsilon"] + print "groundtruth", groundtruth, "groundtruth_epsilon", groundtruth_epsilon + if 'groundtruth' in metric: + del metric['groundtruth'] + if 'groundtruth_epsilon' in metric: + del metric['groundtruth_epsilon'] + except (TypeError, KeyError): + rospy.logwarn( + "No groundtruth parameters given, skipping groundtruth evaluation for metric 'resources' in testblock '%s'", + testblock_name) + groundtruth = None + groundtruth_epsilon = None + print "metric:", metric + metrics.append(CalculateResourcesMem(metric["nodes"], groundtruth, groundtruth_epsilon)) + return metrics + + +class CalculateResourcesMem: + def __init__(self, nodes, groundtruth, groundtruth_epsilon): + """ + Class for calculating the average memory resource workload and writing the current resource data. + The resource data is sent over the topic "/testing/Resources". + :param nodes: a list with the names of the nodes. + """ + + self.active = False + self.resource = "mem" + self.groundtruth = groundtruth + self.groundtruth_epsilon = groundtruth_epsilon + self.node_data = {} + self.size_io = len(IO.__slots__) + self.size_network = len(Network.__slots__) + self.finished = False + + # Sort resources after nodes + for node in nodes: + if node not in self.node_data: + self.node_data[node] = {self.resource: {"data": [], "average": [], "min": [], "max": []}} + rospy.Subscriber("/atf/resources", Resources, self.process_resource_data, queue_size=1) + + def start(self, timestamp): + self.active = True + + def stop(self, timestamp): + self.active = False + self.finished = True + + def pause(self, timestamp): + self.active = False + + def purge(self, timestamp): + pass + + def process_resource_data(self, msg): + if self.active: + for node in msg.nodes: + try: + for resource in self.node_data[node.node_name]: + #print "nodes:", msg.nodes, "\n node data:", self.node_data, "\n resource", resource + if resource == "mem": + self.node_data[node.node_name][resource]["data"].append(round(node.memory, 2)) + except KeyError: + pass + + def get_result(self): + groundtruth_result = None + details = {"sum of nodes":[]} + average_sum = 0.0 + + if self.finished: + for node in self.node_data: + #print " node:", node + for res in self.node_data[node]: + #print "res", res + if len(self.node_data[node][res]["data"]) != 0: + self.node_data[node][res]["average"] = float(round(numpy.mean(self.node_data[node][res] + ["data"]), 2)) + self.node_data[node][res]["min"] = float(round(min(self.node_data[node][res]["data"]), 2)) + self.node_data[node][res]["max"] = float(round(max(self.node_data[node][res]["data"]), 2)) + average_sum += float(round(numpy.mean(self.node_data[node][res]["data"]), 2)) + print "average sum:", average_sum + del self.node_data[node][res]["data"] + + details["sum of nodes"].append(node) + + if self.groundtruth != None and self.groundtruth_epsilon != None: + for node in self.node_data: + + #print "average sum:check", average_sum + if math.fabs(self.groundtruth - average_sum) <= self.groundtruth_epsilon: + groundtruth_result = True + else: + groundtruth_result = False + + return "resources_mem", round(average_sum, 3), groundtruth_result, self.groundtruth, self.groundtruth_epsilon, details + else: + return False diff --git a/atf_metrics/src/atf_metrics/calculate_resources_network.py b/atf_metrics/src/atf_metrics/calculate_resources_network.py new file mode 100644 index 00000000..a3059431 --- /dev/null +++ b/atf_metrics/src/atf_metrics/calculate_resources_network.py @@ -0,0 +1,144 @@ +#!/usr/bin/env python +import numpy +import rospy +import math + +from atf_msgs.msg import Resources, IO, Network +from operator import add + +class CalculateResourcesNetworkParamHandler: + def __init__(self): + """ + Class for returning the corresponding metric class with the given parameter. + """ + self.params = [] + + def parse_parameter(self, testblock_name, params): + """ + Method that returns the metric method with the given parameter. + :param params: Parameter + """ + if not isinstance(params, list): + rospy.logerr("metric config not a list") + return False + metrics = [] + print "params:", params + for metric in params: + # check for optional parameters + try: + groundtruth = metric["groundtruth"] + groundtruth_epsilon = metric["groundtruth_epsilon"] + print "groundtruth", groundtruth, "groundtruth_epsilon", groundtruth_epsilon + if 'groundtruth' in metric: + del metric['groundtruth'] + if 'groundtruth_epsilon' in metric: + del metric['groundtruth_epsilon'] + except (TypeError, KeyError): + rospy.logwarn( + "No groundtruth parameters given, skipping groundtruth evaluation for metric 'resources' in testblock '%s'", + testblock_name) + groundtruth = None + groundtruth_epsilon = None + print "metric:", metric + metrics.append(CalculateResourcesNetwork(metric["nodes"], groundtruth, groundtruth_epsilon)) + return metrics + + +class CalculateResourcesNetwork: + def __init__(self, nodes, groundtruth, groundtruth_epsilon): + """ + Class for calculating the average network resource workload and writing the current resource data. + The resource data is sent over the topic "/testing/Resources". + :param nodes: a list with the names of the nodes. + """ + + self.active = False + self.resource = "network" + self.groundtruth = groundtruth + self.groundtruth_epsilon = groundtruth_epsilon + self.node_data = {} + self.size_io = len(IO.__slots__) + self.size_network = len(Network.__slots__) + self.finished = False + + # Sort resources after nodes + for node in nodes: + if node not in self.node_data: + self.node_data[node] = {self.resource: {"data": [], "average": [], "min": [], "max": []}} + rospy.Subscriber("/atf/resources", Resources, self.process_resource_data, queue_size=1) + + def start(self, timestamp): + self.active = True + + def stop(self, timestamp): + self.active = False + self.finished = True + + def pause(self, timestamp): + self.active = False + + def purge(self, timestamp): + pass + + def process_resource_data(self, msg): + if self.active: + for node in msg.nodes: + try: + for resource in self.node_data[node.node_name]: + #print "nodes:", msg.nodes, "\n node data:", self.node_data, "\n resource", resource + if resource == "network": + if len(self.node_data[node.node_name][resource]["data"]) == 0: + for i in xrange(0, self.size_network): + self.node_data[node.node_name][resource]["data"].append([]) + self.node_data[node.node_name][resource]["data"][0].append(round(node.network.bytes_sent, + 2)) + self.node_data[node.node_name][resource]["data"][1].append(round(node.network.bytes_recv, + 2)) + self.node_data[node.node_name][resource]["data"][2].append(round(node.network.packets_sent, + 2)) + self.node_data[node.node_name][resource]["data"][3].append(round(node.network.packets_recv, + 2)) + self.node_data[node.node_name][resource]["data"][4].append(round(node.network.errin, 2)) + self.node_data[node.node_name][resource]["data"][5].append(round(node.network.errout, 2)) + self.node_data[node.node_name][resource]["data"][6].append(round(node.network.dropin, 2)) + self.node_data[node.node_name][resource]["data"][7].append(round(node.network.dropout, 2)) + except KeyError: + pass + + def get_result(self): + groundtruth_result = None + details = {"sum of received and sent bytes from nodes":[]} + average_sum = 0.0 + + if self.finished: + for node in self.node_data: + #print " node:", node + for res in self.node_data[node]: + #print "res", res + if len(self.node_data[node][res]["data"]) != 0: + self.node_data[node][res]["average"] = float(round(numpy.mean( + map(add, self.node_data[node][res]["data"][0], self.node_data[node][res]["data"][1])), 2)) + self.node_data[node][res]["min"] = float(round( + min(map(add, self.node_data[node][res]["data"][0], self.node_data[node][res]["data"][1])), + 2)) + self.node_data[node][res]["max"] = float(round( + max(map(add, self.node_data[node][res]["data"][0], self.node_data[node][res]["data"][1])), + 2)) + average_sum += float(round(numpy.mean(self.node_data[node][res]["average"]), 2)) + print "average sum:", average_sum + del self.node_data[node][res]["data"] + + details["sum of received and sent bytes from nodes"].append(node) + + if self.groundtruth != None and self.groundtruth_epsilon != None: + for node in self.node_data: + + #print "average sum:check", average_sum + if math.fabs(self.groundtruth - average_sum) <= self.groundtruth_epsilon: + groundtruth_result = True + else: + groundtruth_result = False + + return "resources_network", round(average_sum, 3), groundtruth_result, self.groundtruth, self.groundtruth_epsilon, details + else: + return False diff --git a/atf_metrics/src/atf_metrics/check_localization.py b/atf_metrics/src/atf_metrics/check_localization.py new file mode 100644 index 00000000..feb4e78a --- /dev/null +++ b/atf_metrics/src/atf_metrics/check_localization.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python +import rospy +import tf +import math + +class CheckLocalizationParamHandler: + def __init__(self): + """ + Class for returning the corresponding metric class with the given parameter. + """ + pass + + def parse_parameter(self, testblock_name, params): + """ + Method that returns the metric method with the given parameter. + :param params: Parameter + """ + metrics = [] + if type(params) is not list: + rospy.logerr("metric config not a list") + return False + + for metric in params: + # check for optional parameters + try: + groundtruth = metric["groundtruth"] + groundtruth_epsilon = metric["groundtruth_epsilon"] + except (TypeError, KeyError): + rospy.logwarn("No groundtruth parameters given, skipping groundtruth evaluation for metric 'distance' in testblock '%s'", testblock_name) + groundtruth = None + groundtruth_epsilon = None + metrics.append(CheckLocalization(metric["root_frame"], metric["measured_frame"], metric["max_loc_error"], groundtruth, groundtruth_epsilon)) + return metrics + +class CheckLocalization: + def __init__(self, root_frame, measured_frame, max_loc_error, groundtruth, groundtruth_epsilon): + """ + Class for calculating the seconds passed with lost localization in respect to a given root frame. + The tf data is sent over the tf topic given in the robot_config.yaml. + :param root_frame: name of the first frame + :type root_frame: string + :param measured_frame: name of the second frame. The distance will be measured in relation to the root_frame. + :type measured_frame: string + :param max_loc_error: distance between the two frames which is considered to be localization lost + :type max_loc_error:float + """ + + self.active = False + self.root_frame = root_frame + self.measured_frame = measured_frame + self.distance = 0.0 + self.lost_count = 0 + self.tf_sampling_freq = 1.0 # Hz + self.groundtruth = groundtruth + self.groundtruth_epsilon = groundtruth_epsilon + self.max_loc_error = max_loc_error; + self.finished = False + + self.listener = tf.TransformListener() + + rospy.Timer(rospy.Duration.from_sec(1 / self.tf_sampling_freq), self.record_tf) + + def start(self, timestamp): + self.active = True + + def stop(self, timestamp): + self.active = False + self.finished = True + + def pause(self, timestamp): + self.active = False + self.first_value = True + + def purge(self, timestamp): + pass + + def record_tf(self, event): + if self.active: + try: + self.listener.waitForTransform(self.root_frame, + self.measured_frame, + rospy.Time(0), + rospy.Duration.from_sec(1 / (2*self.tf_sampling_freq))) + (trans, rot) = self.listener.lookupTransform(self.root_frame, self.measured_frame, rospy.Time(0)) + except (tf.Exception, tf.LookupException, tf.ConnectivityException): + #rospy.logwarn(e) + pass + else: + self.distance = math.sqrt(trans[0]**2 + trans[1]**2) + if (self.distance > self.max_loc_error): + self.lost_count += 1 + + + def get_result(self): + groundtruth_result = None + details = {"root_frame": self.root_frame, "measured_frame": self.measured_frame} + if self.finished: + data = self.lost_count + if self.groundtruth != None and self.groundtruth_epsilon != None: + if math.fabs(self.groundtruth - data) <= self.groundtruth_epsilon: + groundtruth_result = True + else: + groundtruth_result = False + return "localization", data, groundtruth_result, self.groundtruth, self.groundtruth_epsilon, details + else: + return False diff --git a/atf_recorder_plugins/config/recorder_plugins.yaml b/atf_recorder_plugins/config/recorder_plugins.yaml index e85989bc..dd2e1b7a 100644 --- a/atf_recorder_plugins/config/recorder_plugins.yaml +++ b/atf_recorder_plugins/config/recorder_plugins.yaml @@ -1,4 +1,7 @@ -#resources: RecordResources +resources_cpu: RecordResources +resource_mem: RecordResources +resource_io: RecordResources +resource_network: RecordResources interface: RecordInterface #obstacle_distance: RecordObstacleDistance # example: Example diff --git a/atf_recorder_plugins/package.xml b/atf_recorder_plugins/package.xml index 856b76d6..3206956c 100644 --- a/atf_recorder_plugins/package.xml +++ b/atf_recorder_plugins/package.xml @@ -16,4 +16,6 @@ atf_msgs rospy + python-psutil + diff --git a/atf_recorder_plugins/src/atf_recorder_plugins/__init__.py b/atf_recorder_plugins/src/atf_recorder_plugins/__init__.py index d54ff25b..8a6deeef 100644 --- a/atf_recorder_plugins/src/atf_recorder_plugins/__init__.py +++ b/atf_recorder_plugins/src/atf_recorder_plugins/__init__.py @@ -1,4 +1,4 @@ -#from atf_recorder_plugins.plugin_resources import RecordResources +from atf_recorder_plugins.plugin_resources import RecordResources from atf_recorder_plugins.plugin_interface import RecordInterface #from atf_recorder_plugins.plugin_obstacle_distance import RecordObstacleDistance # from atf_recorder_plugins.example import Example diff --git a/atf_recorder_plugins/src/atf_recorder_plugins/plugin_interface.py b/atf_recorder_plugins/src/atf_recorder_plugins/plugin_interface.py index 00cae280..46daffc1 100644 --- a/atf_recorder_plugins/src/atf_recorder_plugins/plugin_interface.py +++ b/atf_recorder_plugins/src/atf_recorder_plugins/plugin_interface.py @@ -9,7 +9,7 @@ class RecordInterface: - def __init__(self, write_lock, bag_file_writer): + def __init__(self, test_config, write_lock, bag_file_writer): self.bag_file_writer = bag_file_writer def trigger_callback(self, goal): diff --git a/atf_recorder_plugins/src/atf_recorder_plugins/plugin_resources.py b/atf_recorder_plugins/src/atf_recorder_plugins/plugin_resources.py index abfc64d8..fb847071 100755 --- a/atf_recorder_plugins/src/atf_recorder_plugins/plugin_resources.py +++ b/atf_recorder_plugins/src/atf_recorder_plugins/plugin_resources.py @@ -4,123 +4,111 @@ import time import xmlrpclib import rosnode +import yaml -from copy import copy +from copy import copy, deepcopy from re import findall from subprocess import check_output, CalledProcessError -from atf_msgs.msg import Resources, IO, Network -from atf_recorder import BagfileWriter - +from atf_msgs.msg import NodeResources, Resources, IO, Network, TestblockTrigger class RecordResources: - def __init__(self, topic_prefix, config_file, robot_config_file, write_lock, bag_file): - self.topic_prefix = topic_prefix - self.test_config = config_file + def __init__(self,test_config, write_lock, bag_file_writer): + self.topic_prefix = "atf/" + self.test_config = test_config - self.resources_timer_frequency = 4.0 # Hz + self.resources_timer_frequency = 10.0 # Hz self.timer_interval = 1/self.resources_timer_frequency self.testblock_list = self.create_testblock_list() self.pid_list = self.create_pid_list() - self.requested_nodes = {} + self.requested_nodes = [] self.res_pipeline = {} - self.BfW = BagfileWriter(bag_file, write_lock) + self.BfW = bag_file_writer rospy.Timer(rospy.Duration.from_sec(self.timer_interval), self.collect_resource_data) def update_requested_nodes(self, msg): + counter = 0 + requested_nodes = [] + if msg.trigger == TestblockTrigger.START: + print "START Trigger" - if msg.trigger.trigger == Trigger.ACTIVATE: - for node in self.testblock_list[msg.name]: - if node not in self.requested_nodes: - self.requested_nodes[node] = copy(self.testblock_list[msg.name][node]) - self.res_pipeline[node] = copy(self.testblock_list[msg.name][node]) - else: - for res in self.testblock_list[msg.name][node]: - self.requested_nodes[node].append(res) - if res not in self.res_pipeline[node]: - self.res_pipeline[node].append(res) - - elif msg.trigger.trigger == Trigger.FINISH: for node in self.testblock_list[msg.name]: - for res in self.testblock_list[msg.name][node]: - self.requested_nodes[node].remove(res) - if res not in self.requested_nodes[node]: - self.res_pipeline[node].remove(res) - if len(self.requested_nodes[node]) == 0: - del self.requested_nodes[node] - del self.res_pipeline[node] + print "node:", node + if not node in requested_nodes: + requested_nodes.append(node) + self.requested_nodes = deepcopy(requested_nodes) + counter += 1 - def create_testblock_list(self): + elif msg.trigger == TestblockTrigger.STOP: + print "STOP Trigger" + def create_testblock_list(self): testblock_list = {} - for testblock in self.test_config: + node_list = [] + counter = 0 + print "testconfig: ", self.test_config["test_config"] + for testblock, tests in self.test_config["test_config"].iteritems(): try: - self.test_config[testblock]["resources"] + tests + print "testblock tests: ", tests + except KeyError: + rospy.logerr("No nodes for resources to record") continue else: - for resource in self.test_config[testblock]["resources"]: - try: - testblock_list[testblock] - except KeyError: - testblock_list.update({testblock: {}}) - - for node_name in self.test_config[testblock]["resources"][resource]: - - if node_name not in testblock_list[testblock]: - testblock_list[testblock].update({node_name: [resource]}) - elif resource not in testblock_list[testblock][node_name]: - testblock_list[testblock][node_name].append(resource) - + for resource, nodes in tests.iteritems(): + if str(resource).__contains__("resource"): + print "nodes", nodes + node_list.extend(nodes[0]["nodes"]) + counter += 1 + try: + testblock_list[testblock] + except KeyError: + testblock_list.update({testblock: []}) + print "node list:", node_list + testblock_list.update({testblock: node_list}) + print "testblock list: ", testblock_list return testblock_list def collect_resource_data(self, event): - pipeline = copy(self.res_pipeline) - if not len(pipeline) == 0: - msg = Resources() - topic = self.topic_prefix + "resources" - - for node in pipeline: - msg_data = NodeResources() - pid = self.pid_list[node] - - if pid is None: - continue - - try: - msg_data.node_name = node - - if "cpu" in pipeline[node]: - msg_data.cpu = psutil.Process(pid).cpu_percent(interval=self.timer_interval) - - if "mem" in pipeline[node]: - msg_data.memory = psutil.Process(pid).memory_percent() - - if "io" in pipeline[node]: - data = findall('\d+', str(psutil.Process(pid).io_counters())) - msg_data.io.read_count = int(data[0]) - msg_data.io.write_count = int(data[1]) - msg_data.io.read_bytes = int(data[2]) - msg_data.io.write_bytes = int(data[3]) - - if "network" in pipeline[node]: - data = findall('\d+', str(psutil.net_io_counters())) - msg_data.network.bytes_sent = int(data[0]) - msg_data.network.bytes_recv = int(data[1]) - msg_data.network.packets_sent = int(data[2]) - msg_data.network.packets_recv = int(data[3]) - msg_data.network.errin = int(data[4]) - msg_data.network.errout = int(data[5]) - msg_data.network.dropin = int(data[6]) - msg_data.network.dropout = int(data[7]) - - msg.nodes.append(msg_data) - except (psutil.NoSuchProcess, psutil.AccessDenied): - pass - - self.BfW.write_to_bagfile(topic, msg, rospy.Time.from_sec(time.time())) + msg = Resources() + msg_list = [] + topic = self.topic_prefix + "resources" + msg_data = NodeResources() + for node, pid in self.pid_list.iteritems(): + if pid is None: + continue + try: + msg_data.node_name = node + + msg_data.cpu = psutil.Process(pid).get_cpu_percent(interval=self.timer_interval) + + msg_data.memory = psutil.Process(pid).get_memory_percent() + + data = findall('\d+', str(psutil.Process(pid).get_io_counters())) + msg_data.io.read_count = int(data[0]) + msg_data.io.write_count = int(data[1]) + msg_data.io.read_bytes = int(data[2]) + msg_data.io.write_bytes = int(data[3]) + + data = findall('\d+', str(psutil.net_io_counters())) + msg_data.network.bytes_sent = int(data[0]) + msg_data.network.bytes_recv = int(data[1]) + msg_data.network.packets_sent = int(data[2]) + msg_data.network.packets_recv = int(data[3]) + msg_data.network.errin = int(data[4]) + msg_data.network.errout = int(data[5]) + msg_data.network.dropin = int(data[6]) + msg_data.network.dropout = int(data[7]) + + msg_list.append(copy(msg_data)) + except (psutil.NoSuchProcess, psutil.AccessDenied) as e: + rospy.logerr("collecting error: %s", e) + pass + msg.nodes = msg_list + self.BfW.write_to_bagfile(topic, msg, rospy.Time.now()) def trigger_callback(self, msg): @@ -128,17 +116,16 @@ def trigger_callback(self, msg): if msg.name in self.testblock_list: self.update_requested_nodes(msg) - if msg.trigger.trigger == Trigger.ERROR: - self.res_pipeline = [] def create_pid_list(self): node_list = {} + pid_list = {} for (testblock, nodes) in self.testblock_list.iteritems(): for node in nodes: - if node not in node_list: - node_list[node] = self.get_pid(node) - - return node_list + if self.get_pid(node) not in pid_list: + pid_list.update({node:self.get_pid(node)}) + print "pid", self.get_pid(node), "for node", node + return pid_list @staticmethod def get_pid(name):