Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 136 additions & 44 deletions src/autoval_ssd/lib/utils/storage/nvme/nvme_drive.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

# pyre-unsafe
"""library to manage nvme drive"""

import json
import os
import re
import time
from enum import Enum
from time import sleep
from typing import Dict, List
from typing import Optional, Tuple

from autoval.lib.host.component.component import COMPONENT

Expand Down Expand Up @@ -75,7 +76,13 @@ class NVMeDrive(Drive):
]
NVMECLI_MANUFACTURER = None

def __init__(self, host, block_name, config=None) -> None:
def __init__(
self,
host,
block_name,
config=None,
nvme_list_info: Optional[list[dict[str, str]]] = None,
) -> None:
"""
Class for storing data and interacting with NVME drives

Expand All @@ -89,6 +96,7 @@ def __init__(self, host, block_name, config=None) -> None:
- cfg/nvme_smart_fdi
"""
super().__init__(host, block_name, config=config)
self.nvme_list_info = nvme_list_info
if config is None:
config = DEFAULT_VALIDATE_CONFIG
self.interface = DriveInterface.NVME
Expand All @@ -115,8 +123,8 @@ def __init__(self, host, block_name, config=None) -> None:
self.fw_ver = None
self.current_fw_ver = None
self.fw_ns_slots_models_map = {}
self.ocp_2_6_drives: List = []
self.workload_target_drives: List = []
self.ocp_2_6_drives: list = []
self.workload_target_drives: list = []
self.lmparser_ocp_2_0_drives = {}
self.cfg_dir = ""

Expand All @@ -128,7 +136,7 @@ def get_smart_log_keys(self) -> None:
smart_log = self.get_smart_log()
self.smart_log_keys = self._flatten_validate_config_dict(smart_log).keys()

