diff --git a/src/autoval_ssd/lib/utils/storage/nvme/nvme_drive.py b/src/autoval_ssd/lib/utils/storage/nvme/nvme_drive.py index 55765af..fbb058d 100644 --- a/src/autoval_ssd/lib/utils/storage/nvme/nvme_drive.py +++ b/src/autoval_ssd/lib/utils/storage/nvme/nvme_drive.py @@ -2,13 +2,14 @@ # pyre-unsafe """library to manage nvme drive""" + import json import os import re import time from enum import Enum from time import sleep -from typing import Dict, List +from typing import Optional, Tuple from autoval.lib.host.component.component import COMPONENT @@ -75,7 +76,13 @@ class NVMeDrive(Drive): ] NVMECLI_MANUFACTURER = None - def __init__(self, host, block_name, config=None) -> None: + def __init__( + self, + host, + block_name, + config=None, + nvme_list_info: Optional[list[dict[str, str]]] = None, + ) -> None: """ Class for storing data and interacting with NVME drives @@ -89,6 +96,7 @@ def __init__(self, host, block_name, config=None) -> None: - cfg/nvme_smart_fdi """ super().__init__(host, block_name, config=config) + self.nvme_list_info = nvme_list_info if config is None: config = DEFAULT_VALIDATE_CONFIG self.interface = DriveInterface.NVME @@ -115,8 +123,8 @@ def __init__(self, host, block_name, config=None) -> None: self.fw_ver = None self.current_fw_ver = None self.fw_ns_slots_models_map = {} - self.ocp_2_6_drives: List = [] - self.workload_target_drives: List = [] + self.ocp_2_6_drives: list = [] + self.workload_target_drives: list = [] self.lmparser_ocp_2_0_drives = {} self.cfg_dir = "" @@ -128,7 +136,7 @@ def get_smart_log_keys(self) -> None: smart_log = self.get_smart_log() self.smart_log_keys = self._flatten_validate_config_dict(smart_log).keys() - def load_config(self, config_file: str) -> Dict: + def load_config(self, config_file: str) -> dict: """ @param config_file @return config for smart validation @@ -157,10 +165,10 @@ def get_target_path() -> str: target_path = "" current_file_path = os.path.abspath(__file__) try: - pattern = r"^(/.*?)/autoval_ssd/" + pattern = r"^(/.*?)/autoval_ssd/lib" match = re.search(pattern, current_file_path) if match: - target_path = match.group(0)[:-1] + target_path = match.group(0).replace("/lib", "") except Exception: raise AutovalFileNotFound("The required file path is not found") return target_path @@ -211,7 +219,7 @@ def get_arbitration_mechanism_status(self): """ Method to get the controller properties """ - nvme_drive = "/dev/%s" % self.block_name + nvme_drive = "/dev/%s" % re.sub(r"n\d+$", "", self.block_name) cmd = "nvme show-regs %s -H" % nvme_drive out = AutovalUtils.validate_no_exception( self.host.run, @@ -439,30 +447,35 @@ def get_feature(self, feature_id=None, queue_id=None): nvme_drive = "/dev/%s" % self.block_name features_info = [] for _id in feature_ids: - if queue_id: - cmd = f"nvme get-feature {nvme_drive} -f {_id} -H {queue_id}" - else: - cmd = f"nvme get-feature {nvme_drive} -f {_id} -H" - out = self.host.run_get_result(cmd=cmd).stdout # noqa + cmd = f"nvme get-feature {nvme_drive} -f {_id} -H" + out = self.host.run_get_result(cmd=cmd).stdout feature_info = ",".join([s.strip() for s in out.splitlines()]) features_info.append(feature_info) return features_info def get_capacity(self, unit: str = "byte"): """Return drive capacity""" - _byte = NVMeUtils.get_from_nvme_list(self.host, self.block_name, "PhysicalSize") + _byte = NVMeUtils.get_from_nvme_list( + self.host, self.block_name, "PhysicalSize", self.nvme_list_info + ) return DiskUtils.convert_from_bytes(_byte, unit) def get_serial_number(self): """Return drive serial_number""" - return NVMeUtils.get_from_nvme_list(self.host, self.block_name, "SerialNumber") + return NVMeUtils.get_from_nvme_list( + self.host, self.block_name, "SerialNumber", self.nvme_list_info + ) def _get_model(self): - return NVMeUtils.get_from_nvme_list(self.host, self.block_name, "ModelNumber") + return NVMeUtils.get_from_nvme_list( + self.host, self.block_name, "ModelNumber", self.nvme_list_info + ) def get_firmware_version(self): """Return drive FW version""" - return NVMeUtils.get_from_nvme_list(self.host, self.block_name, "Firmware") + return NVMeUtils.get_from_nvme_list( + self.host, self.block_name, "Firmware", self.nvme_list_info + ) def get_manufacturer(self) -> str: """Return drive manufacturer""" @@ -485,7 +498,7 @@ def get_smart_log(self): smart_log.update(self.get_ocp_smart_log()) return smart_log - def get_ocp_smart_log(self) -> Dict: + def get_ocp_smart_log(self) -> dict: """ Collect OCP smart log and return it. """ @@ -527,23 +540,40 @@ def get_ocp_telemetry_string_log(self) -> None: cmd = f"nvme ocp telemetry-string-log /dev/{self.block_name}" self.host.run(cmd=cmd, ignore_status=True, working_directory=dut_logdir) - def get_internal_log(self) -> bool: - """ - Return drive telemetry log. + def get_internal_log(self, timeout: int, phase: str = "", flag: str = "") -> bool: + """Return drive telemetry log. + Args: - None + ---- + timeout: The timeout value for nvme telemetry-log command. + phase: The phase of the telemetry log, used to differentiate log files before and after test. + flag: Whether to append the -d 1 flag to the command and default is False. + Returns: - The completion status of internal log file generation. + ------- + bool: The completion status of internal log file generation. """ dut_logdir = SiteUtils.get_dut_logdir(self.host.hostname) - cmd = f"nvme telemetry-log --output-file=bin /dev/{self.block_name}" - ret = self.host.run_get_result( - cmd=cmd, ignore_status=True, working_directory=dut_logdir + cmd = f"timeout {timeout}s nvme telemetry-log --output-file={self.serial_number}__telemetrylog_{phase}.bin /dev/{self.block_name}{flag}" + working_directory = ( + f"{dut_logdir}/telemetry_{phase}" if phase and not flag else dut_logdir ) - if ret.return_code != 0: - AutovalLog.log_info(f"WARNING: command '{cmd}' failed with error code") + try: + ret = self.host.run_get_result( + cmd=cmd, + ignore_status=True, + working_directory=working_directory, + timeout=timeout, + ) + if ret.return_code != 0: + AutovalLog.log_info(f"WARNING: command '{cmd}' failed with error code") + return False + return True + except TimeoutError: + AutovalLog.log_info( + f"WARNING: Failed to complete '{cmd}' within {timeout} seconds on {self.host.hostname}" + ) return False - return True def get_effects_log(self): """Gets Effects Log. @@ -560,9 +590,13 @@ def get_effects_log(self): TestStepError When fails to retrieve the command effects log. """ - cmd = "nvme effects-log /dev/%s -o json" % self.block_name + cmd = "nvme effects-log /dev/%s -o json" % re.sub(r"n\d+$", "", self.block_name) out = self.host.run(cmd=cmd) - return json.loads(out) + try: + out = json.loads(out) + except json.decoder.JSONDecodeError: + out = NVMeUtils.parse_json_string(out) + return out def get_id_ctrl(self): """Return id_ctrl""" @@ -588,7 +622,7 @@ def supports_flash_temp_check(self) -> bool: # can be provided in the vendor subclass return True - def get_nand_write_param(self) -> Dict[str, str]: + def get_nand_write_param(self) -> dict[str, str]: """Return nand_write params""" # Can be provided in vendor subclass return {} @@ -604,7 +638,7 @@ def get_vs_nand_stat_log(self) -> None: return def get_write_amplification( - self, smart_before: Dict[str, Dict], smart_after: Dict[str, Dict] + self, smart_before: dict[str, dict], smart_after: dict[str, dict] ) -> bool: """ Method to calculate the Flash Write Amplification @@ -644,7 +678,7 @@ def get_write_amplification( ) if waf: AutovalLog.log_info( - "Lifetime WAF for drive %s is %s" % (self.block_name, waf) + f"Lifetime WAF for drive {self.block_name} is {waf}" ) write_amplification["lifetime_write_amplification"] = waf waf = { @@ -671,15 +705,15 @@ def get_write_amplification( # Calculate Write amp for the currently running test write_amplification["test_write_amplification"] = 0 waf, error = self.calculate_waf(host_delta, nand_delta, nand_write_formula) - AutovalLog.log_info( - "WAF during this test for drive %s: %s" % (self.block_name, waf) - ) + AutovalLog.log_info(f"WAF during this test for drive {self.block_name}: {waf}") write_amplification["test_write_amplification"] = waf if error: AutovalLog.log_info( - "Cannot calculate WAF for drive %s due to %s" % (self.block_name, error) + "Cannot calculate WAF for drive {} due to {}".format( + self.block_name, error + ) ) - AutovalLog.log_info("Drive %s: %s" % (self.block_name, write_amplification)) + AutovalLog.log_info(f"Drive {self.block_name}: {write_amplification}") return True def calculate_waf(self, h_write, n_write, nand_write_formula): @@ -726,7 +760,7 @@ def convert_nand_write(self, nand_write) -> float: nand_write = float(nand_write) except Exception as exc: raise TestError( - "Failed convert %s to float: %s" % (nand_write, exc), + f"Failed convert {nand_write} to float: {exc}", component=COMPONENT.STORAGE_DRIVE, error_type=ErrorType.TOOL_ERR, ) @@ -946,7 +980,7 @@ def fw_activate( self, drive_name: str, file_name: str, - fw_slot: List[int], + fw_slot: list[int], action: int, nvme_admin_io=True, ) -> None: @@ -1161,7 +1195,11 @@ def drive_health_check(self) -> None: % self.block_name ) smart_log = self.get_smart_log() - critical_warning = smart_log["smart-log"]["critical_warning"] + critical_warning_type = smart_log["smart-log"]["critical_warning"] + if isinstance(critical_warning_type, dict): + critical_warning = critical_warning_type["value"] + else: + critical_warning = critical_warning_type if critical_warning > 0: raise TestError( f"The {self.manufacturer} drive {self.serial_number}" @@ -1194,7 +1232,7 @@ def drive_erase_count(self) -> None: except KeyError: pass - def get_fw_update_ns_actions(self) -> List[int]: + def get_fw_update_ns_actions(self) -> list[int]: """Get Firmware update non supported actions. This method will return the list of actions which are not supported on the model. @@ -1233,7 +1271,7 @@ def is_drive_degraded(self) -> None: error_type=ErrorType.DRIVE_ERR, ) - def get_fw_slots(self) -> List[int]: + def get_fw_slots(self) -> list[int]: """Get available FW slots""" nvme_drive = "/dev/%s" % self.block_name out = self.get_nvme_id_ctrl(human_readable=True) @@ -1536,3 +1574,57 @@ def is_lmparser_ocp_2_0_drive(self) -> bool: if self.model in self.lmparser_ocp_2_0_drives[fw_ver]: return True return False + + def get_ocp_hardware_component_log(self, nvme_version: str) -> None: + """ + Get OCP hardware component log + + Args: + nvme_version: Current installed NVMe version + """ + if not NVMeUtils.compare_versions("2.10.0", nvme_version): + AutovalLog.log_info( + f"Current NVMe version '{nvme_version}' does not support OCP hardware component log" + ) + return + DSSD_version = self.get_DSSD_version() + if DSSD_version is None: + AutovalLog.log_info( + f"Skipping OCP hardware component log collection. DSSD version is not available for {self.block_name}" + ) + return + Major_version, Minor_version = DSSD_version + if Major_version < 2 or (Major_version == 2 and Minor_version < 5): + AutovalLog.log_info( + f"Skipping OCP hardware component log collection. DSSD version is less than 2.5 for {self.block_name}" + ) + return + + dut_logdir = SiteUtils.get_dut_logdir(self.host.hostname) + cmd = f"nvme ocp hardware-component-log /dev/{self.block_name}" + out = self.host.run_get_result( + cmd=cmd, ignore_status=True, working_directory=dut_logdir + ) + AutovalUtils.validate_equal( + out.return_code, + 0, + f"Collected OCP hardware component log for {self.block_name}", + component=COMPONENT.STORAGE_DRIVE, + error_type=ErrorType.DRIVE_ERR, + ) + + def get_DSSD_version(self) -> Optional[Tuple[int, int]]: + """ + Get DSSD version + + Returns: + (Major, Minor) DSSD version + """ + cmd = f'nvme ocp smart-add-log /dev/{self.block_name} -o json | grep -E "(Major|Minor) Version Field"' + out = self.host.run_get_result(cmd=cmd, ignore_status=True) + + if out.return_code == 0: + major = re.search(r'"Major Version Field"\s*:\s*(\d+)', out.stdout) + minor = re.search(r'"Minor Version Field"\s*:\s*(\d+)', out.stdout) + if major and minor: + return (int(major.group(1)), int(minor.group(1))) diff --git a/src/autoval_ssd/lib/utils/storage/nvme/nvme_utils.py b/src/autoval_ssd/lib/utils/storage/nvme/nvme_utils.py index fab463d..406951f 100644 --- a/src/autoval_ssd/lib/utils/storage/nvme/nvme_utils.py +++ b/src/autoval_ssd/lib/utils/storage/nvme/nvme_utils.py @@ -2,21 +2,22 @@ # pyre-unsafe """utils for manage NMVE drive""" + import json import re import time from enum import auto, Enum -from typing import Dict, Optional, TYPE_CHECKING +from typing import Any, Dict, Optional, Union + +from autoval.lib.host.component.component import COMPONENT +from autoval.lib.host.host import Host +from autoval.lib.utils.autoval_errors import ErrorType from autoval.lib.utils.autoval_exceptions import TestError from autoval.lib.utils.autoval_log import AutovalLog from autoval.lib.utils.autoval_utils import AutovalUtils -if TYPE_CHECKING: - from autoval.lib.host.host import Host - - class NVMeDeviceEnum(Enum): """Class for NVME drives enumeration""" @@ -26,6 +27,14 @@ class NVMeDeviceEnum(Enum): INVALID = auto() +class NVMeGetFeatureSelect(Enum): + """-s flag options for the nvme get-feature command""" + + CURRENT = 0 + DEFAULT = 1 + SAVED = 2 + + class NVMeUtils: """Class for NVME drives""" @@ -52,7 +61,7 @@ def get_nvme_device_type(device_name: str) -> NVMeDeviceEnum: return NVMeDeviceEnum.INVALID @staticmethod - def get_id_ctrl(host, device_name) -> Dict: + def get_id_ctrl(host, device_name) -> dict: """ @param Host : host @param String block_name: e.g. nvme1n1 or char_name: eg nvme1 @@ -81,7 +90,7 @@ def get_id_ctrl_normal_data(host, device_name: str) -> str: return out @staticmethod - def get_id_ns(host, device_name: str, nsid: Optional[int] = None) -> Dict: + def get_id_ns(host, device_name: str, nsid: Optional[int] = None) -> dict: """ Return identify namespace json output. @param Host : host @@ -141,26 +150,41 @@ def get_nvme_list(host): ] """ ret = host.run_get_result("nvme list -o json") - nvme_list = json.loads(ret.stdout) - return nvme_list["Devices"] + try: + output = json.loads(ret.stdout) + except json.JSONDecodeError: + output = NVMeUtils.parse_json_string(ret.stdout) + nvme_list = output["Devices"] + entry_list = [] + for dr in nvme_list: + if "DevicePath" not in dr: + for output in dr.get("Subsystems", []): + entry_list.extend(NVMeUtils.convert_nvme_output(output)) + + nvme_list = entry_list or nvme_list + return nvme_list @staticmethod - def get_from_nvme_list(host, block_name, field): + def get_from_nvme_list(host, block_name, field, nvme_list_info=None): """ @param String block_name: e.g. nvme1n1 @param String field: field to update + @param nvme_list_info: list of dictionaries containing information about nvme drives @return String: value of given field """ - nvme_list = NVMeUtils.get_nvme_list(host) + + if nvme_list_info is None: + nvme_list_info = NVMeUtils.get_nvme_list(host) + path = "/dev/%s" % block_name try: - drive_data = [dr for dr in nvme_list if dr["DevicePath"] == path].pop() + drive_data = [dr for dr in nvme_list_info if dr["DevicePath"] == path].pop() except IndexError: raise TestError( - f"Unable to find DevicePath for {block_name} in {nvme_list}" + "Unable to find DevicePath for %s in %s" % (block_name, nvme_list_info) ) if field not in drive_data: - raise TestError(f"Unable to find {field} in {drive_data}") + raise TestError("Unable to find %s in %s" % (field, drive_data)) if isinstance(drive_data[field], str): return drive_data[field].strip() return drive_data[field] @@ -411,6 +435,8 @@ def is_read_only(host, drive: str) -> bool: ret = host.run_get_result(cmd) out_json = AutovalUtils.loads_json(ret.stdout) if "critical_warning" in out_json: + if isinstance(out_json["critical_warning"], dict): + out_json["critical_warning"] = out_json["critical_warning"]["value"] if (out_json["critical_warning"]) & (1 << 3): return True return False @@ -592,6 +618,95 @@ def compare_versions(expected_version: str, current_version: str) -> bool: A boolean indicating whether the current version is greater than or equal to the expected version. """ - parts1 = [int(part) for part in expected_version.split(".")[:2]] - parts2 = [int(part) for part in current_version.split(".")[:2]] + def extract_version_parts(version_str): + # Extract the first version-like pattern (e.g., 2.8, 2.10.0) + match = re.search(r"\d+(\.\d+)+", version_str) + if match: + return tuple(int(x) for x in match.group().split(".")) + return () + + parts1 = extract_version_parts(expected_version) + parts2 = extract_version_parts(current_version) + # Pad shorter version with zeros for fair comparison + maxlen = max(len(parts1), len(parts2)) + parts1 += (0,) * (maxlen - len(parts1)) + parts2 += (0,) * (maxlen - len(parts2)) return parts2 >= parts1 + + @staticmethod + def convert_nvme_output( + new_output: Dict[str, Any], + ) -> list[Optional[Dict[str, Union[str, int]]]]: + """ + Convert the new nvme list -o json output to old format + + Args: + new_output: nvme list -o json output in new format + + Return: + old_format_list: old format + """ + controllers = new_output.get("Controllers", []) + old_format_list = [] + + for controller in controllers: + if "Namespaces" in controller and controller["Namespaces"]: + for namespace_info in controller["Namespaces"]: + # Create the old output format + old_format = { + "NameSpace": namespace_info.get("NSID"), + "DevicePath": f"/dev/{namespace_info.get('NameSpace')}", + "Firmware": controller.get("Firmware"), + "Index": int(controller.get("Controller").replace("nvme", "")), + "ModelNumber": controller.get("ModelNumber"), + "SerialNumber": controller.get("SerialNumber"), + "UsedBytes": namespace_info.get("UsedBytes"), + "MaximumLBA": namespace_info.get("MaximumLBA"), + "PhysicalSize": namespace_info.get("PhysicalSize"), + "SectorSize": namespace_info.get("SectorSize"), + } + old_format_list.append(old_format) + + return old_format_list + + @staticmethod + def get_nvme_list_subsys(host: Host) -> Dict[str, str]: + """ + This method returns the nvme drive name to its pci address + Args: + host: Host object + + Returns: + nvme_pcie_link: Dictionary of nvme device and its corresponding pcie link + """ + nvme_pcie_link = {} + out = host.run("nvme list-subsys") + for line in out.split("\n"): + if "live" in line: + match = re.search(r"nvme(\d+)\s+pcie\s+(\d+:\d+:\d+.\d+)", line) + if match: + nvme_pcie_link[match.group(1)] = match.group(2) + return nvme_pcie_link + + @staticmethod + def parse_json_string(out: str) -> dict: + """ + Parse the json string and return the json value. + Args: + out (str): The json string to be parsed. + Returns: + dict: The parsed json value. + """ + json_match = re.search(r"\{.*\}", out, re.DOTALL) + out_json = {} + if json_match: + try: + json_str = json_match.group(0) + out_json = json.loads(json_str) + except json.decoder.JSONDecodeError as e: + raise TestError( + f"Failed to convert to JSON: {out}\nError:{e}", + component=COMPONENT.STORAGE_DRIVE, + error_type=ErrorType.TOOL_ERR, + ) + return out_json diff --git a/src/autoval_ssd/tests/nvme_cli/compare_version_outputs.json b/src/autoval_ssd/tests/nvme_cli/compare_version_outputs.json new file mode 100644 index 0000000..0cc043f --- /dev/null +++ b/src/autoval_ssd/tests/nvme_cli/compare_version_outputs.json @@ -0,0 +1,6 @@ +{ + "drive_type": "ssd", + "drive_interface": "nvme", + "include_boot_drive": true, + "comparand_nvme_version": "nvme-cli-2.8-1.hs.el9" +} diff --git a/src/autoval_ssd/tests/nvme_cli/nvme_cli.py b/src/autoval_ssd/tests/nvme_cli/nvme_cli.py index 4515419..d19d603 100644 --- a/src/autoval_ssd/tests/nvme_cli/nvme_cli.py +++ b/src/autoval_ssd/tests/nvme_cli/nvme_cli.py @@ -4,17 +4,26 @@ """Test to validate NVME cli commands""" import datetime +import importlib import json +import os import re +from pprint import pformat +from typing import Any, Dict, List, Optional from autoval.lib.host.component.component import COMPONENT from autoval.lib.utils.async_utils import AsyncJob, AsyncUtils + from autoval.lib.utils.autoval_errors import ErrorType from autoval.lib.utils.autoval_exceptions import TestError from autoval.lib.utils.autoval_log import AutovalLog from autoval.lib.utils.autoval_utils import AutovalUtils +from autoval.lib.utils.file_actions import FileActions +from autoval.lib.utils.site_utils import SiteUtils +from autoval_ssd.lib.utils.fio.fio_synth_flash_utils import FioSynthFlashUtils from autoval_ssd.lib.utils.storage.nvme.fdp_utils import FDPUtils +from autoval_ssd.lib.utils.storage.nvme.latency_monitor_utils import LatencyMonitor from autoval_ssd.lib.utils.storage.nvme.nvme_drive import NVMeDrive from autoval_ssd.lib.utils.storage.nvme.nvme_resize_utils import NvmeResizeUtil from autoval_ssd.lib.utils.storage.nvme.nvme_utils import NVMeUtils @@ -48,24 +57,81 @@ def __init__(self, *args, **kwargs) -> None: self.arbitration_mechanism = self.test_control.get( "arbitration_mechanism", True ) + self.nvme_telemetry_log_timeout = self.test_control.get( + "telemetry_log_timeout", 1200 + ) + # List of NVMECli commands to skip + self.skip_commands = self.test_control.get("skip_commands", []) + self.fdp_setup = self.test_control.get("fdp_setup", False) self.fdp_enabled = False + self.comparand_nvme_version = self.test_control.get( + "comparand_nvme_version", None + ) def execute(self) -> None: self.log_info("Test to run NVME Cli commands") - version = NVMeUtils.get_nvme_version(self.host) - self.log_info(f"Running NVME version {version}") - AsyncUtils.run_async_jobs( + first_nvme_version = NVMeUtils.get_nvme_version(self.host) + first_nvmecli_command_output = self.run_nvme_cli_commands(first_nvme_version) + if self.comparand_nvme_version: + self.install_nvme_version(self.comparand_nvme_version) + new_nvme_version = NVMeUtils.get_nvme_version(self.host) + if first_nvme_version == new_nvme_version: + self.validate_not_equal( + first_nvme_version, + new_nvme_version, + "Can't compare versions when the new version is the same as the current one", + ) + else: + new_nvme_cli_command_output = self.run_nvme_cli_commands( + new_nvme_version + ) + self.compare_command_outputs( + first_nvme_version, + first_nvmecli_command_output, + new_nvme_version, + new_nvme_cli_command_output, + ) + + def run_nvme_cli_commands(self, nvme_version: str) -> List[Dict]: + """ + Run all nvme-cli commands for this test. + + Args: + nvme_version: The version of nvme-cli currently installed. + Returns: + A list of dictionaries representing the output of each command that was run. + """ + self.log_info(f"Running nvme-cli commands for version {nvme_version}") + if self.skip_commands: + self.log_info(f"Skipping NVMECli commands: {self.skip_commands}") + command_outputs = AsyncUtils.run_async_jobs( [ - AsyncJob(func=self.validate_nvme_drives, args=[drive]) + AsyncJob( + func=self.validate_nvme_drives, + args=[drive, nvme_version], + ) for drive in self.test_drives ] ) if self.fdp_setup: self.validate_fdp() + self.validate_latency_monitor() + return command_outputs - def validate_nvme_drives(self, drive) -> None: - """Check drive nvme is write mode enabled""" + def validate_nvme_drives(self, drive: NVMeDrive, nvme_version: str) -> List[Dict]: + """ + This method performs a series of NVMe cli commands on the NVMe drive + and performs validation using the output of each command that is run. + The output is additionally saved so that it can be compared with an + output for another version, if specified in the test control. + + Args: + drive: The NVMe drive to validate. + nvme_version: The version of nvme-cli currently installed. + Returns: + A list of dictionaries representing the output of each command that was run. + """ AutovalUtils.validate_condition( (not drive.check_readonly_mode()), "Check drive nvme is write mode enabled %s" % drive.block_name, @@ -73,25 +139,36 @@ def validate_nvme_drives(self, drive) -> None: component=COMPONENT.STORAGE_DRIVE, error_type=ErrorType.NVME_ERR, ) - self._get_fw_log(drive) - self._get_smart_log(drive) - self._get_error_log(drive) - self._get_nvme_ns_map(drive) - self._get_id_ns(drive) - self._get_feature(drive) - self._get_internal_log(drive) - drive.get_ocp_telemetry_string_log() - self._get_effects_log(drive) - self._get_vs_timestamp(drive) - self._validate_power_mode(drive) - self._validate_capacity(drive) - self._check_oacs_device_self_test(drive) + self.methods_to_call = { + "_get_fw_log": self._get_fw_log, + "_get_smart_log": self._get_smart_log, + "_get_error_log": self._get_error_log, + "_get_nvme_ns_map": self._get_nvme_ns_map, + "_get_id_ns": self._get_id_ns, + "_get_feature": self._get_feature, + "_get_effects_log": self._get_effects_log, + "_get_vs_timestamp": self._get_vs_timestamp, + "_validate_power_mode": self._validate_power_mode, + "_validate_capacity": self._validate_capacity, + "_check_oacs_device_self_test": self._check_oacs_device_self_test, + "_get_internal_log": self._get_internal_log, + "get_ocp_telemetry_string_log": lambda drive: drive.get_ocp_telemetry_string_log(), + } + command_outputs = [] + for method_name, method in self.methods_to_call.items(): + if method_name not in self.skip_commands: + command_outputs.append( + {"method_name": method_name, "output": method(drive)} + ) + drive.get_smartctl_output() + drive.get_ocp_hardware_component_log(nvme_version) if self.crypto_erase: self._validate_crypto_erase_support(drive) if self.arbitration_mechanism: self._validate_arbitration_mechanism(drive) + return command_outputs - def _validate_arbitration_mechanism(self, drive) -> None: + def _validate_arbitration_mechanism(self, drive: NVMeDrive) -> None: # Check arbitration_mechanism out = drive.get_arbitration_mechanism_status() if out: @@ -114,13 +191,13 @@ def _validate_arbitration_mechanism(self, drive) -> None: error_type=ErrorType.DRIVE_ERR, ) # Check csts - csts_match = re.search(r"csts\s+:\s+(\d+)", out) + csts_match = re.search(r"csts\s+:\s+(0x[0-9a-fA-F]+|\d+)", out) if csts_match: - csts = int(csts_match.group(1)) + csts = int(csts_match.group(1), 0) AutovalUtils.validate_equal( csts, 1, - f"{drive.block_name}: csts is {csts}", + "%s: csts is %s" % (drive.block_name, csts), component=COMPONENT.STORAGE_DRIVE, error_type=ErrorType.DRIVE_ERR, ) @@ -133,7 +210,7 @@ def _validate_arbitration_mechanism(self, drive) -> None: error_type=ErrorType.DRIVE_ERR, ) - def _validate_crypto_erase_support(self, drive) -> None: + def _validate_crypto_erase_support(self, drive: NVMeDrive) -> None: out = drive.get_crypto_erase_support_status() if out is False: self.log_info( @@ -149,21 +226,43 @@ def _validate_crypto_erase_support(self, drive) -> None: error_type=ErrorType.NVME_ERR, ) - def _get_fw_log(self, drive) -> None: + def _get_fw_log(self, drive: NVMeDrive) -> Dict: + """ + Runs the nvme fw-log command on the provided drive. + + Args: + drive: The NVMe drive to run the command on. + Returns: + Output of the nvme fw-log command as a dict. + """ + fw_log = {} out = drive.get_fw_log() - out_json = json.loads(out) + try: + fw_log = json.loads(out) + except json.decoder.JSONDecodeError: + fw_log = NVMeUtils.parse_json_string(out) + self.validate_greater( - len(out_json), + len(fw_log), 0, - str(out_json), + str(fw_log), raise_on_fail=False, component=COMPONENT.STORAGE_DRIVE, error_type=ErrorType.DRIVE_ERR, ) + return fw_log - def _get_smart_log(self, drive: NVMeDrive) -> None: - out_json = drive.get_smart_log() - smart_log = out_json["smart-log"] + def _get_smart_log(self, drive: NVMeDrive) -> Dict: + """ + Runs the nvme smart-log command on the provided drive. + + Args: + drive: The NVMe drive to run the command on. + Returns: + Output of the nvme smart-log command as a dict. + """ + out = drive.get_smart_log() + smart_log = out["smart-log"] self.validate_greater( len(smart_log), 0, @@ -172,42 +271,47 @@ def _get_smart_log(self, drive: NVMeDrive) -> None: component=COMPONENT.STORAGE_DRIVE, error_type=ErrorType.DRIVE_ERR, ) + return smart_log - def _get_error_log(self, drive: NVMeDrive) -> None: + def _get_error_log(self, drive: NVMeDrive) -> Dict: """ - This function retrieves the error log from the given drive. + Retrieves the error log from the given drive. Args: - drive (NVMeDrive): The drive to get the error log from. - - + drive: The NVMe drive to run the command on. Returns: - None + Output of the nvme error-log command as a dict. """ + out_json = {} out = drive.get_error_log() - out_json = json.loads(out) + try: + out_json = json.loads(out) + except json.decoder.JSONDecodeError: + out_json = NVMeUtils.parse_json_string(out) error_log = out_json["errors"] - self.validate_greater( - len(error_log), - 0, + self.validate_non_empty_list( + list(error_log), msg=f"error-log from drive {str(drive)} has at least one entry.", raise_on_fail=False, component=COMPONENT.STORAGE_DRIVE, error_type=ErrorType.DRIVE_ERR, ) + return error_log[0] - def _get_id_ns(self, drive: NVMeDrive) -> None: + def _get_id_ns(self, drive: NVMeDrive) -> Dict: """ This function retrieves identity namespace results for the given drive. Args: - drive (NVMeDrive): The drive to get namespace results from. - + drive: The NVMe drive to run the command on. Returns: - None + Output of the nvme id-ns command as a dict. """ out = drive.get_id_ns() - out_json = json.loads(out) + try: + out_json = json.loads(out) + except json.decoder.JSONDecodeError: + out_json = NVMeUtils.parse_json_string(out) self.validate_greater( len(out_json), 0, @@ -216,8 +320,17 @@ def _get_id_ns(self, drive: NVMeDrive) -> None: component=COMPONENT.STORAGE_DRIVE, error_type=ErrorType.DRIVE_ERR, ) + return out_json - def _get_nvme_ns_map(self, drive) -> None: + def _get_nvme_ns_map(self, drive: NVMeDrive) -> Dict: + """ + Runs the nvme list command on the provided drive. + + Args: + drive: The NVMe drive to run the command on. + Returns: + Output of the nvme list command as a dict. + """ n_s = NVMeUtils.get_nvme_ns_map( self.host, drive.block_name, drive.serial_number ) @@ -230,8 +343,17 @@ def _get_nvme_ns_map(self, drive) -> None: component=COMPONENT.STORAGE_DRIVE, error_type=ErrorType.DRIVE_ERR, ) + return n_s + + def _get_feature(self, drive: NVMeDrive) -> List[str]: + """ + Runs the nvme get-feature command on the provided drive. - def _get_feature(self, drive) -> None: + Args: + drive: The NVMe drive to run the command on. + Returns: + A list of strings containing the fetched operating parameters for each feature ID. + """ feature_info = drive.get_feature() self.validate_greater( len(feature_info), @@ -241,8 +363,9 @@ def _get_feature(self, drive) -> None: component=COMPONENT.STORAGE_DRIVE, error_type=ErrorType.DRIVE_ERR, ) + return feature_info - def _get_internal_log(self, drive: NVMeDrive) -> None: + def _get_internal_log(self, drive: NVMeDrive) -> Optional[bool]: """Gets Vendor Specific Log. This method gets the internal log in binary format for different @@ -252,16 +375,19 @@ def _get_internal_log(self, drive: NVMeDrive) -> None: ---------- drive : :obj: 'Class' Object of vendor class. + Returns: + Optional[bool]: True if the internal log has been taken, False otherwise. + If the drive does not support internal log, None is returned. """ try: - status = drive.get_internal_log() + status = drive.get_internal_log(self.nvme_telemetry_log_timeout) self.log_info(f"Internal log has {'' if status else 'not'} been taken") + return status except NotImplementedError as exc: self.log_info(exc) - def _get_effects_log(self, drive) -> None: - """Gets Effects Log. - + def _get_effects_log(self, drive: NVMeDrive) -> Optional[Dict]: + """ This method retrieves the ACS(Admin Command Set) and IOCS(I/O Command Set) logs of the drive. @@ -269,7 +395,8 @@ def _get_effects_log(self, drive) -> None: drive (NVMeDrive): The drive from which to retrieve effects logs. Returns: - None + Optional[Dict]: A dictionary containing the ACS and IOCS logs. + If the drive does not support effects logs, None is returned. """ try: out = drive.get_effects_log() @@ -281,6 +408,7 @@ def _get_effects_log(self, drive) -> None: component=COMPONENT.STORAGE_DRIVE, error_type=ErrorType.DRIVE_ERR, ) + return out except NotImplementedError as exc: self.log_info(exc) @@ -290,7 +418,7 @@ def get_test_params(self) -> str: ) return params - def _get_vs_timestamp(self, drive) -> None: + def _get_vs_timestamp(self, drive: NVMeDrive) -> Optional[int]: """Gets Vendor Specific Drive Timestamp. This method gets the drive up time for different @@ -300,6 +428,9 @@ def _get_vs_timestamp(self, drive) -> None: ---------- drive : :obj: 'Class' Object of vendor class. + Returns: + Optional[int]: Drive up time in seconds. If the drive does not support + vendor specific timestamp, None is returned. """ try: out = drive.get_vs_timestamp() @@ -311,60 +442,63 @@ def _get_vs_timestamp(self, drive) -> None: except Exception: time = "%s years" % years self.log_info(f"Drive up time {drive}: {time}") + return out except NotImplementedError as exc: self.log_info(exc) except Exception as exc: raise TestError( - f"get_vs_timestamp failed for drive {drive}: {str(exc)}", + "get_vs_timestamp failed for drive %s: %s" % (drive, str(exc)), component=COMPONENT.STORAGE_DRIVE, error_type=ErrorType.DRIVE_ERR, ) - def _validate_power_mode(self, drive) -> None: - """Validate Power Mode. - + def _validate_power_mode(self, drive) -> Dict: + """ This method checks for the npss (Number of Power State Support) of the data drive. Current M.2 data SSD's have npss 0 or 1. For npss more than 1 and for the data drive capacity 2TB or 4TB the required power mode is set to reduce the power consumption by nvme and validates by get power mode. - Parameters - ---------- - drive : :obj: 'Class' - Object of nvme drive class. + Args: + drive: The NVMe drive object. + Returns: + The npss and power_modes of the drive as a dict. """ npss = NVMeUtils.get_id_ctrl(self.host, drive.block_name)["npss"] - if npss > 1: - power_modes = drive.get_drive_supported_power_modes() - for power_mode in power_modes: - set_state = drive.set_power_mode(power_mode) - get_state = drive.get_power_mode() - AutovalUtils.validate_equal( - get_state, - set_state, - f"Correct power-mode set on /dev/{drive}", - component=COMPONENT.STORAGE_DRIVE, - error_type=ErrorType.DRIVE_ERR, - raise_on_fail=False, - ) - # Reverting the power state to 0 - set_state = drive.set_power_mode(0) + output = {"npss": npss} + if npss <= 1: + AutovalLog.log_info(f"/dev/{drive} supported only one power-mode") + return output + power_modes = drive.get_drive_supported_power_modes() + output["power_modes"] = power_modes + for power_mode in power_modes: + set_state = drive.set_power_mode(power_mode) get_state = drive.get_power_mode() AutovalUtils.validate_equal( - set_state, get_state, f"Resetting power-mode PS0 on /dev/{drive} " + get_state, + set_state, + f"Correct power-mode set on /dev/{drive}", + component=COMPONENT.STORAGE_DRIVE, + error_type=ErrorType.DRIVE_ERR, + raise_on_fail=False, ) - else: - AutovalLog.log_info(f"/dev/{drive} supported only one power-mode") + # Reverting the power state to 0 + set_state = drive.set_power_mode(0) + get_state = drive.get_power_mode() + AutovalUtils.validate_equal( + set_state, get_state, f"Resetting power-mode PS0 on /dev/{drive} " + ) + return output - def _check_oacs_device_self_test(self, drive) -> None: + def _check_oacs_device_self_test(self, drive: NVMeDrive) -> Optional[Dict]: """Validate Device self-test command support Method checks for OACS field from id-ctrl and validates Device self-test command support - Parameters - ---------- - drive : :obj: 'Class' - Object of nvme drive class. + Args: + drive: The NVMe drive object. + Returns: + The oacs value of the drive as a dict. """ oacs = NVMeUtils.get_id_ctrl(self.host, drive.block_name)["oacs"] self.log_info(f"Test to Check dev_self_test management {oacs} {hex(oacs)}") @@ -380,15 +514,16 @@ def _check_oacs_device_self_test(self, drive) -> None: if support_dev_self_test_management == 0x0: AutovalLog.log_info(f"/dev/{drive} does not support self-test") return + return oacs - def _validate_capacity(self, drive) -> None: + def _validate_capacity(self, drive: NVMeDrive) -> Optional[Dict]: """Validate drive capacity Method checks for unvmcap and tnvmcap from id-ctrl and validates drive capacity - Parameters - ---------- - drive : :obj: 'Class' - Object of nvme drive class. + Args: + drive: The NVMe drive object. + Returns: + The oacs, tnvmcap and nsze values of the drive as a dict. """ if str(drive) == self.boot_drive: # namespace_management not supported on boot drive` @@ -415,6 +550,11 @@ def _validate_capacity(self, drive) -> None: component=COMPONENT.STORAGE_DRIVE, error_type=ErrorType.DRIVE_ERR, ) + return { + "oacs": oacs, + "tnvmcap": tnvmcap, + "nsze": nsze, + } def validate_fdp(self) -> None: """ @@ -432,3 +572,152 @@ def validate_fdp(self) -> None: FDPUtils.fdp_cleanup(self.host, nvme_id_ctrls) AutovalLog.log_info("FDP cleanup completed") + + def validate_latency_monitor(self) -> None: + """ + Enables latency monitor, run the workload and validate the bucke counter on single drive. + """ + nvme_version = NVMeUtils.get_nvme_version(self.host) + if not NVMeUtils.compare_versions("2.9.0", nvme_version): + self.log_info( + "Skipping latency monitor test. Nvme version 2.9 or higher required for ocp lacteny monitor cmds" + ) + return + + FioSynthFlashUtils.tool_setup(self.host) + test_drives = [ + drive for drive in self.test_drives if drive.block_name != self.boot_drive + ][:1] + self.test_control["max_latency_lm_validation"] = True + self.test_control["ocp_lm_commands"] = True + workload = "Nvme_Cli_Wkld" + work_dir = self.dut_logdir[self.host.hostname] + self.latency_monitor = LatencyMonitor( + host=self.host, + test_drives=test_drives, + test_control=self.test_control, + ) + lm_enabled_drives = self.latency_monitor.enable( + workload=workload, working_directory=work_dir + ) + self.log_info(f"Running the {workload} Workload.") + # Generate the run folder locations + run_folder = f"test_{workload}" + # Run synthflash + for drive in test_drives: + result_folder = f"{run_folder}_{drive.block_name}_results" + device = f"/dev/{drive.block_name}" + # Run workload + cmd = f"cd {work_dir} && fiosynth -d {device} -w {workload} -f {result_folder} -n 1 -g y --lm" + self.log_info(f"Starting command: {cmd}") + self.host.run_get_result(cmd, timeout=70500) + + self.latency_monitor.collect_logs(workload, work_dir) + self.latency_monitor.parse_and_validate_results( + synth_workload_result_dir=work_dir, + lm_enabled_drives=lm_enabled_drives, + ) + self.latency_monitor.disable(working_directory=work_dir) + + def compare_command_outputs( + self, + first_nvme_version: str, + first_nvme_version_outputs: List[Dict], + new_nvme_version: str, + new_nvme_version_outputs: List[Dict], + ) -> None: + """ + Compares the outputs of the commands that were saved for each nvme-cli version. + Only the outputs of commands run on the last drive are compared. + Cmds outputs with difference are saved to the results directory. + + Args: + first_nvme_version: The first nvme-cli version to compare. + first_nvme_version_outputs: The outputs of the commands for the first nvme-cli version. + new_nvme_version: The new nvme-cli version to compare. + new_nvme_version_outputs: The outputs of the commands for the new nvme-cli version. + """ + self.log_info( + f"Comparing the output of commands for the nvme-cli versions {first_nvme_version} and {new_nvme_version}" + ) + output_str = ( + f"NVMe-CLI Version Comparison: {first_nvme_version} vs {new_nvme_version}\n" + ) + output_str += "=" * 80 + "\n\n" + + for i in range(len(first_nvme_version_outputs[-1])): + first_nvme_version_output = first_nvme_version_outputs[-1][i] + new_nvme_version_output = new_nvme_version_outputs[-1][i] + if ( + first_nvme_version_output["method_name"] + != new_nvme_version_output["method_name"] + ): + self.log_warning( + f"Output comparison failed due to method name mismatch: {first_nvme_version_output['method_name']} != {new_nvme_version_output['method_name']}" + ) + continue + + output_str += self._compare_and_format_output( + first_nvme_version, + first_nvme_version_output, + new_nvme_version, + new_nvme_version_output, + ) + _result_dir = SiteUtils.get_resultsdir() + dest_file_path = os.path.join(_result_dir, "nvme_cli_version_comparison.log") + AutovalLog.log_info(f"Saving nvme comparison log to: {dest_file_path}") + FileActions.write_data(dest_file_path, output_str, append=True) + + def _compare_and_format_output( + self, + first_nvme_version: str, + first_nvme_version_output: Any, + new_nvme_version: str, + new_nvme_version_output: Any, + ) -> str: + """ + Compares two command outputs and formats the differences if any. + + Args: + first_nvme_version: The first nvme-cli version. + first_nvme_version_output: The output dict of the first command. + new_nvme_version: The new nvme-cli version. + new_nvme_version_output: The output dict of the new command. + + Returns: + A formatted string containing the comparison results. + """ + DeepDiff = importlib.import_module("deepdiff").DeepDiff + output_str = "" + first_output = first_nvme_version_output["output"] + new_output = new_nvme_version_output["output"] + diff = DeepDiff( + first_output, + new_output, + verbose_level=1, + ignore_order=True, + ignore_numeric_type_changes=True, + ) + + if diff: + filtered_diff = "\n".join(diff.keys()) + AutovalLog.log_warning( + f"Output differs between {first_nvme_version} and {new_nvme_version} for {first_nvme_version_output['method_name']}: {filtered_diff}" + ) + output_str += f"Method: {first_nvme_version_output['method_name']}\n" + output_str += "-" * 40 + "\n" + output_str += f"Original Output ({first_nvme_version}):\n" + output_str += f"{pformat(first_output, width=80, indent=2)}\n\n" + output_str += f"New Output ({new_nvme_version}):\n" + output_str += f"{pformat(new_output, width=80, indent=2)}\n\n" + output_str += "Differences:\n" + output_str += f"{pformat(diff, width=80, indent=2)}\n" + output_str += "\n" + "=" * 40 + "\n\n" + else: + AutovalLog.log_info( + f"No output difference between {first_nvme_version} and {new_nvme_version} for {first_nvme_version_output['method_name']}" + ) + return output_str + + def cleanup(self, *args, **kwargs) -> None: + super().cleanup(*args, **kwargs)