From 104f1fbd2955d4ce7ec541aa24ba7e3e156e67d2 Mon Sep 17 00:00:00 2001 From: "Ahmed Hussein (amahussein)" Date: Mon, 4 Aug 2025 14:05:54 -0500 Subject: [PATCH 1/4] Use tools-API in qualx predict Signed-off-by: Ahmed Hussein (amahussein) Contributes to #1836 - uses tools-API in the predict code - introduces a new CSV Combiner class that automatically combine the CSV reports generated by multiple-apps and combine them into a single DF. --- .../rapids/qualification.py | 6 +- .../rapids/qualx/prediction.py | 22 ++-- .../spark_rapids_tools/api_v1/app_handler.py | 25 +++- .../src/spark_rapids_tools/api_v1/builder.py | 117 ++++++++++++++++++ .../api_v1/core/qual_handler.py | 5 + .../api_v1/result_handler.py | 22 ++++ .../tools/qualx/preprocess.py | 12 +- .../tools/qualx/qualx_main.py | 81 ++++++------ .../spark_rapids_tools/tools/qualx/util.py | 12 +- 9 files changed, 234 insertions(+), 68 deletions(-) diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py index 78c7adb26..bb3e7a9f0 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py @@ -29,6 +29,7 @@ from spark_rapids_pytools.common.sys_storage import FSUtil from spark_rapids_pytools.common.utilities import Utils, TemplateGenerator from spark_rapids_pytools.rapids.qualification_core import QualificationCore +from spark_rapids_tools.api_v1.builder import APIResultHandler from spark_rapids_tools.enums import QualFilterApp, QualEstimationModel, SubmissionMode from spark_rapids_tools.tools.additional_heuristics import AdditionalHeuristics from spark_rapids_tools.tools.cluster_config_recommender import ClusterConfigRecommender @@ -537,12 +538,13 @@ def __update_apps_with_prediction_info(self, model_name = self.ctxt.platform.get_prediction_model_name() qual_output_dir = self.ctxt.get_csp_output_path() output_info = self.__build_prediction_output_files_info() - qual_handler = self.ctxt.get_ctxt('qualHandler') try: + # Build the QualCore handler object to handle the prediction model output + q_core_handler = APIResultHandler().qual_core().with_path(qual_output_dir).build() predictions_df = predict(platform=model_name, qual=qual_output_dir, output_info=output_info, model=estimation_model_args['customModelFile'], - qual_handlers=[qual_handler]) + qual_handlers=[q_core_handler]) except Exception as e: # pylint: disable=broad-except predictions_df = pd.DataFrame() self.logger.error( diff --git a/user_tools/src/spark_rapids_pytools/rapids/qualx/prediction.py b/user_tools/src/spark_rapids_pytools/rapids/qualx/prediction.py index 1b66412b9..4a83d2265 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualx/prediction.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualx/prediction.py @@ -18,7 +18,8 @@ from spark_rapids_pytools.common.sys_storage import FSUtil from spark_rapids_pytools.rapids.qualx.qualx_tool import QualXTool -from spark_rapids_tools.tools.core.qual_handler import QualCoreHandler +from spark_rapids_tools.api_v1 import QualCoreResultHandler +from spark_rapids_tools.api_v1.builder import APIResultHandler from spark_rapids_tools.tools.qualx.qualx_main import predict from spark_rapids_tools.tools.qualx.util import print_summary, print_speedup_summary @@ -26,25 +27,16 @@ @dataclass class Prediction(QualXTool): """ - Wrapper layer around Prediction Tool. - - Attributes - ---------- - qual_output: str - Path to a directory containing qualification tool output. 
-    qual_handler: QualCoreHandler
-        Handler for reading qualification core tool results.
+    A wrapper that runs the QualX prediction stage on existing qualification tool output.
 
+    :param qual_output: Path to the directory containing the qualification tool output.
     """
     qual_output: str = None
-    qual_handler: QualCoreHandler = None
 
     name = 'prediction'
 
-    def __post_init__(self):
-        """Initialize the QualCoreHandler from qual_output."""
-        super().__post_init__()
-        if self.qual_output is not None:
-            self.qual_handler = QualCoreHandler(result_path=self.qual_output)
+    @property
+    def qual_handler(self) -> QualCoreResultHandler:
+        return APIResultHandler().qual_core().with_path(self.qual_output).build()
 
     def __prepare_prediction_output_info(self) -> dict:
         """
diff --git a/user_tools/src/spark_rapids_tools/api_v1/app_handler.py b/user_tools/src/spark_rapids_tools/api_v1/app_handler.py
index bc9200490..7e12412e5 100644
--- a/user_tools/src/spark_rapids_tools/api_v1/app_handler.py
+++ b/user_tools/src/spark_rapids_tools/api_v1/app_handler.py
@@ -16,7 +16,7 @@
 
 from dataclasses import dataclass, field
 from functools import cached_property
-from typing import Optional
+from typing import Optional, List
 
 import pandas as pd
 
@@ -57,17 +57,32 @@ def uuid(self) -> str:
         """
         return self._app_id
 
-    def patch_into_df(self, df: pd.DataFrame) -> pd.DataFrame:
+    def patch_into_df(self,
+                      df: pd.DataFrame,
+                      col_names: Optional[List[str]] = None) -> pd.DataFrame:
         """
         Given a dataframe, this method will stitch the app_id and app-name to the dataframe.
         This can be useful in automatically adding the app-id/app-name to the data-frame
         :param df: the dataframe that we want to modify.
+        :param col_names: optional list of column names under which the app_id/app_name values are
+                          inserted into the dataframe. The list is assumed to be in the same order
+                          in which the columns should be inserted.
        :return: the resulting dataframe from adding the columns.
         """
+        # TODO: We should consider adding a UUID as well, and use that for the joins instead.
+        # values to inject; currently only the app_id (the attempt_id can be added later)
+        col_values = [self.app_id]
+        if col_names is None:
+            # append attemptId to support multi-attempts
+            col_names = ['appId']
         if not df.empty:
-            # TODO: We should consider add UUID as well, and use that for the joins instead.
- df.insert(0, 'attemptId', self._attempt_id) - df.insert(0, 'appId', self._app_id) + for col_k, col_v in zip(reversed(col_names), reversed(col_values)): + if col_k not in df.columns: + df.insert(0, col_k, col_v) + else: + # if the column already exists, we should not overwrite it + # this is useful when we want to patch the app_id/app_name to an existing dataframe + df[col_k] = col_v return df @property diff --git a/user_tools/src/spark_rapids_tools/api_v1/builder.py b/user_tools/src/spark_rapids_tools/api_v1/builder.py index 77d1e0d50..59768b7ba 100644 --- a/user_tools/src/spark_rapids_tools/api_v1/builder.py +++ b/user_tools/src/spark_rapids_tools/api_v1/builder.py @@ -92,6 +92,11 @@ def rep_reader(self) -> 'ToolReportReaderT': raise ValueError(f'No reader found for table: {self._tbl}') return reader + @property + def tbl(self) -> str: + """Get the table id.""" + return self._tbl + @property def is_per_app_tbl(self) -> bool: if self._tbl is None: @@ -206,6 +211,118 @@ def _load_single_app(self) -> LoadDFResult: ) +@dataclass +class CSVCombiner(object): + """A class that combines multiple CSV reports into a single report.""" + rep_builder: CSVReport + _failed_app_processor: Optional[Callable[[str, LoadDFResult], None]] = field(default=None, init=False) + _success_app_processor: Optional[Callable[[ToolResultHandlerT, str, pd.DataFrame, dict], pd.DataFrame]] = ( + field(default=None, init=False)) + _combine_args: Optional[dict] = field(default=None, init=False) + + @property + def result_handler(self) -> ToolResultHandlerT: + """Get the result handler associated with this combiner.""" + return self.rep_builder.handler + + @staticmethod + def default_success_app_processor(result_handler: ToolResultHandlerT, + app_id: str, + df: pd.DataFrame, + combine_args: dict) -> pd.DataFrame: + """Default processor for successful applications.""" + col_names = None + app_entry = result_handler.app_handlers.get(app_id) + if not app_entry: + raise ValueError(f'App entry not found for ID: {app_id}') + if combine_args: + # check if the col_names are provided to stitch the app_ids + col_names = combine_args.get('col_names', None) + if col_names: + # patch the app_uuid and if the columns are defined. + return app_entry.patch_into_df(df, col_names=col_names) + return df + + def _evaluate_args(self) -> None: + """Evaluate the arguments to ensure they are set correctly.""" + + if self._success_app_processor is None: + # set the default processor for successful applications + self._success_app_processor = self.default_success_app_processor + # TODO: we should fail if the the combiner is built for AppIds but columns are not defined. 
+ + ################################ + # Setters/Getters for processors + ################################ + + def process_failed(self, + processor: Callable[[str, LoadDFResult], None]) -> 'CSVCombiner': + """Set the processor for failed applications.""" + self._failed_app_processor = processor + return self + + def process_success(self, + cb_fn: Callable[[ToolResultHandlerT, str, pd.DataFrame, dict], pd.DataFrame]) -> 'CSVCombiner': + """Set the processor for successful applications.""" + self._success_app_processor = cb_fn + return self + + def combine_args(self, args: dict) -> 'CSVCombiner': + """Set the arguments for combining the reports.""" + self._combine_args = args + return self + + def on_apps(self) -> 'CSVCombiner': + """specify that the combiner should append IDs to the individual results before the concatenation.""" + self.process_success(self.default_success_app_processor) + return self + + ######################### + # Public Interfaces + ######################### + + def build(self) -> LoadDFResult: + """Build the combined CSV report.""" + # process teh arguments to ensure they are set correctly + self._evaluate_args() + + load_error = None + final_df = None + success = False + try: + per_app_res = self.rep_builder.load() + # this is a dictionary and we should loop on it one by one to combine it + combined_dfs = [] + for app_id, app_res in per_app_res.items(): + # we need to patch the app_id to the dataframe + if app_res.load_error or app_res.data.empty: + # process entry with failed results or skip them if no handlder is defined. + if self._failed_app_processor: + self._failed_app_processor(app_id, app_res) + else: + # default behavior is to skip the app + continue + else: + # process entry with successful results + app_df = self._success_app_processor(self.result_handler, + app_id, + app_res.data, + self._combine_args) + # Q: Should we ignore or skip the empty dataframes? 
+ combined_dfs.append(app_df) + final_df = pd.concat(combined_dfs, ignore_index=True) + success = True + except Exception as e: # pylint: disable=broad-except + # handle any exceptions that occur during the combination phase + load_error = e + return LoadDFResult( + f_path='combination of multiple path for table: ' + self.rep_builder.tbl, + data=final_df, + success=success, + fallen_back=False, + load_error=load_error) + + @dataclass class JPropsReport(APIReport[JPropsResult]): """A report that loads data in JSON properties format.""" diff --git a/user_tools/src/spark_rapids_tools/api_v1/core/qual_handler.py b/user_tools/src/spark_rapids_tools/api_v1/core/qual_handler.py index ade9dd072..c4d90ee76 100644 --- a/user_tools/src/spark_rapids_tools/api_v1/core/qual_handler.py +++ b/user_tools/src/spark_rapids_tools/api_v1/core/qual_handler.py @@ -15,10 +15,12 @@ """Module that contains the definition of the qualification Result handler for the core module.""" from dataclasses import dataclass +from typing import Optional from spark_rapids_tools import override from spark_rapids_tools.api_v1 import register_result_class, ResultHandler from spark_rapids_tools.api_v1.report_reader import ToolReportReader +from spark_rapids_tools.storagelib.cspfs import BoundedCspPath @register_result_class('qualCoreOutput') @@ -29,3 +31,6 @@ class QualCoreResultHandler(ResultHandler): @property def alpha_reader(self) -> ToolReportReader: return self.readers.get(self.report_id) + + def get_raw_metrics_path(self) -> Optional[BoundedCspPath]: + return self.get_reader_path('coreRawMetrics') diff --git a/user_tools/src/spark_rapids_tools/api_v1/result_handler.py b/user_tools/src/spark_rapids_tools/api_v1/result_handler.py index 853c10e5d..31ec27c66 100644 --- a/user_tools/src/spark_rapids_tools/api_v1/result_handler.py +++ b/user_tools/src/spark_rapids_tools/api_v1/result_handler.py @@ -132,6 +132,17 @@ def get_reader_by_tbl(self, tbl: str) -> Optional[ToolReportReader]: # Public Interfaces ######################### + def get_reader_path(self, report_id: str) -> Optional[BoundedCspPath]: + """ + Get the path to the report file for the given report ID. + :param report_id: The unique identifier for the report. + :return: The path to the report file, or None if not found. + """ + reader = self.readers.get(report_id) + if reader: + return reader.out_path + return None + def create_empty_df(self, tbl: str) -> pd.DataFrame: """ Create an empty DataFrame for the given table label. @@ -153,6 +164,17 @@ def get_table_path(self, table_label: str) -> Optional[BoundedCspPath]: return reader.get_table_path(table_label) return None + def is_empty(self) -> bool: + """ + Check if the result handler has no data. + :return: True if the result handler is empty, False otherwise. 
+ """ + # first check that the output file exists + if not self.out_path.exists(): + return True + # then check that the app_handlers are empty + return not self.app_handlers + ######################### # Type Definitions ######################### diff --git a/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py b/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py index 9bd4618c7..8fb8fbd0f 100644 --- a/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py +++ b/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py @@ -460,14 +460,14 @@ def _is_ignore_no_perf(action: str) -> bool: | node_level_supp['Action'].apply(_is_ignore_no_perf) | node_level_supp['Exec Name'] .astype(str) + # TODO: revisit the need to check for 'WholeStageCodegen' in Exec Name. + # Ideally, we want to remove those execs that should be dropped from the analysis ( + # e.g. WholeStageCodegen, WholeStageCodegenExec, etc.) .apply(lambda x: x.startswith('WholeStageCodegen')) ) - node_level_supp = ( - node_level_supp[['App ID', 'SQL ID', 'SQL Node Id', 'Exec Is Supported']] - .groupby(['App ID', 'SQL ID', 'SQL Node Id']) - .agg('all') - .reset_index(level=[0, 1, 2]) - ) + # in previous version we used to group by 'App ID', 'SQL ID', 'SQL Node Id', but this is not + # needed since the 3 keys form an uuid for each row. + node_level_supp = node_level_supp[['App ID', 'SQL ID', 'SQL Node Id', 'Exec Is Supported']] return node_level_supp diff --git a/user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py b/user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py index 20e95c4ad..cf339bc57 100644 --- a/user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py +++ b/user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py @@ -28,7 +28,9 @@ import fire from spark_rapids_tools import CspPath -from spark_rapids_tools.tools.core.qual_handler import QualCoreHandler +from spark_rapids_tools.api_v1 import QualCoreResultHandler +from spark_rapids_tools.api_v1.builder import CSVReport, CSVCombiner, APIResultHandler + from spark_rapids_tools.tools.qualx.config import ( get_cache_dir, get_config, @@ -152,7 +154,6 @@ def _get_model(platform: str, xgb_model.load_model(model_path) return xgb_model - def _get_calib_params(platform: str, model: Optional[str] = None, variant: Optional[str] = None) -> Dict[str, float]: @@ -193,34 +194,34 @@ def _get_calib_params(platform: str, logger.debug('Calib params file not found: %s', calib_path) return calib_params - -def _get_qual_data(qual_handler: QualCoreHandler) -> Tuple[ - Optional[pd.DataFrame], - Optional[pd.DataFrame], - List[str] -]: - node_level_supp = None - qualtool_output = None - qual_metrics = [] - # Get node level support from combined exec DataFrame - exec_table = qual_handler.get_table_by_label('execCSVReport') - node_level_supp = load_qtool_execs(exec_table) - # Get qualification summary from qualCoreCSVSummary - qualtool_output = qual_handler.get_table_by_label('qualCoreCSVSummary') - if qualtool_output is not None: - # Extract only the required columns - cols = ['App Name', 'App ID', 'App Duration'] - available_cols = [col for col in cols if col in qualtool_output.columns] - if available_cols: - qualtool_output = qualtool_output[available_cols] - # Get path to the raw metrics directory which has the per-app - # raw_metrics files - qual_metrics = qual_handler.get_raw_metrics_paths() - - return node_level_supp, qualtool_output, qual_metrics - - -def _get_combined_qual_data(qual_handlers: List[QualCoreHandler]) -> Tuple[ +def 
_get_qual_data(qual_handler: QualCoreResultHandler) -> Tuple[Optional[pd.DataFrame], Optional[pd.DataFrame]]: + # extract the ['App Name', App ID', 'App Duration'] columns from the qualification summary. + q_sum_res = (CSVReport(qual_handler) + .table('qualCoreCSVSummary') + .pd_args({'usecols': ['App Name', 'App ID', 'App Duration']}) + .load()) + if not q_sum_res.success: + # That should not happen unless there is something terribly wrong because the caller + # has checked that the result is not empty. Therefore, the summary must exist + raise q_sum_res.load_error + qualtool_summary = q_sum_res.data + # Extracting the execsCSVReport should return a dictionary of results. + # Combine that dictionary into a single DF. + # The following creates a combiner that: + # 1- uses the default combination method which is dedicated to use the appHanlder to combine the results. + # 2- ignore the failed execs. i.e., apps that has no execs. Otherwise, it is possible to add a call-back to handle + # the apps that had no execs. + execs_combiner = CSVCombiner( + rep_builder=CSVReport(qual_handler).table('execCSVReport') # create the csvLoader + ).combine_args({'col_names': ['App ID']}) # tells the combiner to use those column names to insert the App uuid + q_execs_res = execs_combiner.build() + if not q_execs_res.success: + raise q_execs_res.load_error + node_level_supp = load_qtool_execs(q_execs_res.data) + + return node_level_supp, qualtool_summary + +def _get_combined_qual_data(qual_handlers: List[QualCoreResultHandler]) -> Tuple[ Optional[pd.DataFrame], Optional[pd.DataFrame], List[str] @@ -236,15 +237,22 @@ def _get_combined_qual_data(qual_handlers: List[QualCoreHandler]) -> Tuple[ combined_qual_metrics = [] for handler in qual_handlers: - node_level_supp, qualtool_output, qual_metrics = _get_qual_data(handler) + if handler.is_empty(): + # skip the handler that has no apps + continue + node_level_supp, qualtool_output = _get_qual_data(handler) if node_level_supp is not None: combined_node_level_supp.append(node_level_supp) if qualtool_output is not None: combined_qualtool_output.append(qualtool_output) - - combined_qual_metrics.extend(qual_metrics) + # Add the raw_metrics folder if it is not None + raw_metrics_path = handler.get_raw_metrics_path() + if raw_metrics_path: + # For now, append the path without the scheme to be compatible with the remaining code + # that does not expect a URI value. + combined_qual_metrics.append(raw_metrics_path.no_scheme) # Combine DataFrames final_node_level_supp = None @@ -701,7 +709,7 @@ def predict( model: Optional[str] = None, qual_tool_filter: Optional[str] = None, config: Optional[str] = None, - qual_handlers: List[QualCoreHandler] + qual_handlers: List[QualCoreResultHandler] ) -> pd.DataFrame: """Predict GPU speedup given CPU logs. @@ -720,7 +728,7 @@ def predict( config: Path to a qualx-conf.yaml file to use for configuration. qual_handlers: - List of QualCoreHandler instances for reading qualification data. + List of QualCoreResultHandler instances for reading qualification data. 
""" # load config from command line argument, or use default cfg = get_config(config) @@ -923,7 +931,8 @@ def _predict_cli( qual = output_dir else: qual = qual_output - qual_handlers.append(QualCoreHandler(result_path=qual_output)) + q_core_handler = APIResultHandler().qual_core().with_path(qual_output).build() + qual_handlers.append(q_core_handler) output_info = { 'perSql': {'path': os.path.join(output_dir, 'per_sql.csv')}, diff --git a/user_tools/src/spark_rapids_tools/tools/qualx/util.py b/user_tools/src/spark_rapids_tools/tools/qualx/util.py index 952544b38..0072a9a70 100644 --- a/user_tools/src/spark_rapids_tools/tools/qualx/util.py +++ b/user_tools/src/spark_rapids_tools/tools/qualx/util.py @@ -31,8 +31,9 @@ import pandas as pd from tabulate import tabulate +from spark_rapids_tools.api_v1 import QualCoreResultHandler +from spark_rapids_tools.api_v1.builder import APIResultHandler from spark_rapids_tools.cmdli.dev_cli import DevCLI -from spark_rapids_tools.tools.core.qual_handler import QualCoreHandler from spark_rapids_tools.tools.qualx.config import get_config, get_label @@ -366,8 +367,11 @@ def run_profiling_core(eventlog: str) -> None: run_commands(eventlogs, run_profiling_core) -def run_qualification_tool(platform: str, eventlogs: List[str], - output_dir: str, skip_run: bool = False, tools_config: str = None) -> List[QualCoreHandler]: +def run_qualification_tool(platform: str, + eventlogs: List[str], + output_dir: str, + skip_run: bool = False, + tools_config: str = None) -> List[QualCoreResultHandler]: logger.info('Running qualification on: %s', eventlogs if len(eventlogs) < 5 else f'{len(eventlogs)} eventlogs') logger.info('Saving output to: %s', output_dir) @@ -396,7 +400,7 @@ def run_qualification_core(eventlog: str) -> None: qual_handlers = [] for output_path in output_dirs: try: - handler = QualCoreHandler(result_path=output_path) + handler = APIResultHandler().qual_core().with_path(output_path).build() qual_handlers.append(handler) except Exception as e: # pylint: disable=broad-except logger.warning('Failed to create QualCoreHandler for %s: %s', output_path, e) From 3d818be322c932d0601c08f3c27c182be5c77312 Mon Sep 17 00:00:00 2001 From: "Ahmed Hussein (amahussein)" Date: Mon, 4 Aug 2025 15:52:46 -0500 Subject: [PATCH 2/4] fix flake8 Signed-off-by: Ahmed Hussein (amahussein) --- user_tools/src/spark_rapids_tools/api_v1/builder.py | 2 +- .../spark_rapids_tools/tools/qualx/qualx_main.py | 13 +++++++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/user_tools/src/spark_rapids_tools/api_v1/builder.py b/user_tools/src/spark_rapids_tools/api_v1/builder.py index 59768b7ba..563f57cc4 100644 --- a/user_tools/src/spark_rapids_tools/api_v1/builder.py +++ b/user_tools/src/spark_rapids_tools/api_v1/builder.py @@ -273,7 +273,7 @@ def combine_args(self, args: dict) -> 'CSVCombiner': return self def on_apps(self) -> 'CSVCombiner': - """specify that the combiner should append IDs to the individual results before the concatenation.""" + """specify that the combiner inject AP UUID to the individual results before the concatenation.""" self.process_success(self.default_success_app_processor) return self diff --git a/user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py b/user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py index cf339bc57..0deaa18d0 100644 --- a/user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py +++ b/user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py @@ -154,6 +154,7 @@ def _get_model(platform: str, 
xgb_model.load_model(model_path) return xgb_model + def _get_calib_params(platform: str, model: Optional[str] = None, variant: Optional[str] = None) -> Dict[str, float]: @@ -194,6 +195,7 @@ def _get_calib_params(platform: str, logger.debug('Calib params file not found: %s', calib_path) return calib_params + def _get_qual_data(qual_handler: QualCoreResultHandler) -> Tuple[Optional[pd.DataFrame], Optional[pd.DataFrame]]: # extract the ['App Name', App ID', 'App Duration'] columns from the qualification summary. q_sum_res = (CSVReport(qual_handler) @@ -211,16 +213,19 @@ def _get_qual_data(qual_handler: QualCoreResultHandler) -> Tuple[Optional[pd.Dat # 1- uses the default combination method which is dedicated to use the appHanlder to combine the results. # 2- ignore the failed execs. i.e., apps that has no execs. Otherwise, it is possible to add a call-back to handle # the apps that had no execs. - execs_combiner = CSVCombiner( - rep_builder=CSVReport(qual_handler).table('execCSVReport') # create the csvLoader - ).combine_args({'col_names': ['App ID']}) # tells the combiner to use those column names to insert the App uuid - q_execs_res = execs_combiner.build() + q_execs_res = ( + CSVCombiner(rep_builder=CSVReport(qual_handler).table('execCSVReport')) # use ExecsCSV report + .on_apps() # combine DFs on apps + .combine_args({'col_names': ['App ID']}) # inject cols when combining on apps + .build() # combine the results + ) if not q_execs_res.success: raise q_execs_res.load_error node_level_supp = load_qtool_execs(q_execs_res.data) return node_level_supp, qualtool_summary + def _get_combined_qual_data(qual_handlers: List[QualCoreResultHandler]) -> Tuple[ Optional[pd.DataFrame], Optional[pd.DataFrame], From 2a1b7490a15d6f6d667d3e28cb09191131b6f6ca Mon Sep 17 00:00:00 2001 From: "Ahmed Hussein (amahussein)" Date: Tue, 5 Aug 2025 11:20:01 -0500 Subject: [PATCH 3/4] handle special cases with concatenation Signed-off-by: Ahmed Hussein (amahussein) --- .../spark_rapids_tools/api_v1/app_handler.py | 5 +++++ .../src/spark_rapids_tools/api_v1/builder.py | 19 +++++++++++++++++- .../tools/qualx/qualx_main.py | 20 +++++++++---------- 3 files changed, 33 insertions(+), 11 deletions(-) diff --git a/user_tools/src/spark_rapids_tools/api_v1/app_handler.py b/user_tools/src/spark_rapids_tools/api_v1/app_handler.py index 7e12412e5..02ffbf074 100644 --- a/user_tools/src/spark_rapids_tools/api_v1/app_handler.py +++ b/user_tools/src/spark_rapids_tools/api_v1/app_handler.py @@ -75,6 +75,11 @@ def patch_into_df(self, if col_names is None: # append attemptId to support multi-attempts col_names = ['appId'] + # Ensure col_values matches col_names in length + if len(col_values) == 1 and len(col_names) > 1: + col_values = col_values * len(col_names) + elif len(col_values) != len(col_names): + raise ValueError('Length of col_values must be 1 or match length of col_names') if not df.empty: for col_k, col_v in zip(reversed(col_names), reversed(col_values)): if col_k not in df.columns: diff --git a/user_tools/src/spark_rapids_tools/api_v1/builder.py b/user_tools/src/spark_rapids_tools/api_v1/builder.py index 563f57cc4..7740845d9 100644 --- a/user_tools/src/spark_rapids_tools/api_v1/builder.py +++ b/user_tools/src/spark_rapids_tools/api_v1/builder.py @@ -251,6 +251,18 @@ def _evaluate_args(self) -> None: self._success_app_processor = self.default_success_app_processor # TODO: we should fail if the the combiner is built for AppIds but columns are not defined. 
+ def _create_empty_df(self) -> pd.DataFrame: + """ + creates an empty DataFrame with the columns defined in the report builder. + :return: an empty dataframe. + """ + empty_df = self.result_handler.create_empty_df(self.rep_builder.tbl) + if self._combine_args and 'use_cols' in self._combine_args: + # make sure that we insert the columns to the empty dataframe + injected_cols = pd.DataFrame(columns=self._combine_args['use_cols']) + return pd.concat([injected_cols, empty_df], axis=1) + return empty_df + ################################ # Setters/Getters for processors ################################ @@ -310,7 +322,12 @@ def build(self) -> LoadDFResult: self._combine_args) # Q: Should we ignore or skip the empty dataframes? combined_dfs.append(app_df) - final_df = pd.concat(combined_dfs, ignore_index=True) + if combined_dfs: + # only concatenate if we have any dataframes to combine + final_df = pd.concat(combined_dfs, ignore_index=True) + else: + # create an empty DataFrame if no data was collected. uses the table schema. + final_df = self._create_empty_df() success = True except Exception as e: # pylint: disable=broad-except # handle any exceptions that occur during the combination phase diff --git a/user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py b/user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py index 0deaa18d0..6f192892f 100644 --- a/user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py +++ b/user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py @@ -205,7 +205,7 @@ def _get_qual_data(qual_handler: QualCoreResultHandler) -> Tuple[Optional[pd.Dat if not q_sum_res.success: # That should not happen unless there is something terribly wrong because the caller # has checked that the result is not empty. Therefore, the summary must exist - raise q_sum_res.load_error + raise RuntimeError(f'Failed to load qualCoreCSVSummary {q_sum_res.load_error}') from q_sum_res.load_error qualtool_summary = q_sum_res.data # Extracting the execsCSVReport should return a dictionary of results. # Combine that dictionary into a single DF. 
@@ -220,7 +220,7 @@ def _get_qual_data(qual_handler: QualCoreResultHandler) -> Tuple[Optional[pd.Dat .build() # combine the results ) if not q_execs_res.success: - raise q_execs_res.load_error + raise RuntimeError(f'Failed to load execution CSV report: {q_execs_res.load_error}') from q_execs_res.load_error node_level_supp = load_qtool_execs(q_execs_res.data) return node_level_supp, qualtool_summary @@ -238,20 +238,20 @@ def _get_combined_qual_data(qual_handlers: List[QualCoreResultHandler]) -> Tuple return None, None, [] combined_node_level_supp = [] - combined_qualtool_output = [] + combined_q_core_sum = [] # summary of the Qualification core (Id, Name, Duration) combined_qual_metrics = [] for handler in qual_handlers: if handler.is_empty(): # skip the handler that has no apps continue - node_level_supp, qualtool_output = _get_qual_data(handler) + node_level_supp, q_core_summary = _get_qual_data(handler) if node_level_supp is not None: combined_node_level_supp.append(node_level_supp) - if qualtool_output is not None: - combined_qualtool_output.append(qualtool_output) + if q_core_summary is not None: + combined_q_core_sum.append(q_core_summary) # Add the raw_metrics folder if it is not None raw_metrics_path = handler.get_raw_metrics_path() if raw_metrics_path: @@ -264,11 +264,11 @@ def _get_combined_qual_data(qual_handlers: List[QualCoreResultHandler]) -> Tuple if combined_node_level_supp: final_node_level_supp = pd.concat(combined_node_level_supp, ignore_index=True) - final_qualtool_output = None - if combined_qualtool_output: - final_qualtool_output = pd.concat(combined_qualtool_output, ignore_index=True) + final_q_core_sum = None + if combined_q_core_sum: + final_q_core_sum = pd.concat(combined_q_core_sum, ignore_index=True) - return final_node_level_supp, final_qualtool_output, combined_qual_metrics + return final_node_level_supp, final_q_core_sum, combined_qual_metrics def _get_split_fn(split_fn: Union[str, dict]) -> Callable[[pd.DataFrame], pd.DataFrame]: From 59714c7cd91afba9929fa6c073156198f14df036 Mon Sep 17 00:00:00 2001 From: "Ahmed Hussein (amahussein)" Date: Tue, 12 Aug 2025 12:04:32 -0500 Subject: [PATCH 4/4] Improve the CSVCombiner API and add shadow implementation for QualX Signed-off-by: Ahmed Hussein (amahussein) --- .../rapids/qualification.py | 16 +- .../src/spark_rapids_tools/api_v1/__init__.py | 12 +- .../spark_rapids_tools/api_v1/app_handler.py | 126 +++- .../src/spark_rapids_tools/api_v1/builder.py | 501 +++++++++++--- .../tools/qualx/featurizers/default.py | 14 +- .../tools/qualx/preprocess.py | 11 + .../tools/qualx/qualx_main.py | 30 +- .../tools/qualx/revamp/__init__.py | 15 + .../tools/qualx/revamp/featurizer.py | 655 ++++++++++++++++++ .../tools/qualx/revamp/preprocess.py | 81 +++ .../tools/qualx/revamp/x_main.py | 74 ++ .../spark_rapids_tools/utils/data_utils.py | 22 +- .../src/spark_rapids_tools/utils/util.py | 15 + .../spark_rapids_tools_ut/api/__init__.py | 15 + .../api/test_app_handlers.py | 91 +++ 15 files changed, 1560 insertions(+), 118 deletions(-) create mode 100644 user_tools/src/spark_rapids_tools/tools/qualx/revamp/__init__.py create mode 100644 user_tools/src/spark_rapids_tools/tools/qualx/revamp/featurizer.py create mode 100644 user_tools/src/spark_rapids_tools/tools/qualx/revamp/preprocess.py create mode 100644 user_tools/src/spark_rapids_tools/tools/qualx/revamp/x_main.py create mode 100644 user_tools/tests/spark_rapids_tools_ut/api/__init__.py create mode 100644 user_tools/tests/spark_rapids_tools_ut/api/test_app_handlers.py diff --git 
a/user_tools/src/spark_rapids_pytools/rapids/qualification.py b/user_tools/src/spark_rapids_pytools/rapids/qualification.py index bb3e7a9f0..78d910f62 100644 --- a/user_tools/src/spark_rapids_pytools/rapids/qualification.py +++ b/user_tools/src/spark_rapids_pytools/rapids/qualification.py @@ -36,6 +36,7 @@ from spark_rapids_tools.tools.core.qual_handler import QualCoreHandler from spark_rapids_tools.tools.qualx.qualx_main import predict from spark_rapids_tools.tools.qualification_stats_report import SparkQualificationStats +from spark_rapids_tools.tools.qualx.revamp.x_main import predict_x from spark_rapids_tools.tools.speedup_category import SpeedupCategory from spark_rapids_tools.tools.top_candidates import TopCandidates from spark_rapids_tools.tools.unsupported_ops_stage_duration import UnsupportedOpsStageDuration @@ -541,10 +542,17 @@ def __update_apps_with_prediction_info(self, try: # Build the QualCore handler object to handle the prediction model output q_core_handler = APIResultHandler().qual_core().with_path(qual_output_dir).build() - predictions_df = predict(platform=model_name, qual=qual_output_dir, - output_info=output_info, - model=estimation_model_args['customModelFile'], - qual_handlers=[q_core_handler]) + if Utils.get_rapids_tools_env('QUALX_REVAMP'): + predictions_df = predict_x(platform=model_name, + qual=qual_output_dir, + output_info=output_info, + model=estimation_model_args['customModelFile'], + qual_handlers=[q_core_handler]) + else: + predictions_df = predict(platform=model_name, qual=qual_output_dir, + output_info=output_info, + model=estimation_model_args['customModelFile'], + qual_handlers=[q_core_handler]) except Exception as e: # pylint: disable=broad-except predictions_df = pd.DataFrame() self.logger.error( diff --git a/user_tools/src/spark_rapids_tools/api_v1/__init__.py b/user_tools/src/spark_rapids_tools/api_v1/__init__.py index 515181b81..a0021494a 100644 --- a/user_tools/src/spark_rapids_tools/api_v1/__init__.py +++ b/user_tools/src/spark_rapids_tools/api_v1/__init__.py @@ -32,6 +32,12 @@ QualCoreResultHandler, ProfCoreResultHandler ) +from .builder import ( + LoadRawFilesResult, + APIUtils, + CSVReportCombiner, + CSVReport +) __all__ = [ 'AppHandler', @@ -42,5 +48,9 @@ 'QualWrapperResultHandler', 'QualCoreResultHandler', 'ProfWrapperResultHandler', - 'ProfCoreResultHandler' + 'ProfCoreResultHandler', + 'LoadRawFilesResult', + 'APIUtils', + 'CSVReportCombiner', + 'CSVReport' ] diff --git a/user_tools/src/spark_rapids_tools/api_v1/app_handler.py b/user_tools/src/spark_rapids_tools/api_v1/app_handler.py index 02ffbf074..895829dee 100644 --- a/user_tools/src/spark_rapids_tools/api_v1/app_handler.py +++ b/user_tools/src/spark_rapids_tools/api_v1/app_handler.py @@ -13,12 +13,15 @@ # limitations under the License. """module that defines the app descriptor for the results loaded by the tools.""" - +import re from dataclasses import dataclass, field from functools import cached_property -from typing import Optional, List +from typing import Optional, List, Dict import pandas as pd +from pydantic.alias_generators import to_camel + +from spark_rapids_tools.utils import Utilities @dataclass @@ -32,6 +35,56 @@ class AppHandler(object): # this will be loaded from the core-status csv report eventlog_path: Optional[str] = None + @staticmethod + def get_pd_dtypes() -> Dict[str, str]: + """ + Get the pandas data types for the AppHandler attributes. + :return: Dictionary mapping attribute names to pandas data types. 
+ """ + return { + 'app_id': Utilities.scala_to_pandas_type('String'), + 'attempt_id': Utilities.scala_to_pandas_type('Int'), + 'app_name': Utilities.scala_to_pandas_type('String'), + 'eventlog_path': Utilities.scala_to_pandas_type('String') + } + + @staticmethod + def normalize_attribute(arg_value: str) -> str: + """ + Normalize the attribute name to a plain format. + It uses re.sub to replace any '-' or '_' with a space using the regexp 'r"(_|-)+"'. + Finally, it uses str.replace() to remove any spaces. + :param arg_value: the attribute name to normalize. + :return: the actual field name that is used in the AppHandler. + """ + processed_value = re.sub(r'([_\-])+', ' ', arg_value.strip().lower()).replace(' ', '') + lookup_map = { + 'appname': 'app_name', + 'appid': 'app_id', + 'attemptid': 'attempt_id', + 'eventlogpath': 'eventlog_path' + } + return lookup_map.get(processed_value, arg_value) + + @classmethod + def get_key_attributes(cls) -> List[str]: + """ + Get the key attributes that define an AppHandler. + :return: List of key attributes. + """ + return ['app_id'] + + @classmethod + def get_default_key_columns(cls) -> Dict[str, str]: + """ + Get the default key columns for the AppHandler. + :return: Dictionary mapping attribute names to column names. + """ + res = {} + for attr in cls.get_key_attributes(): + res[attr] = to_camel(attr) + return res + def is_name_defined(self) -> bool: """ Check if the app name is defined. @@ -141,3 +194,72 @@ def merge(self, other: 'AppHandler') -> 'AppHandler': if self.eventlog_path is None and other.eventlog_path is not None: self.eventlog_path = other.eventlog_path return self + + ################################ + # Public Methods + ################################ + + def convert_to_df(self) -> pd.DataFrame: + """ + Convert the AppHandler attributes to a DataFrame. + :return: DataFrame with app_id, app_name, and attempt_id as columns. + """ + data = { + 'app_id': [self.app_id], + 'attempt_id': [self.attempt_id], + 'app_name': [self.app_name], + 'eventlog_path': [self.eventlog_path] + } + data_types = AppHandler.get_pd_dtypes() + return pd.DataFrame({ + col: pd.Series(data[col], dtype=dtype) for col, dtype in data_types.items() + }) + + def add_fields_to_dataframe(self, + df: pd.DataFrame, + field_to_col_map: Dict[str, str]) -> pd.DataFrame: + """ + Insert fields/properties from AppHandler into the DataFrame, with user-specified column names. + :param df: Existing DataFrame to append to. + :type df: pd.DataFrame + :param field_to_col_map: Dictionary mapping AppHandler attributes (keys) to DataFrame column names (values). + :type field_to_col_map: Dict[str, str] + default: Value to use if attribute/property not found (raises if None). + """ + converted_df = self.convert_to_df() + row_data = [] + for attr, col in field_to_col_map.items(): + # Normalize the attribute name + norm_attr = AppHandler.normalize_attribute(attr) + try: + value = getattr(self, norm_attr) + row_data.append((col, norm_attr, value)) + except AttributeError as exc: + raise AttributeError(f"Attribute '{attr}' not found in AppHandler.") from exc + for col, norm_attr, value in reversed(row_data): + # Check if the column already exists in the DataFrame + if col in df.columns: + # If it exists, we should not overwrite it, skip + continue + # create a new column with the correct type. We do this because we do not want to + # to add values to an empty dataframe. 
+ df.insert(loc=0, column=col, value=pd.Series(dtype=converted_df[norm_attr].dtype)) + # set the values in case the dataframe was non-empty. + df[col] = pd.Series([value] * len(df), dtype=converted_df[norm_attr].dtype) + return df + + @classmethod + def inject_into_df(cls, + df: pd.DataFrame, + field_to_col_map: Dict[str, str], + app_h: Optional['AppHandler'] = None) -> pd.DataFrame: + """ + Inject AppHandler fields into a DataFrame using a mapping of field names to column names. + :param df: + :param field_to_col_map: + :param app_h: + :return: + """ + if app_h is None: + app_h = AppHandler(_app_id='UNKNOWN_APP', _app_name='UNKNOWN_APP', _attempt_id=1) + return app_h.add_fields_to_dataframe(df, field_to_col_map) diff --git a/user_tools/src/spark_rapids_tools/api_v1/builder.py b/user_tools/src/spark_rapids_tools/api_v1/builder.py index 7740845d9..1409fede7 100644 --- a/user_tools/src/spark_rapids_tools/api_v1/builder.py +++ b/user_tools/src/spark_rapids_tools/api_v1/builder.py @@ -15,7 +15,8 @@ """Module that contains the entry for the API v1 builder.""" from dataclasses import dataclass, field -from typing import Union, Optional, TypeVar, Generic, List, Dict, Callable +from logging import Logger +from typing import Union, Optional, TypeVar, Generic, List, Dict, Callable, Any, Set, Tuple import pandas as pd @@ -25,7 +26,7 @@ from spark_rapids_tools.api_v1.report_loader import ReportLoader from spark_rapids_tools.api_v1.report_reader import ToolReportReaderT from spark_rapids_tools.storagelib.cspfs import BoundedCspPath -from spark_rapids_tools.utils.data_utils import LoadDFResult, JPropsResult, TXTResult +from spark_rapids_tools.utils.data_utils import LoadDFResult, JPropsResult, TXTResult, DataUtils @dataclass @@ -103,6 +104,22 @@ def is_per_app_tbl(self) -> bool: return False return self.rep_reader.is_per_app + @property + def app_handlers(self) -> List[AppHandler]: + """ + Return the list of application handlers associated with this report. + + If no specific apps are set, returns all app handlers from the handler. + Otherwise, returns handlers for the specified apps. + """ + if not self._apps: + return list(self.handler.app_handlers.values()) + app_handlers = [ + self.handler.app_handlers.get(app_e) if isinstance(app_e, str) else app_e + for app_e in self._apps + ] + return [app_h for app_h in app_handlers if app_h is not None] + def _check_apps(self) -> None: """Check if applications are properly configured.""" if not self.is_per_app_tbl: @@ -164,6 +181,12 @@ class CSVReport(APIReport[LoadDFResult]): _fall_cb: Optional[Callable[[], pd.DataFrame]] = field(default=None, init=False) _map_cols: Optional[dict] = field(default=None, init=False) _pd_args: Optional[dict] = field(default=None, init=False) + _col_mapper_cb: Optional[Callable[[List[str]], Dict[str, str]]] = field(default=None, init=False) + + def map_cols_cb(self, cb: Callable[[List[str]], Dict[str, str]]) -> 'CSVReport': + """Set the callback for mapping columns when loading data.""" + self._col_mapper_cb = cb + return self def fall_cb(self, cb: Callable[[], pd.DataFrame]) -> 'CSVReport': """Set the fallback callback for loading data.""" @@ -180,6 +203,39 @@ def pd_args(self, args: dict) -> 'CSVReport': self._pd_args = args return self + def _check_map_col_args(self) -> None: + """ + build the final column mapping of the report + """ + temp_map_cols = {} + if self._col_mapper_cb: + # if the column mapper callback is defined, we need to build the static column. 
+ actual_tbl_cols = [col.name for col in self.rep_reader.get_table(self.tbl).columns] + # check if the use_cols is defined + if self._pd_args and 'usecols' in self._pd_args: + # only pick those columns if defined + selected_cols = self._pd_args['usecols'] + else: + # if usecols is not defined, we will use all columns + selected_cols = actual_tbl_cols + # apply the map callback to the columns of the table + temp_map_cols = self._col_mapper_cb(selected_cols) + if self._map_cols is not None: + # if the static map_cols is defined, then it is possible that we want to apply a + # dynamic method to rename the columns, but for some reason there is a couple of fields + # mapped statically. + # overwrite the temp_map_cols keys with the static map_cols, or insert new ones. + temp_map_cols.update(self._map_cols) + if temp_map_cols: + # finally re-assign the _map_cols field with the final column-mapping. + self.map_cols(temp_map_cols) + + @override + def _check_args(self) -> None: + """Check if the required arguments are set.""" + super()._check_args() + self._check_map_col_args() + @override def _load_global(self) -> LoadDFResult: return self.rep_reader.load_df( @@ -210,133 +266,263 @@ def _load_single_app(self) -> LoadDFResult: pd_args=self._pd_args ) + def create_empty_df(self) -> pd.DataFrame: + """ + Create an empty DataFrame with the columns defined in the report. + :return: An empty DataFrame with the columns defined in the report. + """ + tbl_df = self.handler.create_empty_df(self.tbl) + # we need to handle if the columns were renamed, or selected + if self._pd_args and 'usecols' in self._pd_args: + # if usecols is defined, we need to filter the columns + use_cols = self._pd_args['usecols'] + tbl_df = tbl_df[use_cols] + if self._map_cols: + tbl_df = tbl_df.rename(columns=self._map_cols) + return tbl_df + @dataclass -class CSVCombiner(object): - """A class that combines multiple CSV reports into a single report.""" - rep_builder: CSVReport - _failed_app_processor: Optional[Callable[[str, LoadDFResult], None]] = field(default=None, init=False) - _success_app_processor: Optional[Callable[[ToolResultHandlerT, str, pd.DataFrame, dict], pd.DataFrame]] = ( +class CSVReportCombiner(object): + """ + A class for combining multiple CSVReport instances into a single DataFrame. This implementation + assumes that: + 1. It must be called on a Per-App report (aka, a report that has per app-data). + 2. All CSVReport instances are associated with the same Table. + 3. The reports can be loaded independently and combined based on application IDs. + 4. The reports may have different sets of applications + + This class manages the process of loading, combining, and post-processing data from + multiple report builders, handling both successful and failed application loads. + It supports injecting application IDs into the resulting DataFrame, custom callbacks + for success and failure cases, and fallback to empty DataFrames if needed. + + :param: rep_builders: List[CSVReport]. A list of CSVReport instances to combine. + :param: _inject_app_ids_enabled: Bool, default True. Flag to enable/disable injection of + application IDs into the combined DataFrame. + :param: _app_fields: Optional[Dict[str, str]], default None. A dictionary that specifies the + fields of the AppHandlers and how to inject them into the per-app DataFrame before they + get combined. + The expected structure is [field_name: str, column_name: str] where the normalized + value of field_name is a valid field in AppHandler instance. 
+ Example of acceptable keys for AppHandler.app_id are: 'app_id', 'App ID', 'appId', 'app ID'. + :param: _success_cb: Optional[Callable[[ToolResultHandlerT, str, pd.DataFrame], pd.DataFrame]], + default None. + A callback function that is called to provide any extra custom processing for each + successful application. This cb is applied after injecting the app columns and right-before + concatenating the app-dataframe to the combined dataFrame. + This is useful in case the caller wants to apply some dataframe operations on the DataFrame. + Note, it might be tricky if this cb is changing the shape of the DataFrame, as it might + conflict with the expected Dataframe DTypes extracted from the original Table. + :param: _failure_cb: Optional[Callable[[ToolResultHandlerT, str, LoadDFResult], Any]], default None. + Provides custom handling for failed application loads. If the caller needs more processing + than just appending the failures into the failed_apps dictionary, this callback can become + handy. + """ + rep_builders: List[CSVReport] + _inject_app_ids_enabled: bool = field(default=True, init=False) + _app_fields: Optional[Dict[str, str]] = field(default=None, init=False) + _success_cb: Optional[Callable[[ToolResultHandlerT, str, pd.DataFrame], pd.DataFrame]] = ( field(default=None, init=False)) - _combine_args: Optional[dict] = field(default=None, init=False) + _failure_cb: Optional[Callable[[ToolResultHandlerT, str, LoadDFResult], Any]] = ( + field(default=None, init=False)) + _fall_back_to_empty_df: bool = field(default=False, init=False) + _failed_apps: Dict[str, Exception] = field(default_factory=dict, init=False) + _successful_apps: Set[str] = field(default_factory=set, init=False) @property - def result_handler(self) -> ToolResultHandlerT: - """Get the result handler associated with this combiner.""" - return self.rep_builder.handler + def failed_apps(self) -> Dict[str, Exception]: + """Get the dictionary of failed applications.""" + return self._failed_apps - @staticmethod - def default_success_app_processor(result_handler: ToolResultHandlerT, - app_id: str, - df: pd.DataFrame, - combine_args: dict) -> pd.DataFrame: - """Default processor for successful applications.""" - col_names = None - app_entry = result_handler.app_handlers.get(app_id) - if not app_entry: - raise ValueError(f'App entry not found for ID: {app_id}') - if combine_args: - # check if the col_names are provided to stitch the app_ids - col_names = combine_args.get('col_names', None) - if col_names: - # patch the app_uuid and if the columns are defined. - return app_entry.patch_into_df(df, col_names=col_names) - return df - - def _evaluate_args(self) -> None: - """Evaluate the arguments to ensure they are set correctly.""" - - if self._success_app_processor is None: - # set the default processor for successful applications - self._success_app_processor = self.default_success_app_processor - # TODO: we should fail if the the combiner is built for AppIds but columns are not defined. + @property + def default_rep_builder(self) -> CSVReport: + """Get the first report builder.""" + if not self.rep_builders: + raise ValueError('No report builders provided for combination.') + return self.rep_builders[0] - def _create_empty_df(self) -> pd.DataFrame: - """ - creates an empty DataFrame with the columns defined in the report builder. - :return: an empty dataframe. 
+ @property + def tbl(self) -> str: + """Get the table label for the first report builder.""" + return self.default_rep_builder.tbl + + @property + def app_handlers(self) -> List[AppHandler]: """ - empty_df = self.result_handler.create_empty_df(self.rep_builder.tbl) - if self._combine_args and 'use_cols' in self._combine_args: - # make sure that we insert the columns to the empty dataframe - injected_cols = pd.DataFrame(columns=self._combine_args['use_cols']) - return pd.concat([injected_cols, empty_df], axis=1) - return empty_df + Return the list of application handlers associated with the report builder. + If no specific apps are set, returns all app handlers from the handler. + Otherwise, returns handlers for the specified apps. - ################################ - # Setters/Getters for processors - ################################ + Note that, it is possible to have duplicate apps if the same app is present in multiple reports. + """ + res = [] + for rep_b in self.rep_builders: + res.extend(rep_b.app_handlers) + return res + + def disable_apps_injection(self) -> 'CSVReportCombiner': + """Disable the injection of application IDs into the combined DataFrame.""" + self._inject_app_ids_enabled = False + return self - def process_failed(self, - processor: Callable[[str, LoadDFResult], None]) -> 'CSVCombiner': - """Set the processor for failed applications.""" - self._failed_app_processor = processor + def on_app_fields(self, combine_on: Dict[str, str]) -> 'CSVReportCombiner': + """Set the columns to combine on.""" + self._app_fields = combine_on return self - def process_success(self, - cb_fn: Callable[[ToolResultHandlerT, str, pd.DataFrame, dict], pd.DataFrame]) -> 'CSVCombiner': - """Set the processor for successful applications.""" - self._success_app_processor = cb_fn + def entry_success_cb(self, + cb: Callable[[ToolResultHandlerT, str, pd.DataFrame], pd.DataFrame]) -> 'CSVReportCombiner': + """Set the callback for successful applications.""" + self._success_cb = cb return self - def combine_args(self, args: dict) -> 'CSVCombiner': - """Set the arguments for combining the reports.""" - self._combine_args = args + def entry_failure_cb(self, cb: Callable[[ToolResultHandlerT, str, LoadDFResult], Any]) -> 'CSVReportCombiner': + """Set the callback for failed applications""" + self._failure_cb = cb return self - def on_apps(self) -> 'CSVCombiner': - """specify that the combiner inject AP UUID to the individual results before the concatenation.""" - self.process_success(self.default_success_app_processor) + def empty_df_on_error(self, fall_to_empty: bool = True) -> 'CSVReportCombiner': + """Set whether to fall back to an empty DataFrame if no data is available.""" + self._fall_back_to_empty_df = fall_to_empty return self - ######################### - # Public Interfaces - ######################### + @staticmethod + def logger(csv_rep: CSVReport) -> Logger: + return csv_rep.handler.logger + + @staticmethod + def log_failed_app(app_id: str, csv_rep: CSVReport, failed_load: LoadDFResult) -> None: + """Log a failed application.""" + CSVReportCombiner.logger(csv_rep).debug( + f'Failed to load {csv_rep.tbl} for app {app_id}: {failed_load.get_fail_cause()}') + + def _process_args(self) -> None: + """Process the arguments to ensure they are set correctly.""" + if not self.rep_builders: + raise ValueError('No report builders provided for combination.') + if self._app_fields is None: + # by default, we will use app_id column and it will be inserted as 'appId' + # Later we can add more fields + 
self.on_app_fields(AppHandler.get_default_key_columns()) + + def _inject_app_into_df( + self, + res_h: ToolResultHandlerT, + df: pd.DataFrame, app_id: str) -> pd.DataFrame: + """ + Inject the application ID into the DataFrame. + :param df: The DataFrame to inject the application ID into. + :param app_id: The application ID to inject. + :return: The DataFrame with the application ID injected. + """ + # TODO: Should we check if app_obj is not found? + app_obj = res_h.app_handlers.get(app_id) + return app_obj.add_fields_to_dataframe(df, self._app_fields) + + def _build_single_report(self, csv_rep: CSVReport) -> List[pd.DataFrame]: + combined_dfs = [] + # this is a dictionary and we should loop on it one by one to combine it + per_app_res = csv_rep.load() + for app_id, app_res in per_app_res.items(): + try: + # Set a generic try-except block to handle unexpected errors for each entry to avoid + # failing the entire combination process. + if not app_res.success: # what is the correct way to check for success? + # Process entry with failed results. + # 1. log debug message (We do not want error message because it will confuse the users) + # 2. Add it to the dictionary of failed apps + # 3. Call the failure callback if defined. + CSVReportCombiner.log_failed_app(app_id, csv_rep, app_res) + self._failed_apps[app_id] = app_res.get_fail_cause() + if self._failure_cb: + try: + self._failure_cb(csv_rep.handler, app_id, app_res) + except Exception as failure_cb_ex: # pylint: disable=broad-except + # if the failure callback fails, we log it but do not raise an error. + CSVReportCombiner.logger(csv_rep).error( + f'Failed to apply failure_cb for app {app_id} on {csv_rep.tbl}: {failure_cb_ex}') + else: + # This is a successful result, we need to process it. + # 1. Append it to the list of successful apps. + # 2. Inject the app key columns into the dataframe if enabled. + # 3. Call the success callback if defined. + self._successful_apps.add(app_id) + processed_df = app_res.data + if self._inject_app_ids_enabled: + # inject the app_id into the dataframe + processed_df = self._inject_app_into_df(csv_rep.handler, app_res.data, app_id) + if self._success_cb: + # apply the success_callback defined by the caller + try: + processed_df = self._success_cb(csv_rep.handler, app_id, processed_df) + except Exception as success_cb_ex: # pylint: disable=broad-except + # if the success callback fails, we log it but do not raise an error. + CSVReportCombiner.logger(csv_rep).error( + f'Failed to apply success_cb for app {app_id} on {csv_rep.tbl}: {success_cb_ex}') + # Q: Should we ignore or skip the empty dataframes? + combined_dfs.append(processed_df) + except Exception as single_entry_ex: # pylint: disable=broad-except + # if any exception occurs during the processing of the app, we log it and continue. + # add it to the failed_apps dictionary + CSVReportCombiner.logger(csv_rep).error( + f'Failed to process app entry {app_id} for {csv_rep.tbl}: {single_entry_ex}') + self._failed_apps[app_id] = single_entry_ex + return combined_dfs + + def _create_empty_df(self) -> pd.DataFrame: + """ + creates an empty DataFrame with the columns defined in the report builder. + :return: an empty dataframe. + """ + # get the dataframe based on user arguments. 
+ empty_df = self.default_rep_builder.create_empty_df() + # if apps injection is enabled, we need to inject the app_id columns + if self._inject_app_ids_enabled and self._app_fields: + # make sure that we inject the app_id columns + return AppHandler.inject_into_df(empty_df, self._app_fields) + return empty_df def build(self) -> LoadDFResult: - """Build the combined CSV report.""" + """Build the combined report.""" # process teh arguments to ensure they are set correctly - self._evaluate_args() - + self._process_args() load_error = None final_df = None success = False + fallen_back = False + # loop on all the reports and combine their results try: - per_app_res = self.rep_builder.load() - # this is a dictionary and we should loop on it one by one to combine it combined_dfs = [] - for app_id, app_res in per_app_res.items(): - # we need to patch the app_id to the dataframe - if app_res.load_error or app_res.data.empty: - # process entry with failed results or skip them if no handlder is defined. - if self._failed_app_processor: - self._failed_app_processor(app_id, app_res) - else: - # default behavior is to skip the app - continue - else: - # process entry with successful results - app_df = self._success_app_processor(self.result_handler, - app_id, - app_res.data, - self._combine_args) - # Q: Should we ignore or skip the empty dataframes? - combined_dfs.append(app_df) + for csv_rep in self.rep_builders: + # load the report and combine the results. + # if an exception is thrown in a single iteration, then their must be something completely wrong and we + # need to fail. + combined_dfs.extend(self._build_single_report(csv_rep)) if combined_dfs: # only concatenate if we have any dataframes to combine final_df = pd.concat(combined_dfs, ignore_index=True) else: - # create an empty DataFrame if no data was collected. uses the table schema. + # create an empty DataFrame if no data was collected. final_df = self._create_empty_df() success = True except Exception as e: # pylint: disable=broad-except - # handle any exceptions that occur during the combination phase + # handle any exceptions that occur during combination phase or loading sub-reports load_error = e + if self._fall_back_to_empty_df: + # if we are falling back to an empty DataFrame, we create it here. + try: + final_df = self._create_empty_df() + success = True + fallen_back = True + except Exception as fall_back_ex: # pylint: disable=broad-except + self.logger(self.default_rep_builder).error( + f'could not fall back to empty df {self.tbl}: {fall_back_ex}') return LoadDFResult( - f_path='combination of multiple path for table: ' + self.rep_builder.tbl, + f_path='combination of multiple path for table: ' + self.tbl, data=final_df, success=success, - fallen_back=False, + fallen_back=fallen_back, load_error=load_error) @@ -381,3 +567,134 @@ def _load_single_app(self) -> TXTResult: return self.rep_reader.load_app_txt( self._tbl, app=self._apps[0]) + + +@dataclass +class LoadRawFilesResult(object): + """ + A dataclass to hold the result of loading raw files. + :param ds_name: The name of the dataset used to identify the loading process. + :param _reports: A Private field mapping [report_id: str, dataFrame: pd.DataFrame]. Each dataframe is + the combined dataframe of all per-app tools output. + :param _failed_loads: A dictionary mapping the appIDs to the failed reports. 
Each entry is of the structure + [app_id: str, tuple(report_id: str, error: str)] in order to give details on the root + cause of each failure in-case the caller needs to process those failures. + """ + ds_name: str + _reports: Dict[str, pd.DataFrame] = field(init=False, default_factory=dict) + _failed_loads: Dict[str, List[Tuple[str, str]]] = field(init=False, default_factory=dict) + + @property + def reports(self) -> Dict[str, pd.DataFrame]: + """ + Get the reports loaded from the CSV files. + :return: A dictionary mapping report IDs to their DataFrames. + """ + return self._reports + + @property + def failed_loads(self) -> Dict[str, List[Tuple[str, str]]]: + """ + Get the failed loads. + :return: A dictionary mapping app IDs to a list of tuples containing report ID and error message. + """ + return self._failed_loads + + def append_success(self, report_id: str, df_res: LoadDFResult) -> None: + """ + Append a successful report to the csv_files. + :param report_id: The unique identifier for the report. + :param df_res: The result of loading a DataFrame. + """ + self._reports[report_id] = df_res.data + + def append_failure(self, app_id: str, report_id: str, load_excep: Optional[Exception]) -> None: + """ + Append a report to the csv_files or failed_loads based on the success of the load. + :param app_id: The unique identifier for the application. + :param report_id: The unique identifier for the report. + :param load_excep: The exception raised during the loading of the report. + """ + self._failed_loads.setdefault(app_id, []).append((report_id, str(load_excep))) + + +@dataclass +class APIUtils(object): + """ + A utility class for API v1 components. + This class provides static methods to process results from CSVReportCombiner and handle exceptions. + """ + @staticmethod + def normalize_app_id_col(col_names: List[str]) -> Dict[str, str]: + """ + Normalize the appId column name to 'appId' if it exists in the column names. + :param col_names: List of column names. + :return: A dictionary mapping the original column name to 'appId' if it exists. + """ + for col_name in col_names: + if AppHandler.normalize_attribute(col_name) == 'app_id': + return {col_name: 'appId'} + return {} + + @staticmethod + def process_res( + raw_res: LoadRawFilesResult, + combiner: CSVReportCombiner, + convert_to_camel: bool = False, + raise_on_failure: bool = True, + raise_on_empty: bool = True + ) -> Optional[pd.DataFrame]: + """ + A utility function that wraps the creation of a combined per-app-report. + It processes the result of a CSVReportCombiner and handles exceptions. + :param raw_res: The LoadRawFilesResult instance to append the results to. + :param combiner: The CSVReportCombiner to process. + :param convert_to_camel: If True, convert the column names to camelCase. + This is useful to normalize the column-names to a common format. + :param raise_on_failure: If True, raise an exception if the combiner fails to + build the combined dataframe. Note that this is not intended to handle individual app-level + failures. For the latter, visit the arguments provided by the CSVReportCombiner. + :param raise_on_empty: If True, raise an exception if the resulting DataFrame is + empty or None. This is useful to ensure that mandatory reports are always valid DataFrames + while allow other optional reports to be empty without throwing exceptions. + :return: The resulting DataFrame. The DataFrame can be None if there was an error while loading the report + and the raise_on_empty is False. 
This can be avoided if the combiner is configured to
+                     fall_back to an empty DataFrame in case of failure.
+        :raises RuntimeError: If the combiner fails to build the final result and raise_on_failure is True.
+        :raises ValueError: If the resulting DataFrame is empty and raise_on_empty is True.
+        """
+        try:
+            if convert_to_camel:
+                # Update the rep_builders by setting the col_map_cb to the staticmethod to_camel.
+                for rep in combiner.rep_builders:
+                    rep.map_cols_cb(DataUtils.cols_to_camel_case)
+            # final_res is a LoadDFResult that carries the combined dataframe and the success flag.
+            final_res = combiner.build()
+            # If the combiner failed to load the report, we should raise an exception.
+            if not final_res.success and raise_on_failure:
+                if final_res.load_error:
+                    raise RuntimeError(
+                        f'Loading report {combiner.tbl} failed with error: {final_res.get_fail_cause()}'
+                    ) from final_res.get_fail_cause()  # use get_fail_cause to get the original exception.
+                raise RuntimeError(f'Loading report {combiner.tbl} failed with unexpected error.')
+            # If the dataframe is None, or the dataframe is empty and raise_on_empty is True, raise an exception.
+            if raise_on_empty and (final_res.data is None or final_res.data.empty):
+                detail_reason = 'a None DataFrame' if final_res.data is None else 'an empty DataFrame'
+                raise ValueError(
+                    f'Loading report {combiner.tbl} on dataset {raw_res.ds_name} returned {detail_reason}.')
+            # If we reach this point, no exception needs to be raised; just proceed with wrapping up the report.
+            # 1. Append failed apps: loop on the combiner's failed apps and append them to the raw_res.
+            if combiner.failed_apps:
+                for app_id, app_error in combiner.failed_apps.items():
+                    raw_res.append_failure(app_id, combiner.tbl, app_error)
+            # 2. Append the resulting dataframe to the reports if it is successful. Else return None.
+            if final_res.success:
+                raw_res.append_success(combiner.tbl, final_res)
+                return final_res.data
+            # If the combiner failed to build the final result, we should return None.
+            return None
+        except Exception as e:  # pylint: disable=broad-except
+            # If we reach here, it means that the combiner failed to build the final result.
+            # We should raise an exception to inform the caller.
+            raise RuntimeError(f'Failed to load report {combiner.tbl} on dataset {raw_res.ds_name}: {e}') from e
diff --git a/user_tools/src/spark_rapids_tools/tools/qualx/featurizers/default.py b/user_tools/src/spark_rapids_tools/tools/qualx/featurizers/default.py
index e605539d9..c55ca9fbd 100644
--- a/user_tools/src/spark_rapids_tools/tools/qualx/featurizers/default.py
+++ b/user_tools/src/spark_rapids_tools/tools/qualx/featurizers/default.py
@@ -174,7 +174,7 @@ def extract_raw_features(
     Parameters
     ----------
     toc: pd.DataFrame
-        Table of contents of CSV files for the dataset.
+        Table of contents of CSV files for the dataset dF
     node_level_supp: pd.DataFrame
         Node-level support information used to filter out metrics associated with unsupported operators.
     qualtool_filter: str
@@ -536,15 +536,17 @@ def load_csv_files(
     Parameters
     ----------
     toc: pd.DataFrame
-        Table of contents of CSV files for the dataset.
+        Table of contents of CSV files for the dataset dF
     app_id: str
-        Application ID.
+        Application ID to target the specific profiler CSV files.
     node_level_supp: pd.DataFrame
         Node-level support information used to filter out metrics associated with unsupported operators.
+ dF qualtool_filter: str Type of filter to apply to the qualification tool output, either 'stage' or None. qualtool_output: pd.DataFrame - Qualification tool output. + Qualification tool output. This is the Dataframe loaded for the Qualification core summary. + dF remove_failed_sql: bool Remove sqlIDs with high failure rates, default: True. @@ -646,6 +648,8 @@ def scan_tbl( sql_to_stage = scan_tbl('sql_to_stage_information') if not sql_to_stage.empty: # try: + # exclude the rows that do not show nodeIds. + # creates dF sqls_with_execs = ( sql_to_stage.loc[sql_to_stage['SQL Nodes(IDs)'].notna()][['sqlID', 'jobID']] .groupby(['sqlID']) @@ -656,6 +660,7 @@ def scan_tbl( sqls_with_execs = pd.DataFrame() if not sql_app_metrics.empty and not sqls_with_execs.empty: + # TODO: Not sure why we are doing this sql_app_metrics = ( sql_app_metrics.merge(sqls_with_execs, on='sqlID') .drop(columns=['jobID']) @@ -665,6 +670,7 @@ def scan_tbl( # Job to stageIds/sqlID mapping: job_map_tbl = scan_tbl('job_information') if not job_map_tbl.empty: + # dF job_map_tbl = job_map_tbl.rename(columns={'startTime': 'jobStartTime_min'}) job_map_tbl['sqlID'] = job_map_tbl['sqlID'].fillna(-1).astype(int) job_map_tbl['jobID'] = 'job_' + job_map_tbl['jobID'].astype(str) diff --git a/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py b/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py index 8fb8fbd0f..10717f0dd 100644 --- a/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py +++ b/user_tools/src/spark_rapids_tools/tools/qualx/preprocess.py @@ -288,6 +288,8 @@ def load_profiles( profile_files.extend(csv_files + json_files) # filter files by app_ids in app_meta + # toc_list is a list of dataframes of the following schema + # toc_list = [] app_meta_list = [] for app_id, meta in app_meta.items(): @@ -297,12 +299,19 @@ def load_profiles( # filter profiler files by app_id and attach ds_name, appId, table_name # convert glob pattern to regex pattern if app_id == 'default': + # Create a list of all the raw metrics files. app_id_files = profile_files else: app_id = app_id.replace('*', '.*') app_id_files = [f for f in profile_files if re.search(app_id, f)] if app_id_files: + # creates a dF + # where: + # * filepath is the full path to the CSV/JSON file, + # * ds_name is the dataset name, + # * appId is the appId extracted from the filepath, + # * table_name is the file extracted from the filepath (without extension). 
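+            # e.g. an entry like '.../<appId>/sql_to_stage_information.csv' would yield table_name
+            # 'sql_to_stage_information' (illustrative path layout; the actual split is done below).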
tmp = pd.DataFrame({'filepath': app_id_files})
             fp_split = tmp['filepath'].str.split(r'/')
             tmp['ds_name'] = f'{ds_name}:{job_name}' if job_name else ds_name
@@ -310,6 +319,8 @@ def load_profiles(
             tmp['table_name'] = fp_split.str[-1].str.split('.').str[0]
 
             # collect mapping of appId (after globbing) to meta
+            # this eventually creates the list of appIds
+            # creates a dF with one row per unique appId, annotated with the app_meta key/values
             tmp_app_meta = tmp[['appId']].drop_duplicates()
             for key, value in meta.items():
                 tmp_app_meta[key] = value
diff --git a/user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py b/user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py
index 6f192892f..09f2218cd 100644
--- a/user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py
+++ b/user_tools/src/spark_rapids_tools/tools/qualx/qualx_main.py
@@ -29,7 +29,8 @@
 
 from spark_rapids_tools import CspPath
 from spark_rapids_tools.api_v1 import QualCoreResultHandler
-from spark_rapids_tools.api_v1.builder import CSVReport, CSVCombiner, APIResultHandler
+from spark_rapids_tools.api_v1.builder import CSVReport, APIResultHandler, APIUtils, LoadRawFilesResult, \
+    CSVReportCombiner
 
 from spark_rapids_tools.tools.qualx.config import (
     get_cache_dir,
@@ -213,15 +214,16 @@ def _get_qual_data(qual_handler: QualCoreResultHandler) -> Tuple[Optional[pd.Dat
     # 1- uses the default combination method which is dedicated to use the appHanlder to combine the results.
     # 2- ignore the failed execs. i.e., apps that has no execs. Otherwise, it is possible to add a call-back to handle
     # the apps that had no execs.
-    q_execs_res = (
-        CSVCombiner(rep_builder=CSVReport(qual_handler).table('execCSVReport'))  # use ExecsCSV report
-        .on_apps()  # combine DFs on apps
-        .combine_args({'col_names': ['App ID']})  # inject cols when combining on apps
-        .build()  # combine the results
+    execs_process_res = LoadRawFilesResult(ds_name='QualTool execs')
+    APIUtils.process_res(
+        raw_res=execs_process_res,
+        combiner=CSVReportCombiner(
+            rep_builders=[
+                CSVReport(qual_handler)
+                .table('execCSVReport')]
+        ).on_app_fields({'app_id': 'App ID'})  # use "App ID" to be consistent with the remaining qualx code.
     )
-    if not q_execs_res.success:
-        raise RuntimeError(f'Failed to load execution CSV report: {q_execs_res.load_error}') from q_execs_res.load_error
-    node_level_supp = load_qtool_execs(q_execs_res.data)
+    node_level_supp = load_qtool_execs(execs_process_res.reports.get('execCSVReport'))
 
     return node_level_supp, qualtool_summary
 
@@ -743,15 +745,19 @@ def predict(
     if not qual_handlers:
         raise ValueError('qual_handlers list is empty - no qualification data available for prediction')
+    if all(q_handler.is_empty() for q_handler in qual_handlers):
+        logger.warning('All qualification handlers are empty - no apps to predict')
+        return pd.DataFrame()
 
     node_level_supp, qual_tool_output, qual_metrics = _get_combined_qual_data(qual_handlers)
     # create a DataFrame with default predictions for all app IDs.
     # this will be used for apps without predictions.
     default_preds_df = qual_tool_output.apply(create_row_with_default_speedup, axis=1)
 
-    if len(qual_metrics) == 0:
-        logger.warning('Qualification tool metrics are missing. Speedup predictions will be skipped.')
-        return pd.DataFrame()
+    # it is too early at this point to decide whether there are raw metrics or not
+    # if any(qual_metrics) == 0:
+    #     logger.warning('Qualification tool metrics are missing.
Speedup predictions will be skipped.')
+    #     return pd.DataFrame()
 
     # construct mapping of appIds to original appNames
     app_id_name_map = default_preds_df.set_index('appId')['appName'].to_dict()
diff --git a/user_tools/src/spark_rapids_tools/tools/qualx/revamp/__init__.py b/user_tools/src/spark_rapids_tools/tools/qualx/revamp/__init__.py
new file mode 100644
index 000000000..b1d500c1e
--- /dev/null
+++ b/user_tools/src/spark_rapids_tools/tools/qualx/revamp/__init__.py
@@ -0,0 +1,15 @@
+# Copyright (c) 2025, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Module that defines the rewrite of QualX using the new Tools API."""
diff --git a/user_tools/src/spark_rapids_tools/tools/qualx/revamp/featurizer.py b/user_tools/src/spark_rapids_tools/tools/qualx/revamp/featurizer.py
new file mode 100644
index 000000000..d926b0c97
--- /dev/null
+++ b/user_tools/src/spark_rapids_tools/tools/qualx/revamp/featurizer.py
@@ -0,0 +1,655 @@
+# Copyright (c) 2025, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Utility functions for the QualX featurizer"""
+
+from typing import List, Dict, Optional
+
+import pandas as pd
+
+from spark_rapids_tools.api_v1 import LoadRawFilesResult, ToolResultHandlerT, CSVReportCombiner, CSVReport, \
+    APIUtils
+from spark_rapids_tools.tools.qualx.preprocess import load_qtool_execs
+
+from spark_rapids_tools.utils import Utilities
+
+
+def load_csv_files(
+        ds_name: str,
+        res_hs: List[ToolResultHandlerT]
+) -> LoadRawFilesResult:
+    """
+    Load CSV files from the result handlers into memory. This method should apply as little logic
+    to the dataframes as possible in order to give a clear separation between loading the
+    raw data and processing it to create features.
+    The implementation uses CSVReportCombiner to combine the reports across all the apps.
+    By default, the combiner injects the appId column if it does not exist in the report.
+    :param ds_name: The name of the dataset.
+    :param res_hs: List of tools result handlers. The assumption is that each handler is valid and
+                   has a non-zero number of apps.
+    :return: A LoadRawFilesResult result object.
+    :raises RuntimeError: If an unexpected error is triggered while loading a mandatory report.
+    """
+    raw_res = LoadRawFilesResult(ds_name=ds_name)
+
+    # Get the app_info report.
+    # Raise when empty/failed to load.
+    # The API will not inject appId into the result because the report has the appId column already.
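+    # Note: process_res defaults raise_on_failure and raise_on_empty to True, so a missing or
+    # empty application-information report aborts the whole load with an exception.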
+ APIUtils.process_res( + raw_res=raw_res, + combiner=CSVReportCombiner( + rep_builders=[CSVReport(r_h).table('coreRawApplicationInformationCSV') for r_h in res_hs] + ) + ) + + # Get the jars table. + # Do not fail if it is empty/failed. This is empty for CPU applications. + APIUtils.process_res( + raw_res=raw_res, + combiner=CSVReportCombiner( + rep_builders=[CSVReport(r_h).table('coreRawRapidsJarsCSV') for r_h in res_hs] + ), + raise_on_empty=False, + raise_on_failure=False, + ) + + # Get the spark properties table (this should not be empty) + # Raise when empty/failed to load. + # TODO: It does not seem that this table is actually used anywhere + APIUtils.process_res( + raw_res=raw_res, + combiner=CSVReportCombiner( + rep_builders=[CSVReport(r_h).table('coreRawSparkPropertiesCSV') for r_h in res_hs] + ) + ) + + # Get the executor time-percent table (this should not be empty) + # Raise if empty or failed to load. + # This table already has "App ID" column for some reason. Rename the column to appId and disable + # app injection. + APIUtils.process_res( + raw_res=raw_res, + combiner=CSVReportCombiner( + rep_builders=[ + CSVReport(r_h) + .map_cols_cb(APIUtils.normalize_app_id_col) # normalize the "App ID" column to 'appId' + .table('coreRawSqlDurationAndExecutorCpuTimePercentCSV') for r_h in res_hs + ] + ).disable_apps_injection() # disable app-injection + ) + + # Get the executor information table + APIUtils.process_res( + raw_res=raw_res, + combiner=CSVReportCombiner( + rep_builders=[CSVReport(r_h).table('coreRawExecutorInformationCSV') for r_h in res_hs] + ) + ) + + # Get the sql level aggregated metrics (this might be empty if no SQLs/metrics are defined) + # Raise if empty or failed to load + # This table already has "appID". Rename the column to appId and avoid injecting the appId column. + APIUtils.process_res( + raw_res=raw_res, + combiner=CSVReportCombiner( + rep_builders=[ + CSVReport(r_h) + .map_cols_cb(APIUtils.normalize_app_id_col) + .table('coreRawSqlLevelAggregatedTaskMetricsCSV') for r_h in res_hs + ] + ).disable_apps_injection() + ) + + # Get sql-to-stage info (this can be empty if stages are not defined or sqls are not defined) + APIUtils.process_res( + raw_res=raw_res, + combiner=CSVReportCombiner( + rep_builders=[CSVReport(r_h).table('coreRawSqlToStageInformationCSV') for r_h in res_hs] + ), + raise_on_empty=False + ) + + # Get job info + APIUtils.process_res( + raw_res=raw_res, + combiner=CSVReportCombiner( + rep_builders=[CSVReport(r_h).table('coreRawJobInformationCSV') for r_h in res_hs] + ) + ) + + # Get the sql-plan-metrics. This cannot be empty. + APIUtils.process_res( + raw_res=raw_res, + combiner=CSVReportCombiner( + rep_builders=[CSVReport(r_h).table('coreRawSqlPlanMetricsForApplicationCSV') for r_h in res_hs] + ) + ) + + # Get the job-agg-metrics. This cannot be empty. + APIUtils.process_res( + raw_res=raw_res, + combiner=CSVReportCombiner( + rep_builders=[CSVReport(r_h).table('coreRawJobLevelAggregatedTaskMetricsCSV') for r_h in res_hs] + ) + ) + + # Get the stage-agg-metrics. This cannot be empty. + APIUtils.process_res( + raw_res=raw_res, + combiner=CSVReportCombiner( + rep_builders=[CSVReport(r_h).table('coreRawStageLevelAggregatedTaskMetricsCSV') for r_h in res_hs] + ) + ) + + # Get whole stage operator info. It can be empty; especially for the GPU jobs. 
+    APIUtils.process_res(
+        raw_res=raw_res,
+        combiner=CSVReportCombiner(
+            rep_builders=[CSVReport(r_h).table('coreRawWholeStageCSV') for r_h in res_hs]
+        ),
+        raise_on_empty=False,
+        raise_on_failure=False
+    )
+
+    # Get failed-tasks info. It can be empty.
+    APIUtils.process_res(
+        raw_res=raw_res,
+        combiner=CSVReportCombiner(
+            rep_builders=[CSVReport(r_h).table('coreRawFailedTasksCSV') for r_h in res_hs]
+        ),
+        raise_on_empty=False,
+        raise_on_failure=False
+    )
+
+    # Get failed-stages info. It can be empty.
+    APIUtils.process_res(
+        raw_res=raw_res,
+        combiner=CSVReportCombiner(
+            rep_builders=[CSVReport(r_h).table('coreRawFailedStagesCSV') for r_h in res_hs]
+        ),
+        raise_on_empty=False,
+        raise_on_failure=False
+    )
+
+    # Get failed-jobs info. It can be empty.
+    APIUtils.process_res(
+        raw_res=raw_res,
+        combiner=CSVReportCombiner(
+            rep_builders=[CSVReport(r_h).table('coreRawFailedJobsCSV') for r_h in res_hs]
+        ),
+        raise_on_empty=False,
+        raise_on_failure=False
+    )
+
+    # Get data-source info. It can be empty.
+    # Note that this report is error-prone due to parsing the read-operators.
+    # P.S.: It is wise not to fail if this report fails; this table may show a lot of non-deterministic
+    # behavior due to handling new read operators.
+    APIUtils.process_res(
+        raw_res=raw_res,
+        combiner=CSVReportCombiner(
+            rep_builders=[CSVReport(r_h).table('coreRawDataSourceInformationCSV') for r_h in res_hs]
+        ),
+        raise_on_empty=False,
+        raise_on_failure=False
+    )
+
+    #########################################
+    # Load Qualification reports
+    #########################################
+
+    # Get the execs report. The loader is told not to fail here because the report is
+    # missing when the result_handlers are not Qualification handlers.
+    # Note that there is a side effect: we do not filter the rows immediately here.
+    # Instead, we combine all rows across all the apps before any filtering, which can be huge.
+
+    APIUtils.process_res(
+        raw_res=raw_res,
+        combiner=CSVReportCombiner(
+            rep_builders=[
+                CSVReport(r_h)
+                .table('execCSVReport')
+                for r_h in res_hs
+            ]
+        ).on_app_fields({'app_id': 'App ID'}),  # use "App ID" to be consistent with the remaining qualx code.
+        raise_on_empty=False,
+        raise_on_failure=False
+    )
+    # TODO: there is no need to load app_summary to get the app_durations.
+    # App duration should be available in the app_info_df.
+
+    return raw_res
+
+
+def process_raw_features(
+        raw_tbls: LoadRawFilesResult,
+        *,
+        qualtool_filter: Optional[str],
+        remove_failed_sql: bool = True) -> Dict[str, pd.DataFrame]:
+    """
+    Process the raw features from the loaded CSV files.
+    :param raw_tbls: The result of loading raw files.
+    :param qualtool_filter: The type of filter to apply to the qualification tool output, either 'stage' or None.
+    :param remove_failed_sql: If True, remove SQLs with high failure rates.
+    :return: A dictionary mapping table names to the processed DataFrames.
+    """
+    def valid_df(arg: Optional[pd.DataFrame], allow_empty: bool = False) -> bool:
+        """
+        Check if the DataFrame is not None and not empty.
+        :param arg: The DataFrame to check.
+        :param allow_empty: If True, allow the DataFrame to be empty.
+        :return: True if the DataFrame is valid, False otherwise.
+        """
+        if arg is None:
+            return False
+        if arg.empty:
+            # If the DataFrame is empty, we check if allow_empty is True.
+ return allow_empty + return True + + # Step-1: process the failed_apps to decide if we need to drop some of them based on the failures + # Step-2: process the successful apps to extract the features + # Step-a: combine some of the columns. + # Step-b: return the processed DataFrame with the features. + + app_info_df = raw_tbls.reports.get('coreRawApplicationInformationCSV') + + if valid_df(app_info_df): + # Get jar versions: This is only valid for GPU applications. + raw_jars_df = raw_tbls.reports.get('coreRawRapidsJarsCSV') + # Merge jar versions into app_info_df based on appId + if valid_df(raw_jars_df): + # Extract version info per appId + def extract_versions(jars): + cudf_version = '-' + rapids4spark_version = '-' + bm_runner_version = '-' + for jar in jars.split(','): + jar = jar.strip().split('/')[-1].replace('.jar', '') + if jar.startswith('cudf'): + cudf_version = jar + elif jar.startswith('rapids-4-spark_'): + rapids4spark_version = jar + elif jar.startswith('rapids-4-spark-benchmarks_'): + bm_runner_version = jar + return pd.Series([cudf_version, rapids4spark_version, bm_runner_version]) + + jars_versions = raw_jars_df[['appId', 'Rapids4Spark jars']].copy() + jars_versions[['cudfVersion', 'rapids4sparkVersion', 'bmRunnerVersion']] = ( + jars_versions['Rapids4Spark jars'].apply(extract_versions) + ) + jars_versions = jars_versions[['appId', 'cudfVersion', 'rapids4sparkVersion', 'bmRunnerVersion']] + + # Merge with app_info_df + app_info_df = app_info_df.merge( + jars_versions, + on='appId', + how='left' + ) + # Fill missing version info with '-' + app_info_df[['cudfVersion', 'rapids4sparkVersion', 'bmRunnerVersion']] = ( + app_info_df[['cudfVersion', 'rapids4sparkVersion', 'bmRunnerVersion']].fillna('-') + ) + else: + # If no jars info, set all to '-' + app_info_df['cudfVersion'] = '-' + app_info_df['rapids4sparkVersion'] = '-' + app_info_df['bmRunnerVersion'] = '-' + + # Allow user-provided 'ds_name' as 'appName' + # TODO: Do we still need the line below? + app_info_df['appName'] = raw_tbls.ds_name + # Enforce the value of SparkVersion + app_info_df.fillna({'sparkVersion': 'Unknown'}, inplace=True) + + ################################################ + # coreRawSqlDurationAndExecutorCpuTimePercentCSV + ################################################ + # TODO: legacy code overwrite the SQLDuration with the AppDuration. It is not clear why we do so? + raw_exec_dur_df = raw_tbls.reports.get('coreRawSqlDurationAndExecutorCpuTimePercentCSV') + if valid_df(raw_exec_dur_df): + raw_exec_dur_df = raw_exec_dur_df.rename( + { + 'App Duration': 'appDuration', + 'Contains Dataset or RDD Op': 'containsDatasetOrRDDOp', + }, + axis=1, + ) + # create a column potentialProblems to indicate whether the "Potential Problems" column is equal to UDF or not. 
+        raw_exec_dur_df['potentialProblems'] = (
+            raw_exec_dur_df['Potential Problems'] == 'UDF'
+        ).fillna(False).astype(bool)
+        # assign the result of drop() so that the column is actually removed
+        raw_exec_dur_df = raw_exec_dur_df.drop(columns=['Potential Problems'])
+
+    ###############################################
+    # coreRawSqlToStageInformationCSV
+    # coreRawSqlLevelAggregatedTaskMetricsCSV
+    ###############################################
+    # filter out sql ids that have no execs associated with them
+    # this should remove root sql ids in 3.4.1+
+    raw_sql_to_stage_df = raw_tbls.reports.get('coreRawSqlToStageInformationCSV')
+    raw_sql_agg_metrics_df = raw_tbls.reports.get('coreRawSqlLevelAggregatedTaskMetricsCSV')
+    if valid_df(raw_sql_to_stage_df):
+        # Filters the DataFrame raw_sql_to_stage_df to only include rows where the column SQL Nodes(IDs)
+        # is not null (i.e., contains data). It then selects only the [appId, sqlID, jobID] columns from those
+        # filtered rows. The result is a new DataFrame containing just the appId, sqlID and jobID
+        # for entries that have associated SQL node IDs.
+        sqls_with_execs = (
+            raw_sql_to_stage_df.loc[raw_sql_to_stage_df['SQL Nodes(IDs)'].notna()][['appId', 'sqlID', 'jobID']]
+            .groupby(['appId', 'sqlID'])  # groups the filtered rows by appId and sqlID.
+            .first()  # keeps the first occurrence (i.e., 1st jobID for each unique pair).
+            .reset_index()  # Resets the index to turn the groupby result back into a regular DataFrame.
+        )
+        # filter the sqls with execs
+        if valid_df(raw_sql_agg_metrics_df) and valid_df(sqls_with_execs):
+            raw_sql_agg_metrics_df = (
+                raw_sql_agg_metrics_df
+                .merge(sqls_with_execs, left_on=['appId', 'sqlID'], right_on=['appId', 'sqlID'])
+                .drop(columns=['jobID'])
+                .reset_index(drop=True)
+            )
+
+    ##################################
+    # execCSVReport
+    ##################################
+    node_level_supp = None
+    raw_execs_df = raw_tbls.reports.get('execCSVReport')
+    if valid_df(raw_execs_df):
+        node_level_supp = load_qtool_execs(raw_execs_df)
+
+    ###############################################
+    # coreRawSqlPlanMetricsForApplicationCSV
+    ###############################################
+    raw_sql_plan_metrics_df = raw_tbls.reports.get('coreRawSqlPlanMetricsForApplicationCSV')
+    stages_supp = pd.DataFrame(columns=['appId', 'sqlID', 'stageIds']).astype({
+        'appId': Utilities.scala_to_pandas_type('String'),
+        'sqlID': Utilities.scala_to_pandas_type('Long'),
+        'stageIds': Utilities.scala_to_pandas_type('Int')
+    })
+    if valid_df(raw_sql_plan_metrics_df):
+        if node_level_supp is not None:
+            if qualtool_filter == 'stage':
+                # Filter out rows that do not have matching keys in both tables.
+                raw_sql_plan_metrics_df = raw_sql_plan_metrics_df.merge(
+                    node_level_supp,
+                    left_on=['appId', 'sqlID', 'nodeID'],
+                    right_on=['App ID', 'SQL ID', 'SQL Node Id'],
+                    how='inner',
+                )
+                raw_sql_plan_metrics_df = raw_sql_plan_metrics_df.drop(
+                    columns=['App ID', 'SQL ID', 'SQL Node Id']
+                )
+                raw_sql_plan_metrics_df['stageIds'] = raw_sql_plan_metrics_df['stageIds'].apply(
+                    lambda x: str(x).split(',')
+                )
+                raw_sql_plan_metrics_df = raw_sql_plan_metrics_df.explode('stageIds')
+                # compute supported stages for use below. TBD.
+                # rename column from 'Exec Is Supported' to 'Stage Is Supported' and change elsewhere
+                stages_supp = (
+                    raw_sql_plan_metrics_df.groupby(['appId', 'sqlID', 'stageIds'])[
+                        'Exec Is Supported'
+                    ]
+                    .agg('all')
+                    .reset_index()
+                )
+                # the stageIds column resulting from the explode is of type object. So, we need to
+                # convert it back to Int64. Then we drop the rows with N/A values.
+ stages_supp['stageIds'] = pd.to_numeric(stages_supp['stageIds'], errors='coerce').astype('Int64') + # Filter out N/A: make sure to use `notna()` to handle all sort of N/A values. + stages_supp = stages_supp.loc[ + stages_supp['stageIds'].notna() + ].reset_index(drop=True) + # filter sql ops to have only supported ones for processing in calling fn + raw_sql_plan_metrics_df = raw_sql_plan_metrics_df.loc[ + raw_sql_plan_metrics_df['Exec Is Supported'] + ].drop(columns=['Exec Is Supported']) + elif qualtool_filter == 'sqlId': + sql_level_supp = ( + node_level_supp.groupby(['App ID', 'SQL ID'])['Exec Is Supported'] + .agg('all') + .reset_index() + ) + sql_level_supp = sql_level_supp.loc[sql_level_supp['Exec Is Supported']] + raw_sql_agg_metrics_df = raw_tbls.reports.get('coreRawSqlLevelAggregatedTaskMetricsCSV') + if valid_df(raw_sql_agg_metrics_df): + raw_sql_agg_metrics_df = raw_sql_agg_metrics_df.merge( + sql_level_supp, + left_on=['appId', 'sqlID'], + right_on=['App ID', 'SQL ID'], + how='inner' + ) + raw_sql_agg_metrics_df = raw_sql_agg_metrics_df.drop( + columns=['Exec Is Supported', 'App ID', 'SQL ID'] + ) + else: + # Don't filter out unsupported ops + pass + + # sql ids to drop due to failures meeting below criteria + # mark for removal from modeling sql ids that have failed stage time > 10% total stage time + allowed_failed_duration_fraction = 0.10 + sqls_to_drop = pd.DataFrame(columns=['appId', 'sqlID']).astype({ + 'appId': Utilities.scala_to_pandas_type('String'), + 'sqlID': Utilities.scala_to_pandas_type('Long') + }) + + ############################################### + # coreRawStageLevelAggregatedTaskMetricsCSV + ############################################### + raw_stage_agg_metrics_df = raw_tbls.reports.get('coreRawStageLevelAggregatedTaskMetricsCSV') + if valid_df(raw_stage_agg_metrics_df): + # get the failed stage_df + raw_failed_stage_df = raw_tbls.reports.get('coreRawFailedStagesCSV') + if valid_df(raw_failed_stage_df) and valid_df(raw_sql_to_stage_df): + stage_agg_tbl = raw_stage_agg_metrics_df[['appId', 'stageId', 'Duration']].merge( + raw_sql_to_stage_df, left_on=['appId', 'stageId'], right_on=['appId', 'stageId'] + ) + total_stage_time = ( + stage_agg_tbl[['appId', 'sqlID', 'Duration']] + .groupby(['appId', 'sqlID']) + .agg('sum') + .reset_index() + ) + failed_stage_time = ( + stage_agg_tbl[['appId', 'sqlID', 'stageId', 'Duration']] + .merge(raw_failed_stage_df, left_on=['appId', 'stageId'], right_on=['appId', 'stageId'], how='inner')[ + ['appId', 'sqlID', 'Duration'] + ] + .groupby(['appId', 'sqlID']) + .agg('sum') + .reset_index() + ) + stage_times = total_stage_time.merge( + failed_stage_time, on=['appId', 'sqlID'], how='inner' + ) + # append the failed sqls to the sqls_to_drop DataFrame + rows_to_drop = stage_times.loc[ + stage_times.Duration_y + > stage_times.Duration_x * allowed_failed_duration_fraction + ][['appId', 'sqlID']] + sqls_to_drop = pd.concat([sqls_to_drop, rows_to_drop], ignore_index=True) + # we can debug here to see the failed sqls + + if valid_df(stages_supp, allow_empty=True) and (qualtool_filter == 'stage'): + raw_stage_agg_metrics_df = raw_stage_agg_metrics_df.merge( + stages_supp, + left_on=['appId', 'stageId'], + right_on=['appId', 'stageIds'], + how='inner' + ).drop(columns=['stageIds']) # drop the stageIds column from the stages_supp dataframe + + # add a boolean column to indicate if the stage is associated with a SQL ID or not. 
+ raw_stage_agg_metrics_df['hasSqlID'] = raw_stage_agg_metrics_df['sqlID'].notna() + + ############################################### + # coreRawJobInformationCSV + ############################################### + raw_job_info_df = raw_tbls.reports.get('coreRawJobInformationCSV') + if valid_df(raw_job_info_df): + # dF + raw_job_info_df = raw_job_info_df.rename(columns={'startTime': 'jobStartTime_min'}) + raw_job_info_df['sqlID'] = raw_job_info_df['sqlID'].fillna(-1).astype(Utilities.scala_to_pandas_type('Int')) + # TODO: Maybe we should not need this line, but it is used in the legacy code. + # raw_job_info_df['jobID'] = 'job_' + raw_job_info_df['jobID'].astype(Utilities.scala_to_pandas_type('String')) + + ############################################### + # coreRawJobLevelAggregatedTaskMetricsCSV + ############################################### + # TODO: raw_job_agg_metrics_df does not seem to be used after that. Maybe we do not need it? + raw_job_agg_metrics_df = raw_tbls.reports.get('coreRawJobLevelAggregatedTaskMetricsCSV') + if valid_df(raw_job_agg_metrics_df, allow_empty=True): + # # rename the column jobId to jobID + # raw_job_agg_metrics_df = raw_job_info_df.rename(columns={'jobId': 'jobID'}) + # get the information from jobInfo tbl + if valid_df(raw_job_info_df, allow_empty=True): + raw_job_agg_metrics_df = raw_job_agg_metrics_df.merge( + raw_job_info_df[['appId', 'jobID', 'sqlID', 'jobStartTime_min']], + left_on=['appId', 'jobId'], + right_on=['appId', 'jobID'], + how='left' + ) + # Add a boolean column to indicate if the job is associated with a SQL ID or not. + raw_job_agg_metrics_df['hasSqlID'] = raw_job_agg_metrics_df['sqlID'] != -1 + # drop redundant columns jobId + raw_job_agg_metrics_df = raw_job_agg_metrics_df.drop(columns=['jobId']) + + raw_spark_props_df = raw_tbls.reports.get('coreRawSparkPropertiesCSV') + if valid_df(raw_spark_props_df): + raw_spark_props_df = raw_spark_props_df.set_index('propertyName') + + ############################################### + # coreRawFailedTasksCSV + ############################################### + raw_failed_tasks_df = raw_tbls.reports.get('coreRawFailedTasksCSV') + if valid_df(raw_failed_tasks_df): + # aggregate failed tasks per appName, appId, sqlID + raw_failed_tasks_df = raw_failed_tasks_df.groupby(['appId', 'stageId'])['attempt'].count().reset_index() + raw_failed_tasks_df = raw_failed_tasks_df.merge( + raw_sql_to_stage_df[['appId', 'stageId', 'sqlID']], + on=['appId', 'stageId'] + ) + raw_failed_tasks_df = raw_failed_tasks_df.groupby( + ['appId', 'sqlID'] + )['attempt'].sum().reset_index() + raw_failed_tasks_df = raw_failed_tasks_df.rename(columns={'attempt': 'failed_tasks'}) + else: + raw_failed_tasks_df = pd.DataFrame(columns=['appId', 'sqlID', 'failed_tasks']).astype({ + 'appId': Utilities.scala_to_pandas_type('String'), + 'sqlID': Utilities.scala_to_pandas_type('Long'), + 'failed_tasks': Utilities.scala_to_pandas_type('Int') + }) + + ############################################### + # Merging tables together + ############################################### + + # merged app_info_tbl + raw_exec_info_df = raw_tbls.reports.get('coreRawExecutorInformationCSV') + if all(valid_df(t_df) for t_df in [ + app_info_df, raw_exec_dur_df, raw_sql_agg_metrics_df, raw_exec_info_df + ]): + # merge all the tables together + app_info_mg = app_info_df.merge( + raw_exec_info_df, + left_on='appId', + right_on='appId' + ) + app_info_mg = app_info_mg.merge( + raw_sql_agg_metrics_df, left_on='appId', right_on='appId' + ) + app_info_mg = 
app_info_mg.merge( + raw_exec_dur_df[ + [ + 'appId', + 'sqlID', + 'appDuration', + ] + ], + left_on=['appId', 'sqlID'], + right_on=['appId', 'sqlID'], + ) + + # filter out sqlIDs with aborted jobs (these are jobs failed due to sufficiently many (configurable) failed + # attempts of a stage due to error conditions). These are failed sqlIDs that we shouldn't model, + # but are still included in profiler output. + raw_failed_jobs_df = raw_tbls.reports.get('coreRawFailedJobsCSV') + if valid_df(raw_failed_jobs_df) and valid_df(raw_job_info_df): + aborted_jobs = raw_failed_jobs_df.loc[ + raw_failed_jobs_df.failureReason.str.contains('aborted') + ][['appId', 'jobID']] + # TODO: Is this always empty? jobInfo contains failed jobs? + aborted_jobs_sql_id = raw_job_info_df.merge( + aborted_jobs, how='inner', on=['appId', 'jobID'] + ) + aborted_sql_ids = aborted_jobs_sql_id[['appId', 'sqlID']].drop_duplicates() + sqls_to_drop = pd.concat([sqls_to_drop, aborted_sql_ids], ignore_index=True).drop_duplicates() + + else: + app_info_mg = pd.DataFrame() + + if remove_failed_sql and valid_df(app_info_mg) and valid_df(sqls_to_drop): + # removes rows from the app_info_mg DataFrame that have a matching appId and sqlID in + # the sqls_to_drop DataFrame. + # 1. performs a left merge of app_info_mg with sqls_to_drop on appId and sqlID, adding + # a column _merge that indicates whether each row was matched (both) or only present + # in app_info_mg (left_only). + # 2. It filters the merged DataFrame to keep only rows where _merge is left_only, i.e., + # those not present in sqls_to_drop. + # 3. It drops the _merge column, returning a DataFrame with the unwanted rows removed. + app_info_mg = app_info_mg.merge( + sqls_to_drop, + on=['appId', 'sqlID'], + how='left', + indicator=True + ) + app_info_mg = app_info_mg[app_info_mg['_merge'] == 'left_only'].drop(columns=['_merge']) + + return { + 'app_tbl': app_info_mg, + 'ops_tbl': raw_sql_plan_metrics_df, + 'spark_props_tbl': raw_spark_props_df, + 'job_map_tbl': raw_job_info_df, + 'job_stage_agg_tbl': raw_stage_agg_metrics_df, + 'wholestage_tbl': raw_tbls.reports.get('coreRawWholeStageCSV'), + 'ds_tbl': raw_tbls.reports.get('coreRawDataSourceInformationCSV'), + 'failed_tasks_tbl': raw_failed_tasks_df + } + + +def extract_raw_features( + ds_name: str, + res_hs: List[ToolResultHandlerT], + *, + qualtool_filter: Optional[str], + remove_failed_sql: bool = True) -> pd.DataFrame: + """ + Extract raw features from the result handlers for a given dataset. + :param ds_name: + :param res_hs: + :param qualtool_filter: Type of filter to apply to the qualification tool output, either 'stage' or None. + :param remove_failed_sql: + :return: + """ + raw_tbls = load_csv_files(ds_name, res_hs) + # if no reports were loaded, return an empty DataFrame + if not raw_tbls.reports: + return pd.DataFrame() + # process the raw features from the loaded CSV files + process_raw_features( + raw_tbls, + qualtool_filter=qualtool_filter, + remove_failed_sql=remove_failed_sql + ) + print(f'those are the successful loaded reports: {raw_tbls.reports.keys()}') + return pd.DataFrame() diff --git a/user_tools/src/spark_rapids_tools/tools/qualx/revamp/preprocess.py b/user_tools/src/spark_rapids_tools/tools/qualx/revamp/preprocess.py new file mode 100644 index 000000000..4b1f6a106 --- /dev/null +++ b/user_tools/src/spark_rapids_tools/tools/qualx/revamp/preprocess.py @@ -0,0 +1,81 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Utility functions for preprocessing for QualX""" + +from typing import Mapping, Optional + +import pandas as pd + + +from spark_rapids_tools.tools.qualx.config import get_config +from spark_rapids_tools.tools.qualx.preprocess import infer_app_meta, get_alignment +from spark_rapids_tools.tools.qualx.revamp.featurizer import extract_raw_features + + +def load_profiles( + datasets: Mapping[str, Mapping], + *, + qual_tool_filter: Optional[str] = None, + remove_failed_sql: bool = True) -> pd.DataFrame: + """ + Load and preprocess the profiles from the datasets. + :param datasets: A mapping of dataset names to their profiles. + :param qual_tool_filter: Optional filter for the qualification tool output, either 'stage' or None. + :param remove_failed_sql: If True, remove profiles with failed SQL. + :raises KeyError: If dataSet does not have 'app_meta' or 'eventlogs' defined. + :raises ValueError: If the default app_meta is not defined or if it has multiple entries. + :return: A DataFrame containing the loaded profiles. + """ + # define a dict to load all the plugins + plugins = {} + config = get_config() + alignment_df = get_alignment() + # the line below is modified to enforce using the revamp methods + # featurizers: List[Callable[[str, List[ToolResultHandlerT]], pd.DataFrame]] = [extract_raw_features] + for ds_name, ds_meta in datasets.items(): + # get platform from dataset metadata, or use onprem if not provided + platform = ds_meta.get('platform', 'onprem') + if 'load_profiles_hook' in ds_meta: + plugins[ds_name] = ds_meta['load_profiles_hook'] + if 'eventlogs' in ds_meta: + # dataset has an entry for eventlogs, this implies that we need to get the app_meta, + # or infer from directory structure of eventlogs. + app_meta = ds_meta.get('app_meta', infer_app_meta(ds_meta['eventlogs'])) + else: + # dataset does not have an entry for eventlogs, so we can use the app_meta directly. + # This will raise a keyError if app_meta is not defined. + app_meta = ds_meta['app_meta'] + # get the array of resultHandlers from the dataset metadata. + core_res_handlers = ds_meta['resultHandlers'] + + # get default app_meta + app_meta_default = app_meta['default'] + if len(app_meta) != 1: + raise ValueError(f'Default app_meta for {ds_name} cannot be used with additional entries.') + app_meta_default = {'runType': 'CPU', 'scaleFactor': 1} + + # TODO: Should we merge the result handlers or just keep it that way? 
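+        # core_res_handlers is the list of ToolResultHandlerT objects supplied under the
+        # 'resultHandlers' key of the dataset metadata (see predict_x in revamp/x_main.py).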
+        # invoke featurizers to extract raw features from profiler output
+        features = extract_raw_features(
+            ds_name=ds_name,
+            res_hs=core_res_handlers,
+            qualtool_filter=qual_tool_filter,
+            remove_failed_sql=remove_failed_sql
+        )
+        if features is not None:
+            # return the features extracted for this dataset
+            return features
+
+    return pd.DataFrame()
diff --git a/user_tools/src/spark_rapids_tools/tools/qualx/revamp/x_main.py b/user_tools/src/spark_rapids_tools/tools/qualx/revamp/x_main.py
new file mode 100644
index 000000000..a07547e54
--- /dev/null
+++ b/user_tools/src/spark_rapids_tools/tools/qualx/revamp/x_main.py
@@ -0,0 +1,74 @@
+# Copyright (c) 2025, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+""" Main module for QualX related commands """
+
+from pathlib import Path
+from typing import Optional, List
+
+import pandas as pd
+
+from spark_rapids_tools.api_v1 import QualCoreResultHandler
+from spark_rapids_tools.tools.qualx.config import get_config
+from spark_rapids_tools.tools.qualx.revamp.preprocess import load_profiles
+from spark_rapids_tools.tools.qualx.util import get_logger
+
+logger = get_logger(__name__)
+
+
+def predict_x(
+        platform: str,
+        qual: str,
+        output_info: dict,
+        *,
+        model: Optional[str] = None,
+        qual_tool_filter: Optional[str] = None,
+        config: Optional[str] = None,
+        qual_handlers: List[QualCoreResultHandler]
+) -> pd.DataFrame:
+    """
+    Entry point of the revamped QualX prediction flow built on the new Tools API.
+    :param platform: Name of the platform used to select the prediction model; stored in the dataset metadata.
+    :param qual: Path to the qualification tool output directory; its name is used as the dataset name.
+    :param output_info: Dictionary describing where the prediction output files should be written.
+    :param model: Optional path to a custom model file.
+    :param qual_tool_filter: Optional filter for the qualification tool output, either 'stage' or None.
+    :param config: Optional path to a QualX configuration file; falls back to the default config.
+    :param qual_handlers: List of qualification core result handlers to load the data from.
+    :return: A DataFrame with the prediction results (currently an empty placeholder while the revamp is WIP).
+    """
+    # load config from command line argument, or use default
+    cfg = get_config(config)
+    model_type = cfg.model_type
+    model_config = cfg.__dict__.get(model_type, {})
+    qual_filter = qual_tool_filter if qual_tool_filter else model_config.get('qual_tool_filter', 'stage')
+
+    if not qual_handlers:
+        raise ValueError('qual_handlers list is empty - no qualification data available for prediction')
+    if all(q_handler.is_empty() for q_handler in qual_handlers):
+        logger.warning('All qualification handlers are empty - no apps to predict')
+        return pd.DataFrame()
+    # if qualification metrics are provided, load metrics and apply filtering
+    datasets = {}  # create a dummy dataset
+    dataset_name = Path(qual).name  # use qual directory name as dataset name
+    datasets[dataset_name] = {
+        'resultHandlers': qual_handlers,
+        'app_meta': {'default': {'runType': 'CPU', 'scaleFactor': 1}},
+        'platform': platform,
+    }
+    logger.debug('Loading dataset: %s', dataset_name)
+    # load the raw files from the qualification handlers
+    load_profiles(datasets=datasets, qual_tool_filter=qual_filter, remove_failed_sql=False)
+
+    return pd.DataFrame()
diff --git a/user_tools/src/spark_rapids_tools/utils/data_utils.py b/user_tools/src/spark_rapids_tools/utils/data_utils.py
index 831e480c8..3db850f50 100644
--- a/user_tools/src/spark_rapids_tools/utils/data_utils.py
+++ b/user_tools/src/spark_rapids_tools/utils/data_utils.py
@@ -24,6 +24,7 @@
 from jproperties import Properties
 
 from
spark_rapids_tools import CspPathT +from spark_rapids_tools.utils import Utilities ResDataT = TypeVar('ResDataT') @@ -44,9 +45,15 @@ class AbstractReportResult(Generic[ResDataT]): load_error: Optional[Exception] = None def get_fail_cause(self) -> Optional[Exception]: - if self.load_error: - return self.load_error.__cause__ - return None + """ + Get the root cause of the failure if any. If the exception was raised from another, + return the original exception; otherwise, return the direct exception. + :return: the root exception if any + """ + exc = self.load_error + while exc and exc.__cause__ is not None: + exc = exc.__cause__ + return exc @dataclass @@ -118,6 +125,15 @@ class DataUtils: Utility functions to use common data handling such as reading an opening CSV files. """ + @staticmethod + def cols_to_camel_case(col_names: List[str]) -> Dict[str, str]: + """ + Map the column names to camelCase. + :param col_names: The list of column names to map. + :return: A dictionary mapping the original column names to their camelCase equivalents. + """ + return {col: Utilities.str_to_camel(col) for col in col_names} + @staticmethod def convert_df_to_dict(df: pd.DataFrame) -> List[dict[str, str]]: """ diff --git a/user_tools/src/spark_rapids_tools/utils/util.py b/user_tools/src/spark_rapids_tools/utils/util.py index 11062dcf9..c0847215f 100644 --- a/user_tools/src/spark_rapids_tools/utils/util.py +++ b/user_tools/src/spark_rapids_tools/utils/util.py @@ -443,3 +443,18 @@ def scala_to_pandas_type(scala_type: str) -> str: # Add more mappings for other Scala types as needed } return scala_to_pandas_map.get(scala_type, 'object') # Default to object for unknown types. + + @staticmethod + def str_to_camel(s: str) -> str: + """ + Convert a string to camel case. + Adopted from + https://www.30secondsofcode.org/python/s/string-capitalize-camel-snake-kebab/#camel-case-string + > To convert a string to camel case, you can use re.sub() to replace any - or _ with a space, + > using the regexp r"(_|-)+". Then, use str.title() to capitalize every word and convert the + > rest to lowercase. Finally, use str.replace() to remove any spaces between words. + :param s: The input string. + :return: The camel case version of the input string. + """ + s = re.sub(r'([_\-])+', ' ', s).title().replace(' ', '') + return ''.join([s[0].lower(), s[1:]]) diff --git a/user_tools/tests/spark_rapids_tools_ut/api/__init__.py b/user_tools/tests/spark_rapids_tools_ut/api/__init__.py new file mode 100644 index 000000000..094d723ae --- /dev/null +++ b/user_tools/tests/spark_rapids_tools_ut/api/__init__.py @@ -0,0 +1,15 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Module to test the API Tools""" diff --git a/user_tools/tests/spark_rapids_tools_ut/api/test_app_handlers.py b/user_tools/tests/spark_rapids_tools_ut/api/test_app_handlers.py new file mode 100644 index 000000000..1ddccfcee --- /dev/null +++ b/user_tools/tests/spark_rapids_tools_ut/api/test_app_handlers.py @@ -0,0 +1,91 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Module to test functionality of the app_handlers""" + +import unittest +from io import StringIO + +import pandas as pd + +from spark_rapids_tools.api_v1 import AppHandler + + +class TestAppHandlers(unittest.TestCase): + """ + A unit test for the app-handlers + """ + + +def test_add_fields_to_dataframe_basic(): + csv_data = 'a,b\n1,2\n3,4' + original_df = pd.read_csv(StringIO(csv_data)) + handler = AppHandler(_app_id='app-00', + _attempt_id=1, + _app_name='app_name-00', + eventlog_path='/path/to/file') + # User wants to map 'appID' and 'name' to columns 'APP ID' and 'App Name' + mapping = {'app_id': 'App ID', 'app_name': 'App Name'} + result_df = handler.add_fields_to_dataframe(original_df, mapping) + + assert result_df.shape == (2, 4) + assert result_df.columns[0] == 'App ID' + assert result_df.columns[1] == 'App Name' + expected_data = 'App ID,App Name,a,b\napp-00,app_name-00,1,2\napp-00,app_name-00,3,4' + expected_df = pd.read_csv(StringIO(expected_data)).astype({'App ID': 'string', 'App Name': 'string'}) + # assert the dataTypes are correct (string and not object) + assert result_df.dtypes[0] == 'string' + assert result_df.dtypes[1] == 'string' + # verify the entire dataframes are equal + pd.testing.assert_frame_equal(expected_df, result_df) + + +def test_add_fields_to_dataframe_empty(): + """ + Test adding fields to an empty DataFrame. + """ + # Define the schema using a dictionary where keys are column names and values are data types + schema = { + 'col01': 'string', + 'col02': 'int64', + 'col03': 'string', + 'col04': 'float32' + } + + # Create an empty DataFrame with the defined schema + empty_df = pd.DataFrame({ + col: pd.Series(dtype=dtype) + for col, dtype in schema.items() + }) + + handler = AppHandler(_app_id='app-00', + _attempt_id=1, + _app_name='app_name-00', + eventlog_path='/path/to/file') + # User wants to map 'appID' and 'name' to columns 'APP ID' and 'App Name' + mapping = {'app_id': 'App ID', 'attempt_id': 'Attempt ID', 'app_name': 'App Name'} + result_df = handler.add_fields_to_dataframe(empty_df, mapping) + assert result_df.shape == (0, 7) # Should still be empty with 6 columns + expected_schema = { + 'App ID': 'string', + 'Attempt ID': 'Int64', + 'App Name': 'string', + 'col01': 'string', + 'col02': 'int64', + 'col03': 'string', + 'col04': 'float32' + } + + for col, dtype in expected_schema.items(): + assert result_df[col].dtype.name == dtype
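
Editor's note: the snippet below is an illustrative sketch only, not part of the patch. It mirrors the calls added in revamp/featurizer.py to show how CSVReportCombiner and APIUtils.process_res are meant to be driven; the function name combine_app_info and the dataset name are made up for the example.

from typing import List, Optional

import pandas as pd

from spark_rapids_tools.api_v1 import QualCoreResultHandler
from spark_rapids_tools.api_v1.builder import (
    APIUtils, CSVReport, CSVReportCombiner, LoadRawFilesResult)


def combine_app_info(handlers: List[QualCoreResultHandler]) -> Optional[pd.DataFrame]:
    # Combine one per-app CSV table across all apps and handlers; the combiner injects
    # the appId column into each row by default.
    raw_res = LoadRawFilesResult(ds_name='example-dataset')
    combined_df = APIUtils.process_res(
        raw_res=raw_res,
        combiner=CSVReportCombiner(
            rep_builders=[CSVReport(r_h).table('coreRawApplicationInformationCSV') for r_h in handlers]),
        raise_on_failure=False,  # tolerate a failed load; the result may be None
        raise_on_empty=False)    # tolerate an empty report
    # Successful loads are also recorded in raw_res.reports; per-app failures in raw_res.failed_loads.
    return combined_df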