def load_config(self, config_file: str) -> Dict:
def load_config(self, config_file: str) -> dict:
"""
@param config_file
@return config for smart validation
Expand Down Expand Up @@ -157,10 +165,10 @@ def get_target_path() -> str:
target_path = ""
current_file_path = os.path.abspath(__file__)
try:
pattern = r"^(/.*?)/autoval_ssd/"
pattern = r"^(/.*?)/autoval_ssd/lib"
match = re.search(pattern, current_file_path)
if match:
target_path = match.group(0)[:-1]
target_path = match.group(0).replace("/lib", "")
except Exception:
raise AutovalFileNotFound("The required file path is not found")
return target_path
Expand Down Expand Up @@ -211,7 +219,7 @@ def get_arbitration_mechanism_status(self):
"""
Method to get the controller properties
"""
nvme_drive = "/dev/%s" % self.block_name
nvme_drive = "/dev/%s" % re.sub(r"n\d+$", "", self.block_name)
cmd = "nvme show-regs %s -H" % nvme_drive
out = AutovalUtils.validate_no_exception(
self.host.run,
Expand Down Expand Up @@ -439,30 +447,35 @@ def get_feature(self, feature_id=None, queue_id=None):
nvme_drive = "/dev/%s" % self.block_name
features_info = []
for _id in feature_ids:
if queue_id:
cmd = f"nvme get-feature {nvme_drive} -f {_id} -H {queue_id}"
else:
cmd = f"nvme get-feature {nvme_drive} -f {_id} -H"
out = self.host.run_get_result(cmd=cmd).stdout # noqa
cmd = f"nvme get-feature {nvme_drive} -f {_id} -H"
out = self.host.run_get_result(cmd=cmd).stdout
feature_info = ",".join([s.strip() for s in out.splitlines()])
features_info.append(feature_info)
return features_info

def get_capacity(self, unit: str = "byte"):
"""Return drive capacity"""
_byte = NVMeUtils.get_from_nvme_list(self.host, self.block_name, "PhysicalSize")
_byte = NVMeUtils.get_from_nvme_list(
self.host, self.block_name, "PhysicalSize", self.nvme_list_info
)
return DiskUtils.convert_from_bytes(_byte, unit)

def get_serial_number(self):
"""Return drive serial_number"""
return NVMeUtils.get_from_nvme_list(self.host, self.block_name, "SerialNumber")
return NVMeUtils.get_from_nvme_list(
self.host, self.block_name, "SerialNumber", self.nvme_list_info
)

def _get_model(self):
return NVMeUtils.get_from_nvme_list(self.host, self.block_name, "ModelNumber")
return NVMeUtils.get_from_nvme_list(
self.host, self.block_name, "ModelNumber", self.nvme_list_info
)

def get_firmware_version(self):
"""Return drive FW version"""
return NVMeUtils.get_from_nvme_list(self.host, self.block_name, "Firmware")
return NVMeUtils.get_from_nvme_list(
self.host, self.block_name, "Firmware", self.nvme_list_info
)

def get_manufacturer(self) -> str:
"""Return drive manufacturer"""
Expand All @@ -485,7 +498,7 @@ def get_smart_log(self):
smart_log.update(self.get_ocp_smart_log())
return smart_log

def get_ocp_smart_log(self) -> Dict:
def get_ocp_smart_log(self) -> dict:
"""
Collect OCP smart log and return it.
"""
Expand Down Expand Up @@ -527,23 +540,40 @@ def get_ocp_telemetry_string_log(self) -> None:
cmd = f"nvme ocp telemetry-string-log /dev/{self.block_name}"
self.host.run(cmd=cmd, ignore_status=True, working_directory=dut_logdir)

def get_internal_log(self) -> bool:
"""
Return drive telemetry log.
def get_internal_log(self, timeout: int, phase: str = "", flag: str = "") -> bool:
"""Return drive telemetry log.

Args:
None
----
timeout: The timeout value for nvme telemetry-log command.
phase: The phase of the telemetry log, used to differentiate log files before and after test.
flag: Whether to append the -d 1 flag to the command and default is False.

Returns:
The completion status of internal log file generation.
-------
bool: The completion status of internal log file generation.
"""
dut_logdir = SiteUtils.get_dut_logdir(self.host.hostname)
cmd = f"nvme telemetry-log --output-file=bin /dev/{self.block_name}"
ret = self.host.run_get_result(
cmd=cmd, ignore_status=True, working_directory=dut_logdir
cmd = f"timeout {timeout}s nvme telemetry-log --output-file={self.serial_number}__telemetrylog_{phase}.bin /dev/{self.block_name}{flag}"
working_directory = (
f"{dut_logdir}/telemetry_{phase}" if phase and not flag else dut_logdir
)
if ret.return_code != 0:
AutovalLog.log_info(f"WARNING: command '{cmd}' failed with error code")
try:
ret = self.host.run_get_result(
cmd=cmd,
ignore_status=True,
working_directory=working_directory,
timeout=timeout,
)
if ret.return_code != 0:
AutovalLog.log_info(f"WARNING: command '{cmd}' failed with error code")
return False
return True
except TimeoutError:
AutovalLog.log_info(
f"WARNING: Failed to complete '{cmd}' within {timeout} seconds on {self.host.hostname}"
)
return False
return True

def get_effects_log(self):
"""Gets Effects Log.
Expand All @@ -560,9 +590,13 @@ def get_effects_log(self):
TestStepError
When fails to retrieve the command effects log.
"""
cmd = "nvme effects-log /dev/%s -o json" % self.block_name
cmd = "nvme effects-log /dev/%s -o json" % re.sub(r"n\d+$", "", self.block_name)
out = self.host.run(cmd=cmd)
return json.loads(out)
try:
out = json.loads(out)
except json.decoder.JSONDecodeError:
out = NVMeUtils.parse_json_string(out)
return out

def get_id_ctrl(self):
"""Return id_ctrl"""
Expand All @@ -588,7 +622,7 @@ def supports_flash_temp_check(self) -> bool:
# can be provided in the vendor subclass
return True

def get_nand_write_param(self) -> Dict[str, str]:
def get_nand_write_param(self) -> dict[str, str]:
"""Return nand_write params"""
# Can be provided in vendor subclass
return {}
Expand All @@ -604,7 +638,7 @@ def get_vs_nand_stat_log(self) -> None:
return

def get_write_amplification(
self, smart_before: Dict[str, Dict], smart_after: Dict[str, Dict]
self, smart_before: dict[str, dict], smart_after: dict[str, dict]
) -> bool:
"""
Method to calculate the Flash Write Amplification
Expand Down Expand Up @@ -644,7 +678,7 @@ def get_write_amplification(
)
if waf:
AutovalLog.log_info(
"Lifetime WAF for drive %s is %s" % (self.block_name, waf)
f"Lifetime WAF for drive {self.block_name} is {waf}"
)
write_amplification["lifetime_write_amplification"] = waf
waf = {
Expand All @@ -671,15 +705,15 @@ def get_write_amplification(
# Calculate Write amp for the currently running test
write_amplification["test_write_amplification"] = 0
waf, error = self.calculate_waf(host_delta, nand_delta, nand_write_formula)
AutovalLog.log_info(
"WAF during this test for drive %s: %s" % (self.block_name, waf)
)
AutovalLog.log_info(f"WAF during this test for drive {self.block_name}: {waf}")
write_amplification["test_write_amplification"] = waf
if error:
AutovalLog.log_info(
"Cannot calculate WAF for drive %s due to %s" % (self.block_name, error)
"Cannot calculate WAF for drive {} due to {}".format(
self.block_name, error
)
)
AutovalLog.log_info("Drive %s: %s" % (self.block_name, write_amplification))
AutovalLog.log_info(f"Drive {self.block_name}: {write_amplification}")
return True

def calculate_waf(self, h_write, n_write, nand_write_formula):
Expand Down Expand Up @@ -726,7 +760,7 @@ def convert_nand_write(self, nand_write) -> float:
nand_write = float(nand_write)
except Exception as exc:
raise TestError(
"Failed convert %s to float: %s" % (nand_write, exc),
f"Failed convert {nand_write} to float: {exc}",
component=COMPONENT.STORAGE_DRIVE,
error_type=ErrorType.TOOL_ERR,
)
Expand Down Expand Up @@ -946,7 +980,7 @@ def fw_activate(
self,
drive_name: str,
file_name: str,
fw_slot: List[int],
fw_slot: list[int],
action: int,
nvme_admin_io=True,
) -> None:
Expand Down Expand Up @@ -1161,7 +1195,11 @@ def drive_health_check(self) -> None:
% self.block_name
)
smart_log = self.get_smart_log()
critical_warning = smart_log["smart-log"]["critical_warning"]
critical_warning_type = smart_log["smart-log"]["critical_warning"]
if isinstance(critical_warning_type, dict):
critical_warning = critical_warning_type["value"]
else:
critical_warning = critical_warning_type
if critical_warning > 0:
raise TestError(
f"The {self.manufacturer} drive {self.serial_number}"
Expand Down Expand Up @@ -1194,7 +1232,7 @@ def drive_erase_count(self) -> None:
except KeyError:
pass

def get_fw_update_ns_actions(self) -> List[int]:
def get_fw_update_ns_actions(self) -> list[int]:
"""Get Firmware update non supported actions.
This method will return the list of actions which are
not supported on the model.
Expand Down Expand Up @@ -1233,7 +1271,7 @@ def is_drive_degraded(self) -> None:
error_type=ErrorType.DRIVE_ERR,
)

