From f24c445e9f1cdd008511f6bd656ac8550e35485e Mon Sep 17 00:00:00 2001 From: Peter Onyisi Date: Sun, 3 Aug 2025 21:47:46 -0500 Subject: [PATCH] Add models and routes for reporting dataset lookup errors --- servicex_app/servicex_app/models.py | 4 + .../resources/internal/fileset_error.py | 118 ++++++++++++++++++ servicex_app/servicex_app/routes.py | 7 ++ 3 files changed, 129 insertions(+) create mode 100644 servicex_app/servicex_app/resources/internal/fileset_error.py diff --git a/servicex_app/servicex_app/models.py b/servicex_app/servicex_app/models.py index 6e78c482..ad2d79bb 100644 --- a/servicex_app/servicex_app/models.py +++ b/servicex_app/servicex_app/models.py @@ -157,6 +157,7 @@ class TransformStatus(Enum): complete = ("Complete", True) fatal = ("Fatal", True) canceled = ("Canceled", True) + bad_dataset = ("Bad Dataset", True) def __init__(self, string_name, is_complete): self.string_name = string_name @@ -404,6 +405,9 @@ class DatasetStatus(str, Enum): created = "created" looking = "looking" complete = "complete" + does_not_exist = "does_not_exist" + bad_name = "bad_name" + internal_failure = "internal_failure" class Dataset(db.Model): diff --git a/servicex_app/servicex_app/resources/internal/fileset_error.py b/servicex_app/servicex_app/resources/internal/fileset_error.py new file mode 100644 index 00000000..ebe47b38 --- /dev/null +++ b/servicex_app/servicex_app/resources/internal/fileset_error.py @@ -0,0 +1,118 @@ +# Copyright (c) 2025, IRIS-HEP +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. 
+# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+from flask import request, current_app
+
+from servicex_app.models import (
+    Dataset,
+    db,
+    TransformRequest,
+    TransformStatus,
+    DatasetStatus,
+)
+from servicex_app.resources.servicex_resource import ServiceXResource
+
+from datetime import datetime, timezone
+
+
+class FilesetError(ServiceXResource):
+    """Internal resource that records a reported dataset lookup error."""
+
+    @classmethod
+    def make_api(cls, lookup_result_processor, transformer_manager):
+        """Store the shared collaborators on the class and return the class."""
+        cls.lookup_result_processor = lookup_result_processor
+        cls.transformer_manager = transformer_manager
+        return cls
+
+    def put(self, dataset_id):
+        """Mark the dataset lookup as failed and fail the transforms relying on it.
+
+        The JSON body supplies ``error-type``, ``message`` and ``elapsed-time``.
+        An ``error-type`` that is not a ``DatasetStatus`` value is stored as
+        ``internal_failure`` so the dependent transforms are still cleaned up.
+        """
+        summary = request.get_json()
+        dataset_key = int(dataset_id)
+        dataset = Dataset.find_by_id(dataset_key)
+
+        # Logged as "error-message": the logging module raises KeyError when
+        # ``extra`` carries a "message" key, because LogRecord reserves it.
+        fields = {
+            "dataset_id": dataset_id,
+            "error-type": summary.get("error-type"),
+            "error-message": summary.get("message"),
+        }
+
+        if dataset is None:
+            current_app.logger.info(
+                "Dataset lookup error received for unknown dataset", extra=fields
+            )
+            return
+
+        fields["elapsed-time"] = summary.get("elapsed-time")
+        current_app.logger.info("Error in file lookup", extra=fields)
+
+        try:
+            dataset.lookup_status = DatasetStatus(fields["error-type"])
+        except ValueError:
+            current_app.logger.warning(
+                "Unrecognized dataset lookup error type", extra=fields
+            )
+            dataset.lookup_status = DatasetStatus.internal_failure
+        dataset.stale = True  # Repeat lookup if we try again
+        db.session.commit()
+
+        # Shut down related transformations. Nothing good can come of letting them
+        # continue to run
+        namespace = current_app.config["TRANSFORMER_NAMESPACE"]
+        for running_request in TransformRequest.lookup_running_by_dataset_id(
+            dataset_key
+        ):
+            running_request.status = TransformStatus.bad_dataset
+            running_request.finish_time = datetime.now(tz=timezone.utc)
+            self.transformer_manager.shutdown_transformer_job(
+                running_request.request_id, namespace
+            )
+            current_app.logger.info(
+                "Shutting down transformer because of dataset lookup problem",
+                extra={**fields, "requestId": running_request.request_id},
+            )
+
+        # Tell any other transform that was waiting for the lookup to complete
+        # not to expect to run; no transformer job is shut down for these
+        for pending_transform in TransformRequest.lookup_pending_on_dataset(
+            dataset_key
+        ):
+            pending_transform.status = TransformStatus.bad_dataset
+            pending_transform.finish_time = datetime.now(tz=timezone.utc)
+            current_app.logger.info(
+                "Failing pending transform because of dataset lookup problem",
+                extra={**fields, "requestId": pending_transform.request_id},
+            )
+
+        db.session.commit()
diff --git a/servicex_app/servicex_app/routes.py b/servicex_app/servicex_app/routes.py
index 8fbeb1f1..148946db 100644
--- a/servicex_app/servicex_app/routes.py
+++ b/servicex_app/servicex_app/routes.py
@@ -49,6 +49,7 @@ def add_routes(
 
     from servicex_app.resources.internal.add_file_to_dataset import AddFileToDataset
     from servicex_app.resources.internal.fileset_complete import FilesetComplete
+    from servicex_app.resources.internal.fileset_error import FilesetError
     from servicex_app.resources.internal.transform_status import (
         TransformationStatusInternal,
     )
@@ -183,6 +184,12 @@ def add_routes(
"/servicex/internal/transformation//complete", ) + FilesetError.make_api(lookup_result_processor, transformer_manager) + api.add_resource( + FilesetError, + "/servicex/internal/transformation//error", + ) + TransformerFileComplete.make_api(transformer_manager) api.add_resource( TransformerFileComplete,