|
9 | 9 | # or in the "LICENSE.txt" file accompanying this file. |
10 | 10 | # This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express or implied. |
11 | 11 | # See the License for the specific language governing permissions and limitations under the License. |
| 12 | +import ast |
12 | 13 | import datetime |
13 | 14 | import json |
14 | 15 | import os |
15 | 16 | import time |
| 17 | +from collections import defaultdict |
16 | 18 | from typing import List |
17 | 19 |
|
| 20 | +import boto3 |
| 21 | +import matplotlib.pyplot as plt |
| 22 | +import pandas as pd |
18 | 23 | import untangle |
19 | 24 | from framework.metrics_publisher import Metric, MetricsPublisher |
20 | 25 | from junitparser import JUnitXml |
21 | 26 |
|
| 27 | +from pcluster.constants import SUPPORTED_OSES |
| 28 | + |
SECONDS_PER_YEAR = 365 * 24 * 60 * 60  # non-leap year; lookback window for the DynamoDB metadata scans
| 30 | + |
22 | 31 |
|
23 | 32 | def generate_cw_report(test_results_dir, namespace, aws_region, timestamp_day_start=False, start_timestamp=None): |
24 | 33 | """ |
@@ -143,3 +152,288 @@ def _put_metrics( |
143 | 152 | [Metric(item["name"], item["value"], item["unit"], dimensions, timestamp)], |
144 | 153 | ) |
145 | 154 | time.sleep(put_metric_sleep_interval) |
| 155 | + |
| 156 | + |
def _scan_dynamodb_table(
    table_name,
    projection_expression,
    expression_attribute_names,
    filter_expression,
    expression_attribute_values,
    region_name="us-east-1",
):
    """Scan a DynamoDB table with pagination and return all items.

    :param table_name: name of the DynamoDB table to scan.
    :param projection_expression: attributes to retrieve, using ``#name`` placeholders.
    :param expression_attribute_names: mapping of ``#name`` placeholders to real attribute names.
    :param filter_expression: server-side filter applied to the scanned items.
    :param expression_attribute_values: mapping of ``:value`` placeholders used by the filter.
    :param region_name: AWS region hosting the table; defaults to us-east-1 for
        backward compatibility with the previously hard-coded region.
    :return: list of all matching items across every scan page.
    """
    dynamodb_client = boto3.client("dynamodb", region_name=region_name)
    all_items = []
    last_evaluated_key = None
    while True:
        scan_params = {
            "TableName": table_name,
            "ProjectionExpression": projection_expression,
            "FilterExpression": filter_expression,
            "ExpressionAttributeNames": expression_attribute_names,
            "ExpressionAttributeValues": expression_attribute_values,
        }

        # Scan returns at most 1 MB per call; resume from the last key on
        # every iteration after the first.
        if last_evaluated_key:
            scan_params["ExclusiveStartKey"] = last_evaluated_key

        response = dynamodb_client.scan(**scan_params)
        all_items.extend(response.get("Items", []))

        # A missing LastEvaluatedKey means the scan is complete.
        last_evaluated_key = response.get("LastEvaluatedKey")
        if not last_evaluated_key:
            break
    return all_items
| 185 | + |
| 186 | + |
def generate_launch_time_report(reports_output_dir):
    """Generate launch-time reports from the integ-test metadata DynamoDB table.

    Scans the last year of test metadata and writes one report per
    (statistic, category) combination, where the category is either the OS or
    the test name.

    :param reports_output_dir: directory where the report files are written.
    """
    current_time = int(time.time())
    one_year_ago = current_time - SECONDS_PER_YEAR

    all_items = _scan_dynamodb_table(
        table_name="ParallelCluster-IntegTest-Metadata",
        projection_expression="#status, #avg_launch, #max_launch, #min_launch, #creation_time, #name, #os, #start_time",
        expression_attribute_names={
            "#call_start_time": "call_start_time",
            "#status": "call_status",
            "#avg_launch": "compute_average_launch_time",
            "#max_launch": "compute_max_launch_time",
            "#min_launch": "compute_min_launch_time",
            "#creation_time": "cluster_creation_time",
            "#name": "name",
            "#os": "os",
            "#start_time": "call_start_time",
        },
        filter_expression="#call_start_time >= :one_year_ago",
        expression_attribute_values={":one_year_ago": {"N": str(one_year_ago)}},
    )
    # Sort numerically, newest first. DynamoDB "N" values arrive as strings,
    # and a plain string sort would order e.g. "9" after "10".
    all_items.sort(key=lambda x: float(x["call_start_time"]["N"]), reverse=True)
    for category_name in ["os", "name"]:
        # Analyze the data by os and by name.
        # Therefore, we can see which os is taking long, or which test is taking long.
        category_name_processing = None
        if category_name == "name":
            # Test names embed the OS; strip it so results group by test only.
            category_name_processing = _remove_os_from_string
        for statistics_name in [
            "cluster_creation_time",
            "compute_average_launch_time",
            "compute_min_launch_time",
            "compute_max_launch_time",
        ]:
            # Pick the aggregation matching the statistic's semantics.
            if statistics_name in ["cluster_creation_time", "compute_average_launch_time"]:
                statistics_processing = _mean
            elif statistics_name in ["compute_max_launch_time"]:
                statistics_processing = max
            else:
                statistics_processing = min
            result = _get_statistics_by_category(
                all_items,
                category_name,
                statistics_name,
                category_name_processing=category_name_processing,
                statistics_processing=statistics_processing,
            )
            create_report(result, [statistics_name, category_name], reports_output_dir)
| 235 | + |
| 236 | + |
def generate_performance_report(reports_output_dir):
    """Generate performance reports from the performance-test metadata DynamoDB table.

    Scans the last year of results, groups them by
    (name, mpi_variation, num_instances), and writes one report per group.

    :param reports_output_dir: directory where the report files are written.
    :return: mapping of composite group key to the statistics computed for it.
    """
    current_time = int(time.time())
    one_year_ago = current_time - SECONDS_PER_YEAR

    all_items = _scan_dynamodb_table(
        table_name="ParallelCluster-PerformanceTest-Metadata",
        projection_expression="#instance, #name, #os, #result, #timestamp, #mpi_variation, #num_instances",
        expression_attribute_names={
            "#instance": "instance",
            "#name": "name",
            "#os": "os",
            "#result": "result",
            "#timestamp": "timestamp",
            "#mpi_variation": "mpi_variation",
            "#num_instances": "num_instances",
        },
        filter_expression="#timestamp >= :one_year_ago",
        expression_attribute_values={":one_year_ago": {"N": str(one_year_ago)}},
    )
    # Sort numerically, newest first. DynamoDB "N" values arrive as strings,
    # and a plain string sort would order e.g. "9" after "10".
    all_items.sort(key=lambda x: float(x["timestamp"]["N"]), reverse=True)
    items_by_name = defaultdict(list)
    category_names = [("name", "S"), ("mpi_variation", "S"), ("num_instances", "N")]
    for item in all_items:
        # Build a composite key like "<name>_<mpi_variation>_<num_instances>";
        # a missing attribute contributes an empty segment.
        keys = [item.get(category_name, {}).get(value_type, "") for category_name, value_type in category_names]
        key = "_".join(keys)
        items_by_name[key].append(item)
    result = {}
    for name, items in items_by_name.items():
        result[name] = _get_statistics_from_result(items, name, reports_output_dir)
    return result
| 268 | + |
| 269 | + |
| 270 | +def _append_performance_data(target_dict, os_key, performance, timestamp): |
| 271 | + os_time_key = f"{os_key}-time" |
| 272 | + if os_key not in target_dict: |
| 273 | + target_dict[os_key] = [] |
| 274 | + target_dict[os_time_key] = [] |
| 275 | + target_dict[os_key].append(float(performance)) |
| 276 | + target_dict[os_time_key].append(datetime.datetime.fromtimestamp(int(timestamp)).strftime("%Y-%m-%d %H:%M:%S")) |
| 277 | + |
| 278 | + |
def _get_statistics_from_result(all_items, name, reports_output_dir):
    """Parse stored performance results, build per-OS time series, and write reports.

    :param all_items: DynamoDB items, each with "result" (stringified Python
        literal), "os" and "timestamp" attributes.
    :param name: composite group name used to label the reports.
    :param reports_output_dir: directory where the report files are written.
    :return: dict mapping each packet-size key to its per-OS series.
    """
    result = {}
    result_with_single_layer = {}
    for item in all_items:
        this_result = ast.literal_eval(item["result"]["S"])
        # The result can come in different formats because of the historical
        # development of the test data collection:
        # 1. Empty or an empty list, when a test failed to push data in the initial coding.
        # 2. A list of tuples or a dictionary. For example, OSU latency stores
        #    numbers for the corresponding packet sizes:
        #    [('0', '16'), ('1', '16'), ('2', '16'), ... , ('4194304', '1368')]
        # 3. A single number. For example, the OSU barrier test returns a single
        #    number without any notion of packet sizes.
        # The following logic handles all the differences.
        if not this_result:
            continue
        os_key = item["os"]["S"]
        timestamp = item["timestamp"]["N"]
        if isinstance(this_result, (dict, list)):
            # A dict must be iterated via .items(); iterating the dict itself
            # would yield keys only and break the (key, performance) unpacking.
            pairs = this_result.items() if isinstance(this_result, dict) else this_result
            for key, performance in pairs:
                if key not in result:
                    result[key] = {}
                _append_performance_data(result[key], os_key, performance, timestamp)
        else:
            _append_performance_data(result_with_single_layer, os_key, this_result, timestamp)
    for key, node_num_result in result.items():
        create_report(node_num_result, [name, key], reports_output_dir)
    if result_with_single_layer:
        create_report(result_with_single_layer, [name], reports_output_dir)
    return result
| 308 | + |
| 309 | + |
| 310 | +def _mean(x): |
| 311 | + return sum(x) / len(x) |
| 312 | + |
| 313 | + |
def _remove_os_from_string(x):
    """Strip every supported OS identifier out of *x*.

    Test names embed the OS; removing it lets results be grouped by test only.
    """
    cleaned = x
    for os_name in SUPPORTED_OSES:
        cleaned = cleaned.replace(os_name, "")
    return cleaned
| 318 | + |
| 319 | + |
def _get_statistics_by_category(  # noqa C901
    all_items, category_name, statistics_name, category_name_processing=None, statistics_processing=None
):
    """Aggregate *statistics_name* per category over sliding daily windows.

    Used to get "cluster_creation_time", "compute_average_launch_time",
    "compute_min_launch_time", and "compute_max_launch_time". Each data point
    aggregates the samples of a window whose length (in days) equals the number
    of supported operating systems, so that the statistics are more stable when
    os rotation is in place.

    :param all_items: DynamoDB items (newest first) carrying "call_status",
        "call_start_time", the statistics attribute, and the category attribute.
    :param category_name: item attribute to group by (e.g. "os" or "name").
    :param statistics_name: item attribute to aggregate.
    :param category_name_processing: optional callable normalizing the category value.
    :param statistics_processing: optional callable aggregating one window's
        samples (e.g. ``max`` or ``min``); defaults to the arithmetic mean.
    :return: dict mapping each category to its aggregated values (oldest first)
        plus "<category>-time" keys holding the matching window dates.
    """
    # Bug fix: statistics_processing used to be accepted but never applied —
    # the mean was always computed, so min/max reports were actually means.
    if statistics_processing is None:
        statistics_processing = _mean
    more_data = True
    latest_time = float(all_items[0]["call_start_time"]["N"])
    window_length = len(SUPPORTED_OSES)
    result = {}
    while more_data:
        more_data = False
        os_cluster_creation_times = {}
        for item in all_items:
            if item["call_status"]["S"] != "passed":
                continue
            if statistics_name not in item:
                continue
            # Older than the current window: remember another pass is needed.
            if float(item["call_start_time"]["N"]) < latest_time - (window_length * 24 * 60 * 60):
                more_data = True
                continue
            # Newer than the current window: already consumed by an earlier pass.
            if float(item["call_start_time"]["N"]) > latest_time:
                continue
            cluster_creation_time = item[statistics_name]["N"]
            # A "0" value marks a missing measurement, not a real sample.
            if cluster_creation_time == "0":
                continue
            os_key = item[category_name]["S"]
            if category_name_processing:
                os_key = category_name_processing(os_key)
            if os_key not in os_cluster_creation_times:
                os_cluster_creation_times[os_key] = [float(cluster_creation_time)]
            else:
                os_cluster_creation_times[os_key].append(float(cluster_creation_time))
        for os_key, cluster_creation_times in os_cluster_creation_times.items():
            if os_key not in result:
                result[os_key] = []
            os_time_key = f"{os_key}-time"
            if os_time_key not in result:
                result[os_time_key] = []
            # Prepend so the final series runs oldest -> newest.
            result[os_key].insert(0, statistics_processing(cluster_creation_times))
            result[os_time_key].insert(0, datetime.datetime.fromtimestamp(latest_time).strftime("%Y-%m-%d"))
        if os_cluster_creation_times:
            more_data = True
        # Slide the window back one day for the next pass.
        latest_time = latest_time - 24 * 60 * 60

    return result
| 367 | + |
| 368 | + |
def plot_statistics(result, name_prefix):
    """Render every non-time series in *result* as a line chart.

    The x axis is the sorted union of all timestamps found in the
    "<category>-time" series; each category is plotted at the positions of
    its own timestamps.
    """
    plt.figure(figsize=(40, 12))

    # Gather the union of timestamps from all "<category>-time" series.
    timestamps = set()
    for series_name, series in result.items():
        if "-time" in series_name:
            timestamps.update(series)
    ordered_timestamps = sorted(timestamps)
    position_of = {ts: idx for idx, ts in enumerate(ordered_timestamps)}

    # Plot each value series against the numeric position of its timestamps.
    for series_name, series in result.items():
        if "-time" in series_name:
            continue
        positions = [position_of[ts] for ts in result[f"{series_name}-time"]]
        plt.plot(positions, series, marker="o", label=series_name)

    plt.title(f"{name_prefix}")
    plt.xlabel("Latest timestamp")
    plt.ylabel("Value")
    plt.grid(True, linestyle="--", alpha=0.7)
    plt.legend()
    plt.xticks(range(len(ordered_timestamps)), ordered_timestamps, rotation=45)
    plt.tight_layout()
    plt.show()
| 396 | + |
| 397 | + |
def create_excel_files(result, name_prefix, reports_output_dir):
    """Write the time series in *result* to an Excel file, one row per category.

    Columns are the sorted union of all timestamps; duplicate timestamps within
    a category are averaged.

    :param result: dict of "<category>" value lists and "<category>-time" timestamp lists.
    :param name_prefix: label used to build the output file name.
    :param reports_output_dir: directory where the .xlsx file is written.
    """
    filename = os.path.join(reports_output_dir, f"{name_prefix}_statistics.xlsx")
    # Bug fix: these messages previously printed a literal placeholder instead
    # of the actual file name.
    print(f"Creating Excel file: {filename}...")
    # Collect and sort all unique time points
    all_times = set()
    for category, values in result.items():
        if "-time" in category:
            all_times.update(values)
    sorted_times = sorted(all_times)

    df_data = {}

    # Add each category as a column
    for category, values in result.items():
        if "-time" in category:
            continue
        x_values = result[f"{category}-time"]
        # Create series and aggregate duplicates by taking the mean
        category_series = pd.Series(index=x_values, data=values).groupby(level=0).mean()
        df_data[category] = category_series.reindex(sorted_times)

    df = pd.DataFrame(df_data)

    # Write to Excel (transposed: categories as rows, timestamps as columns)
    with pd.ExcelWriter(filename, engine="openpyxl") as writer:
        df.T.to_excel(writer, index=True)

    print(f"Excel file saved: {filename}")
| 426 | + |
| 427 | + |
| 428 | +def _get_launch_time(logs, instance_id): |
| 429 | + for log in logs: |
| 430 | + if instance_id in log["message"]: |
| 431 | + return log["timestamp"] |
| 432 | + |
| 433 | + |
def create_report(result, labels, reports_output_dir, create_graphs=False, create_excel=True):
    """Emit the requested report artifacts for one statistics series.

    :param result: dict of value lists and "<category>-time" timestamp lists.
    :param labels: label components joined with "_" to name the report.
    :param reports_output_dir: directory where Excel files are written.
    :param create_graphs: when True, show an interactive plot of the series.
    :param create_excel: when True, write the series to an Excel file.
    """
    name_prefix = "_".join(str(label) for label in labels)
    if create_excel:
        create_excel_files(result, name_prefix, reports_output_dir)
    if create_graphs:
        plot_statistics(result, name_prefix)
0 commit comments