def get_fw_slots(self) -> List[int]:
def get_fw_slots(self) -> list[int]:
"""Get available FW slots"""
nvme_drive = "/dev/%s" % self.block_name
out = self.get_nvme_id_ctrl(human_readable=True)
Expand Down Expand Up @@ -1536,3 +1574,57 @@ def is_lmparser_ocp_2_0_drive(self) -> bool:
if self.model in self.lmparser_ocp_2_0_drives[fw_ver]:
return True
return False

def get_ocp_hardware_component_log(self, nvme_version: str) -> None:
    """
    Collect the OCP hardware component log for this drive.

    Collection is skipped, with an info log entry, when any of these holds:
      - ``NVMeUtils.compare_versions("2.10.0", nvme_version)`` is falsy,
      - ``get_DSSD_version()`` returns None,
      - the DSSD version is lower than 2.5.

    Otherwise ``nvme ocp hardware-component-log`` is run with the DUT log
    directory as working directory, and its return code is validated to
    be 0.

    Args:
        nvme_version: Current installed NVMe version
    """
    # Gate 1: installed nvme version.
    if not NVMeUtils.compare_versions("2.10.0", nvme_version):
        AutovalLog.log_info(
            f"Current NVMe version '{nvme_version}' does not support OCP hardware component log"
        )
        return

    # Gate 2: the drive must report a DSSD version at all.
    dssd_version = self.get_DSSD_version()
    if dssd_version is None:
        AutovalLog.log_info(
            f"Skipping OCP hardware component log collection. DSSD version is not available for {self.block_name}"
        )
        return

    # Gate 3: DSSD version must be 2.5 or newer; (major, minor) tuples
    # compare lexicographically, which is exactly that rule.
    major, minor = dssd_version
    if (major, minor) < (2, 5):
        AutovalLog.log_info(
            f"Skipping OCP hardware component log collection. DSSD version is less than 2.5 for {self.block_name}"
        )
        return

    log_dir = SiteUtils.get_dut_logdir(self.host.hostname)
    log_cmd = f"nvme ocp hardware-component-log /dev/{self.block_name}"
    # ignore_status=True: the return code is checked by validate_equal
    # below instead of by the runner.
    result = self.host.run_get_result(
        cmd=log_cmd,
        ignore_status=True,
        working_directory=log_dir,
    )
    AutovalUtils.validate_equal(
        result.return_code,
        0,
        f"Collected OCP hardware component log for {self.block_name}",
        component=COMPONENT.STORAGE_DRIVE,
        error_type=ErrorType.DRIVE_ERR,
    )

def get_DSSD_version(self) -> Optional[Tuple[int, int]]:
    """
    Get DSSD version

    Runs ``nvme ocp smart-add-log`` with JSON output for this drive, filters
    the output through grep down to the "Major Version Field" and
    "Minor Version Field" lines, and parses both numbers.

    Returns:
        (Major, Minor) DSSD version as integers, or None when the command
        pipeline exits with a non-zero code or when either version field
        is missing from its output.
    """
    cmd = f'nvme ocp smart-add-log /dev/{self.block_name} -o json | grep -E "(Major|Minor) Version Field"'
    # ignore_status=True: a failing pipeline (including grep finding no
    # matching line) is reported to the caller as None, not raised.
    out = self.host.run_get_result(cmd=cmd, ignore_status=True)
    if out.return_code != 0:
        return None

    major = re.search(r'"Major Version Field"\s*:\s*(\d+)', out.stdout)
    minor = re.search(r'"Minor Version Field"\s*:\s*(\d+)', out.stdout)
    if major is None or minor is None:
        # Only one of the two fields was present; no usable version.
        return None
    return (int(major.group(1)), int(minor.group(1)))
Loading