diff --git a/src/servicex_did_finder_lib/communication.py b/src/servicex_did_finder_lib/communication.py index ec93e9c..bd7d075 100644 --- a/src/servicex_did_finder_lib/communication.py +++ b/src/servicex_did_finder_lib/communication.py @@ -1,9 +1,37 @@ +# Copyright (c) 2021-2025, IRIS-HEP +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + import argparse from datetime import datetime import json import logging import time -from typing import Any, AsyncGenerator, Callable, Dict, List, Optional +from typing import Any, AsyncGenerator, Callable, Dict, List, Optional, Union import sys import pika @@ -15,7 +43,8 @@ from .servicex_adaptor import ServiceXAdapter # The type for the callback method to handle DID's, supplied by the user. -UserDIDHandler = Callable[[str, Dict[str, Any]], AsyncGenerator[Dict[str, Any], None]] +UserDIDHandler = Callable[[str, Dict[str, Any]], AsyncGenerator[Union[Dict[str, Any], + List[Dict[str, Any]]], None]] # Given name, build the RabbitMQ queue name by appending this. # This is backed into how ServiceX works - do not change unless it diff --git a/src/servicex_did_finder_lib/did_finder_app.py b/src/servicex_did_finder_lib/did_finder_app.py index 06c6306..7f47993 100644 --- a/src/servicex_did_finder_lib/did_finder_app.py +++ b/src/servicex_did_finder_lib/did_finder_app.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022, IRIS-HEP +# Copyright (c) 2022-2025, IRIS-HEP # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -36,6 +36,7 @@ from servicex_did_finder_lib.did_summary import DIDSummary from servicex_did_finder_lib.servicex_adaptor import ServiceXAdapter from servicex_did_finder_lib.util_uri import parse_did_uri +from servicex_did_finder_lib import exceptions # The type for the callback method to handle DID's, supplied by the user. # Arguments are: @@ -102,14 +103,7 @@ def do_lookup(self, did: str, dataset_id: int, endpoint: str, user_did_finder: U if did_info.file_count > 0: # otherwise wait until all files arrive then limit results acc.send_on(did_info.file_count) - except Exception: - # noinspection PyTypeChecker - self.logger.error( - f"Error processing DID {did}", - extra={"dataset_id": dataset_id}, - exc_info=1 - ) - finally: + elapsed_time = int((datetime.now() - start_time).total_seconds()) servicex.put_fileset_complete( { @@ -120,6 +114,23 @@ def do_lookup(self, did: str, dataset_id: int, endpoint: str, user_did_finder: U "elapsed-time": elapsed_time, } ) + except Exception as e: + # noinspection PyTypeChecker + self.logger.error( + f"Error processing DID {did}", + extra={"dataset_id": dataset_id}, + exc_info=1 + ) + elapsed_time = int((datetime.now() - start_time).total_seconds()) + error_dict: dict[str, Any] = {"elapsed-time": elapsed_time, + "message": str(e)} + if isinstance(e, exceptions.BaseDIDFinderException): + error_dict["error-type"] = e.error_type + else: + error_dict["error-type"] = "internal_failure" + servicex.put_fileset_error( + error_dict + ) class DIDFinderApp(Celery): diff --git a/src/servicex_did_finder_lib/exceptions.py b/src/servicex_did_finder_lib/exceptions.py new file mode 100644 index 0000000..6ebcf02 --- /dev/null +++ b/src/servicex_did_finder_lib/exceptions.py @@ -0,0 +1,52 @@ +# Copyright (c) 2025, IRIS-HEP +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# Define some exceptions to indicate various problems in DID finding + +class BaseDIDFinderException(Exception): + """ Base exception type """ + transient: bool = True + error_type: str = "" + + +class NoSuchDatasetException(BaseDIDFinderException): + """ The dataset does not exist in the catalog """ + transient = True + error_type = "does_not_exist" + + +class BadDatasetNameException(BaseDIDFinderException): + """ The specified dataset name is invalid """ + transient = False + error_type = "bad_name" + + +class LookupFailureException(BaseDIDFinderException): + """ There was a failure when looking up the dataset in the catalog """ + transient = True + error_type = "internal_failure" diff --git a/src/servicex_did_finder_lib/servicex_adaptor.py b/src/servicex_did_finder_lib/servicex_adaptor.py index eccc831..a3bd1b8 100644 --- a/src/servicex_did_finder_lib/servicex_adaptor.py +++ b/src/servicex_did_finder_lib/servicex_adaptor.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019, IRIS-HEP +# Copyright (c) 2019-2025, IRIS-HEP # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -74,6 +74,10 @@ def put_file_add_bulk(self, file_list, chunk_length=300): self.logger.error(f'After {attempts} tries, failed to send ServiceX App ' f'a put_file_bulk message: {mesg} - Ignoring error.') + def put_file_add(self, file): + # add one file + self.put_file_add_bulk([file]) + def put_fileset_complete(self, summary): success = False attempts = 0 @@ -88,3 +92,18 @@ def put_fileset_complete(self, summary): if not success: self.logger.error(f'After {attempts} tries, failed to send ServiceX App a put_file ' f'message: {str(summary)} - Ignoring error.') + + def put_fileset_error(self, summary): + success = False + attempts = 0 + while not success and attempts < MAX_RETRIES: + try: + requests.put(f"{self.endpoint}{self.dataset_id}/error", json=summary) + success = True + except requests.exceptions.ConnectionError: + self.logger.exception(f'Connection error to ServiceX App. Will retry ' + f'(try {attempts} out of {MAX_RETRIES}') + attempts += 1 + if not success: + self.logger.error(f'After {attempts} tries, failed to send ServiceX App a put_file ' + f'message: {str(summary)} - Ignoring error.') diff --git a/tests/servicex_did_finder_lib_tests/test_did_finder_app.py b/tests/servicex_did_finder_lib_tests/test_did_finder_app.py index 83f0d34..3f74b7b 100644 --- a/tests/servicex_did_finder_lib_tests/test_did_finder_app.py +++ b/tests/servicex_did_finder_lib_tests/test_did_finder_app.py @@ -1,4 +1,4 @@ -# Copyright (c) 2024, IRIS-HEP +# Copyright (c) 2024-2025, IRIS-HEP # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -31,6 +31,7 @@ from servicex_did_finder_lib.accumulator import Accumulator from servicex_did_finder_lib.did_finder_app import DIDFinderTask, DIDFinderApp +from servicex_did_finder_lib import exceptions @pytest.fixture() @@ -74,11 +75,15 @@ def test_did_finder_task(mocker, servicex, single_file_info): ) -def test_did_finder_task_exception(mocker, servicex, single_file_info): +@pytest.mark.parametrize("exc", [Exception("Boom"), + exceptions.BadDatasetNameException("Bad name"), + exceptions.LookupFailureException("Boom 2"), + exceptions.NoSuchDatasetException("Not there")]) +def test_did_finder_task_exception(mocker, servicex, exc, single_file_info): did_finder_task = DIDFinderTask() # did_finder_task.app = mocker.Mock() did_finder_task.app.did_finder_args = {} - mock_generator = mocker.Mock(side_effect=Exception("Boom")) + mock_generator = mocker.Mock(side_effect=exc) mock_accumulator = mocker.MagicMock(Accumulator) with patch( @@ -92,13 +97,14 @@ def test_did_finder_task_exception(mocker, servicex, single_file_info): mock_accumulator.add.assert_not_called() mock_accumulator.send_on.assert_not_called() - servicex.return_value.put_fileset_complete.assert_called_with( + error_type_str = (exc.error_type + if isinstance(exc, exceptions.BaseDIDFinderException) + else "internal_failure") + servicex.return_value.put_fileset_error.assert_called_with( { - "files": 0, # Aught to have a side effect in mock accumulator that updates this - "files-skipped": 0, - "total-events": 0, - "total-bytes": 0, "elapsed-time": 0, + "error-type": error_type_str, + "message": str(exc), } ) diff --git a/tests/servicex_did_finder_lib_tests/test_servicex_adaptor.py b/tests/servicex_did_finder_lib_tests/test_servicex_adaptor.py index 6b1f3fa..db8845f 100644 --- a/tests/servicex_did_finder_lib_tests/test_servicex_adaptor.py +++ b/tests/servicex_did_finder_lib_tests/test_servicex_adaptor.py @@ -1,3 +1,30 @@ +# Copyright (c) 2024-2025, IRIS-HEP +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import json import requests @@ -142,3 +169,49 @@ def request_callback(request): "elapsed-time": 10 }) assert len(responses.calls) == 3 # Max retries + + +@responses.activate +def test_put_file_error(): + call_count = 0 + + def request_callback(request): + nonlocal call_count + call_count += 1 + + if call_count == 1: + raise requests.exceptions.ConnectionError("Connection failed") + else: + return (206, {}, "") + + responses.add_callback(responses.PUT, + 'http://servicex.org/12345/error', + callback=request_callback) + + sx = ServiceXAdapter("http://servicex.org/", '12345') + sx.put_fileset_error({ + "error-type": "bad_name", + "elapsed-time": 10 + }) + assert len(responses.calls) == 1 + 1 # 1 retry + submitted = json.loads(responses.calls[0].request.body) + assert submitted['error-type'] == "bad_name" + assert submitted['elapsed-time'] == 10 + + +@responses.activate +def test_put_file_error_failure(): + + def request_callback(request): + raise requests.exceptions.ConnectionError("Connection failed") + + responses.add_callback(responses.PUT, + 'http://servicex.org/12345/error', + callback=request_callback) + + sx = ServiceXAdapter("http://servicex.org/", '12345') + sx.put_fileset_error({ + "error-type": "bad_name", + "elapsed-time": 10 + }) + assert len(responses.calls) == 3 # Max retries