From c28564c9bd9300ae95c90c70370fde82fb06e6e9 Mon Sep 17 00:00:00 2001 From: Alex Strick van Linschoten Date: Sat, 24 May 2025 19:31:44 +0200 Subject: [PATCH 01/12] Improve error message for duplicate pipeline run names MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When users run a pipeline with a fixed `run_name` in their config.yaml and then rerun the same pipeline, they would get a confusing database error about entity existence. This change catches both EntityExistsError and RuntimeError (with IntegrityError) specifically for duplicate run names and provides a much more helpful error message. ## Changes - Add improved error handling in `create_placeholder_run()` to catch duplicate run name errors (both EntityExistsError and raw SQL IntegrityError) - Provide actionable guidance with 3 specific solutions: 1. Change the run_name to a unique value 2. Use dynamic placeholders like `run_name: "my_run_{date}_{time}"` 3. Remove the run_name to auto-generate unique names - Add comprehensive unit tests to verify the improved error message - Update documentation in yaml_configuration.md to warn about run name uniqueness ## User Experience Instead of seeing confusing database errors, users now get: ``` Pipeline run name 'my_run_name' already exists in this project. Each pipeline run must have a unique name. To fix this, you can: 1. Change the 'run_name' in your config file to a unique value 2. Use a dynamic run name with placeholders like: run_name: "my_run_name_{date}_{time}" 3. Remove the 'run_name' from your config to auto-generate unique names For more information on run naming, see: https://docs.zenml.io/concepts/steps_and_pipelines/yaml_configuration#run-name ``` 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../steps-pipelines/yaml_configuration.md | 16 +++ src/zenml/pipelines/run_utils.py | 57 +++++++- tests/unit/pipelines/test_run_utils.py | 130 ++++++++++++++++++ 3 files changed, 201 insertions(+), 2 deletions(-) diff --git a/docs/book/how-to/steps-pipelines/yaml_configuration.md b/docs/book/how-to/steps-pipelines/yaml_configuration.md index e3f4d633a1a..b192a199936 100644 --- a/docs/book/how-to/steps-pipelines/yaml_configuration.md +++ b/docs/book/how-to/steps-pipelines/yaml_configuration.md @@ -116,6 +116,22 @@ Set a custom name for the pipeline run: run_name: "training_run_cifar10_resnet50_lr0.001" ``` +{% hint style="warning" %} +**Important:** Pipeline run names must be unique within a project. If you try to run a pipeline with a name that already exists, you'll get an error. To avoid this: + +1. **Use dynamic placeholders** to ensure uniqueness: + ```yaml + run_name: "training_run_{date}_{time}" + run_name: "training_run_cifar10_resnet50_lr0.001_{date}_{time}" + ``` + +2. **Don't specify a run_name** to let ZenML auto-generate unique names + +3. **Change the run_name** before rerunning the pipeline + +Available placeholders: `{date}`, `{time}`, and any parameters defined in your pipeline configuration. +{% endhint %} + ## Resource and Component Configuration ### Docker Settings diff --git a/src/zenml/pipelines/run_utils.py b/src/zenml/pipelines/run_utils.py index 990c2ac7dd5..4553ea6017b 100644 --- a/src/zenml/pipelines/run_utils.py +++ b/src/zenml/pipelines/run_utils.py @@ -12,6 +12,7 @@ from zenml.config.source import Source, SourceType from zenml.config.step_configurations import StepConfigurationUpdate from zenml.enums import ExecutionStatus +from zenml.exceptions import EntityExistsError from zenml.logger import get_logger from zenml.models import ( FlavorFilter, @@ -28,6 +29,11 @@ from zenml.utils.time_utils import utc_now from zenml.zen_stores.base_zen_store import BaseZenStore +try: + from sqlalchemy.exc import IntegrityError +except ImportError: + IntegrityError = None + if TYPE_CHECKING: StepConfigurationUpdateOrDict = Union[ Dict[str, Any], StepConfigurationUpdate @@ -63,6 +69,10 @@ def create_placeholder_run( Returns: The placeholder run or `None` if no run was created. + + Raises: + EntityExistsError: If a pipeline run with the same name already exists, + with an improved error message suggesting solutions. """ assert deployment.user @@ -91,8 +101,51 @@ def create_placeholder_run( tags=deployment.pipeline_configuration.tags, logs=logs, ) - run, _ = Client().zen_store.get_or_create_run(run_request) - return run + + try: + run, _ = Client().zen_store.get_or_create_run(run_request) + return run + except (EntityExistsError, Exception) as e: + # Handle both EntityExistsError and raw database IntegrityError + original_message = str(e) + + # Check for duplicate run name patterns in the error message + is_duplicate_run_name = False + run_name = run_request.name + + # Check for ZenML's EntityExistsError + if isinstance(e, EntityExistsError) and ( + "pipeline run" in original_message.lower() + and "name" in original_message.lower() + ): + is_duplicate_run_name = True + + # Check for raw SQL IntegrityError + elif ( + IntegrityError is not None + and isinstance(e.__cause__ or e, IntegrityError) + or "unique_run_name_in_project" in original_message + or ( + "duplicate entry" in original_message.lower() + and run_name in original_message + ) + ): + is_duplicate_run_name = True + + if is_duplicate_run_name: + improved_message = ( + f"Pipeline run name '{run_name}' already exists in this project. " + f"Each pipeline run must have a unique name.\n\n" + f"To fix this, you can:\n" + f"1. Change the 'run_name' in your config file to a unique value\n" + f'2. Use a dynamic run name with placeholders like: run_name: "{run_name}_{{date}}_{{time}}"\n' + f"3. Remove the 'run_name' from your config to auto-generate unique names\n\n" + f"For more information on run naming, see: https://docs.zenml.io/concepts/steps_and_pipelines/yaml_configuration#run-name" + ) + raise EntityExistsError(improved_message) from e + + # Re-raise the original error if it's not about duplicate run names + raise def get_placeholder_run( diff --git a/tests/unit/pipelines/test_run_utils.py b/tests/unit/pipelines/test_run_utils.py index 3e6ce0916f0..2d2bfd67f37 100644 --- a/tests/unit/pipelines/test_run_utils.py +++ b/tests/unit/pipelines/test_run_utils.py @@ -11,6 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. See the License for the specific language governing # permissions and limitations under the License. +from unittest.mock import MagicMock, patch +from uuid import uuid4 + +import pytest + +from zenml.exceptions import EntityExistsError +from zenml.models import ( + PipelineDeploymentResponse, + UserResponse, +) from zenml.pipelines import run_utils @@ -20,3 +30,123 @@ def test_default_run_name(): run_utils.get_default_run_name(pipeline_name="my_pipeline") == "my_pipeline-{date}-{time}" ) + + +@patch("zenml.pipelines.run_utils.Client") +def test_create_placeholder_run_duplicate_name_error(mock_client): + """Test that create_placeholder_run provides a helpful error message for duplicate run names.""" + # Mock the deployment + deployment = MagicMock(spec=PipelineDeploymentResponse) + deployment.user = MagicMock(spec=UserResponse) + deployment.schedule = None + deployment.run_name_template = "my_test_run" + deployment.pipeline_configuration.finalize_substitutions.return_value = {} + deployment.project.id = uuid4() + deployment.id = uuid4() + deployment.pipeline.id = uuid4() + deployment.pipeline_configuration.tags = [] + + # Mock the client and zen_store to raise EntityExistsError + original_error_message = ( + "Unable to create the requested pipeline run with name 'my_test_run': " + "Found another existing pipeline run with the same name in the 'test_project' project." + ) + mock_client.return_value.zen_store.get_or_create_run.side_effect = ( + EntityExistsError(original_error_message) + ) + + # Test that our improved error message is raised + with pytest.raises(EntityExistsError) as exc_info: + run_utils.create_placeholder_run(deployment) + + error_message = str(exc_info.value) + + # Verify the improved error message contains helpful guidance + assert ( + "Pipeline run name 'my_test_run' already exists in this project" + in error_message + ) + assert "Each pipeline run must have a unique name" in error_message + assert "Change the 'run_name' in your config" in error_message + assert "Use a dynamic run name with placeholders" in error_message + assert "Remove the 'run_name' from your config" in error_message + assert ( + "https://docs.zenml.io/concepts/steps_and_pipelines/yaml_configuration#run-name" + in error_message + ) + + +@patch("zenml.pipelines.run_utils.Client") +def test_create_placeholder_run_duplicate_name_runtime_error(mock_client): + """Test that create_placeholder_run handles RuntimeError with IntegrityError for duplicate run names.""" + # Mock the deployment + deployment = MagicMock(spec=PipelineDeploymentResponse) + deployment.user = MagicMock(spec=UserResponse) + deployment.schedule = None + deployment.run_name_template = "my_test_run" + deployment.pipeline_configuration.finalize_substitutions.return_value = {} + deployment.project.id = uuid4() + deployment.id = uuid4() + deployment.pipeline.id = uuid4() + deployment.pipeline_configuration.tags = [] + + # Mock the client and zen_store to raise RuntimeError with IntegrityError message + # This simulates what happens in the REST zen store + original_error_message = ( + '(pymysql.err.IntegrityError) (1062, "Duplicate entry ' + "'my_test_run-6e23c0466cc4411c8b9f75f0c8a1a818' for key " + "'pipeline_run.unique_run_name_in_project'\")" + ) + mock_client.return_value.zen_store.get_or_create_run.side_effect = ( + RuntimeError(original_error_message) + ) + + # Test that our improved error message is raised + with pytest.raises(EntityExistsError) as exc_info: + run_utils.create_placeholder_run(deployment) + + error_message = str(exc_info.value) + + # Verify the improved error message contains helpful guidance + assert ( + "Pipeline run name 'my_test_run' already exists in this project" + in error_message + ) + assert "Each pipeline run must have a unique name" in error_message + assert "Change the 'run_name' in your config file" in error_message + assert "Use a dynamic run name with placeholders" in error_message + assert "Remove the 'run_name' from your config" in error_message + assert ( + "https://docs.zenml.io/concepts/steps_and_pipelines/yaml_configuration#run-name" + in error_message + ) + + +@patch("zenml.pipelines.run_utils.Client") +def test_create_placeholder_run_non_duplicate_name_error(mock_client): + """Test that create_placeholder_run re-raises non-duplicate-name EntityExistsErrors unchanged.""" + # Mock the deployment + deployment = MagicMock(spec=PipelineDeploymentResponse) + deployment.user = MagicMock(spec=UserResponse) + deployment.schedule = None + deployment.run_name_template = "my_test_run" + deployment.pipeline_configuration.finalize_substitutions.return_value = {} + deployment.project.id = uuid4() + deployment.id = uuid4() + deployment.pipeline.id = uuid4() + deployment.pipeline_configuration.tags = [] + + # Mock the client and zen_store to raise a different EntityExistsError + original_error_message = "Some other entity exists error" + mock_client.return_value.zen_store.get_or_create_run.side_effect = ( + EntityExistsError(original_error_message) + ) + + # Test that the original error message is preserved for non-duplicate-name errors + with pytest.raises(EntityExistsError) as exc_info: + run_utils.create_placeholder_run(deployment) + + error_message = str(exc_info.value) + + # Verify the original error message is preserved + assert error_message == original_error_message From 0b903f755ea1e79961268173c970f1895eaffcf8 Mon Sep 17 00:00:00 2001 From: Alex Strick van Linschoten Date: Sat, 24 May 2025 22:16:24 +0200 Subject: [PATCH 02/12] Add more specific examples to run_name documentation As suggested in PR review, this commit adds clearer YAML comments to the run_name placeholder examples to make it more obvious what each example demonstrates. --- docs/book/how-to/steps-pipelines/yaml_configuration.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/book/how-to/steps-pipelines/yaml_configuration.md b/docs/book/how-to/steps-pipelines/yaml_configuration.md index b192a199936..be03ba7fcd3 100644 --- a/docs/book/how-to/steps-pipelines/yaml_configuration.md +++ b/docs/book/how-to/steps-pipelines/yaml_configuration.md @@ -121,7 +121,10 @@ run_name: "training_run_cifar10_resnet50_lr0.001" 1. **Use dynamic placeholders** to ensure uniqueness: ```yaml + # Example 1: Use placeholders for date and time to ensure uniqueness run_name: "training_run_{date}_{time}" + + # Example 2: Combine placeholders with specific details for better context run_name: "training_run_cifar10_resnet50_lr0.001_{date}_{time}" ``` From 659288c8dbe7da9f7d1e3902ac7069d512b156fc Mon Sep 17 00:00:00 2001 From: Alex Strick van Linschoten Date: Sat, 24 May 2025 22:21:13 +0200 Subject: [PATCH 03/12] Fix mypy type errors for IntegrityError import - Use TYPE_CHECKING to handle the optional sqlalchemy import properly - Rename to SQLIntegrityError to avoid confusion with other exceptions - This ensures mypy doesn't complain about assigning None to a type --- src/zenml/pipelines/run_utils.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/zenml/pipelines/run_utils.py b/src/zenml/pipelines/run_utils.py index 4553ea6017b..963eb81cf96 100644 --- a/src/zenml/pipelines/run_utils.py +++ b/src/zenml/pipelines/run_utils.py @@ -29,10 +29,13 @@ from zenml.utils.time_utils import utc_now from zenml.zen_stores.base_zen_store import BaseZenStore -try: - from sqlalchemy.exc import IntegrityError -except ImportError: - IntegrityError = None +if TYPE_CHECKING: + from sqlalchemy.exc import IntegrityError as SQLIntegrityError +else: + try: + from sqlalchemy.exc import IntegrityError as SQLIntegrityError + except ImportError: + SQLIntegrityError = None if TYPE_CHECKING: StepConfigurationUpdateOrDict = Union[ @@ -122,8 +125,8 @@ def create_placeholder_run( # Check for raw SQL IntegrityError elif ( - IntegrityError is not None - and isinstance(e.__cause__ or e, IntegrityError) + SQLIntegrityError is not None + and isinstance(e.__cause__ or e, SQLIntegrityError) or "unique_run_name_in_project" in original_message or ( "duplicate entry" in original_message.lower() From 3d2a22d02ed2cccde8ceed156b257e768d4de7fb Mon Sep 17 00:00:00 2001 From: Alex Strick van Linschoten Date: Sat, 24 May 2025 22:32:18 +0200 Subject: [PATCH 04/12] Address PR review comments for duplicate run name handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Change broad Exception catch to specific RuntimeError - Add parentheses for clarity in boolean logic - Align documentation wording with error message 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- docs/book/how-to/steps-pipelines/yaml_configuration.md | 2 +- src/zenml/pipelines/run_utils.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/book/how-to/steps-pipelines/yaml_configuration.md b/docs/book/how-to/steps-pipelines/yaml_configuration.md index be03ba7fcd3..84f05569ab7 100644 --- a/docs/book/how-to/steps-pipelines/yaml_configuration.md +++ b/docs/book/how-to/steps-pipelines/yaml_configuration.md @@ -128,7 +128,7 @@ run_name: "training_run_cifar10_resnet50_lr0.001" run_name: "training_run_cifar10_resnet50_lr0.001_{date}_{time}" ``` -2. **Don't specify a run_name** to let ZenML auto-generate unique names +2. **Remove the 'run_name' from your config** to let ZenML auto-generate unique names 3. **Change the run_name** before rerunning the pipeline diff --git a/src/zenml/pipelines/run_utils.py b/src/zenml/pipelines/run_utils.py index 963eb81cf96..41275808588 100644 --- a/src/zenml/pipelines/run_utils.py +++ b/src/zenml/pipelines/run_utils.py @@ -108,7 +108,7 @@ def create_placeholder_run( try: run, _ = Client().zen_store.get_or_create_run(run_request) return run - except (EntityExistsError, Exception) as e: + except (EntityExistsError, RuntimeError) as e: # Handle both EntityExistsError and raw database IntegrityError original_message = str(e) @@ -126,7 +126,7 @@ def create_placeholder_run( # Check for raw SQL IntegrityError elif ( SQLIntegrityError is not None - and isinstance(e.__cause__ or e, SQLIntegrityError) + and (isinstance(e.__cause__ or e, SQLIntegrityError)) or "unique_run_name_in_project" in original_message or ( "duplicate entry" in original_message.lower() From 23bced43265d973d730ffa4da3d3bed5ea23d9a5 Mon Sep 17 00:00:00 2001 From: Alex Strick van Linschoten Date: Mon, 26 May 2025 10:12:50 +0200 Subject: [PATCH 05/12] Move duplicate run name error handling to SQLZenStore MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Moved error handling from run_utils.py to sql_zen_store.py where it architecturally belongs - Database-specific error handling now stays in the database layer - Made error message more generic (removed specific mention of 'config file') - Simplified run_utils.py by removing 40+ lines of error handling code - Updated tests to reflect the new error handling location - All code paths that create runs now benefit from improved error messages 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- src/zenml/pipelines/run_utils.py | 55 +------------ src/zenml/zen_stores/sql_zen_store.py | 21 ++++- tests/unit/pipelines/test_run_utils.py | 110 ++++--------------------- 3 files changed, 36 insertions(+), 150 deletions(-) diff --git a/src/zenml/pipelines/run_utils.py b/src/zenml/pipelines/run_utils.py index 41275808588..b137d0213ab 100644 --- a/src/zenml/pipelines/run_utils.py +++ b/src/zenml/pipelines/run_utils.py @@ -12,7 +12,6 @@ from zenml.config.source import Source, SourceType from zenml.config.step_configurations import StepConfigurationUpdate from zenml.enums import ExecutionStatus -from zenml.exceptions import EntityExistsError from zenml.logger import get_logger from zenml.models import ( FlavorFilter, @@ -29,14 +28,6 @@ from zenml.utils.time_utils import utc_now from zenml.zen_stores.base_zen_store import BaseZenStore -if TYPE_CHECKING: - from sqlalchemy.exc import IntegrityError as SQLIntegrityError -else: - try: - from sqlalchemy.exc import IntegrityError as SQLIntegrityError - except ImportError: - SQLIntegrityError = None - if TYPE_CHECKING: StepConfigurationUpdateOrDict = Union[ Dict[str, Any], StepConfigurationUpdate @@ -105,50 +96,8 @@ def create_placeholder_run( logs=logs, ) - try: - run, _ = Client().zen_store.get_or_create_run(run_request) - return run - except (EntityExistsError, RuntimeError) as e: - # Handle both EntityExistsError and raw database IntegrityError - original_message = str(e) - - # Check for duplicate run name patterns in the error message - is_duplicate_run_name = False - run_name = run_request.name - - # Check for ZenML's EntityExistsError - if isinstance(e, EntityExistsError) and ( - "pipeline run" in original_message.lower() - and "name" in original_message.lower() - ): - is_duplicate_run_name = True - - # Check for raw SQL IntegrityError - elif ( - SQLIntegrityError is not None - and (isinstance(e.__cause__ or e, SQLIntegrityError)) - or "unique_run_name_in_project" in original_message - or ( - "duplicate entry" in original_message.lower() - and run_name in original_message - ) - ): - is_duplicate_run_name = True - - if is_duplicate_run_name: - improved_message = ( - f"Pipeline run name '{run_name}' already exists in this project. " - f"Each pipeline run must have a unique name.\n\n" - f"To fix this, you can:\n" - f"1. Change the 'run_name' in your config file to a unique value\n" - f'2. Use a dynamic run name with placeholders like: run_name: "{run_name}_{{date}}_{{time}}"\n' - f"3. Remove the 'run_name' from your config to auto-generate unique names\n\n" - f"For more information on run naming, see: https://docs.zenml.io/concepts/steps_and_pipelines/yaml_configuration#run-name" - ) - raise EntityExistsError(improved_message) from e - - # Re-raise the original error if it's not about duplicate run names - raise + run, _ = Client().zen_store.get_or_create_run(run_request) + return run def get_placeholder_run( diff --git a/src/zenml/zen_stores/sql_zen_store.py b/src/zenml/zen_stores/sql_zen_store.py index c39d1477b55..187e5468b24 100644 --- a/src/zenml/zen_stores/sql_zen_store.py +++ b/src/zenml/zen_stores/sql_zen_store.py @@ -5107,10 +5107,29 @@ def _create_run( try: session.commit() - except IntegrityError: + except IntegrityError as e: # We have to rollback the failed session first in order to # continue using it session.rollback() + + # Check if this is a duplicate run name error + error_message = str(e).lower() + if "unique_run_name_in_project" in error_message or ( + "duplicate entry" in error_message + and pipeline_run.name in str(e) + ): + # Provide a user-friendly error message for duplicate run names + improved_message = ( + f"Pipeline run name '{pipeline_run.name}' already exists in this project. " + f"Each pipeline run must have a unique name.\n\n" + f"To fix this, you can:\n" + f"1. Use a different run name\n" + f'2. Use a dynamic run name with placeholders like: "{pipeline_run.name}_{{date}}_{{time}}"\n' + f"3. Remove the run name from your configuration to auto-generate unique names\n\n" + f"For more information on run naming, see: https://docs.zenml.io/concepts/steps_and_pipelines/yaml_configuration#run-name" + ) + raise EntityExistsError(improved_message) from e + # This can fail if the name is taken by a different run self._verify_name_uniqueness( resource=pipeline_run, diff --git a/tests/unit/pipelines/test_run_utils.py b/tests/unit/pipelines/test_run_utils.py index 2d2bfd67f37..b988ec3e353 100644 --- a/tests/unit/pipelines/test_run_utils.py +++ b/tests/unit/pipelines/test_run_utils.py @@ -34,7 +34,7 @@ def test_default_run_name(): @patch("zenml.pipelines.run_utils.Client") def test_create_placeholder_run_duplicate_name_error(mock_client): - """Test that create_placeholder_run provides a helpful error message for duplicate run names.""" + """Test that create_placeholder_run passes through improved error message from zen_store.""" # Mock the deployment deployment = MagicMock(spec=PipelineDeploymentResponse) deployment.user = MagicMock(spec=UserResponse) @@ -46,107 +46,25 @@ def test_create_placeholder_run_duplicate_name_error(mock_client): deployment.pipeline.id = uuid4() deployment.pipeline_configuration.tags = [] - # Mock the client and zen_store to raise EntityExistsError - original_error_message = ( - "Unable to create the requested pipeline run with name 'my_test_run': " - "Found another existing pipeline run with the same name in the 'test_project' project." + # Mock the client and zen_store to raise EntityExistsError with improved message + improved_error_message = ( + "Pipeline run name 'my_test_run' already exists in this project. " + "Each pipeline run must have a unique name.\n\n" + "To fix this, you can:\n" + "1. Use a different run name\n" + '2. Use a dynamic run name with placeholders like: "my_test_run_{date}_{time}"\n' + "3. Remove the run name from your configuration to auto-generate unique names\n\n" + "For more information on run naming, see: https://docs.zenml.io/concepts/steps_and_pipelines/yaml_configuration#run-name" ) mock_client.return_value.zen_store.get_or_create_run.side_effect = ( - EntityExistsError(original_error_message) + EntityExistsError(improved_error_message) ) - # Test that our improved error message is raised + # Test that the error from zen_store is passed through unchanged with pytest.raises(EntityExistsError) as exc_info: run_utils.create_placeholder_run(deployment) error_message = str(exc_info.value) - # Verify the improved error message contains helpful guidance - assert ( - "Pipeline run name 'my_test_run' already exists in this project" - in error_message - ) - assert "Each pipeline run must have a unique name" in error_message - assert "Change the 'run_name' in your config" in error_message - assert "Use a dynamic run name with placeholders" in error_message - assert "Remove the 'run_name' from your config" in error_message - assert ( - "https://docs.zenml.io/concepts/steps_and_pipelines/yaml_configuration#run-name" - in error_message - ) - - -@patch("zenml.pipelines.run_utils.Client") -def test_create_placeholder_run_duplicate_name_runtime_error(mock_client): - """Test that create_placeholder_run handles RuntimeError with IntegrityError for duplicate run names.""" - # Mock the deployment - deployment = MagicMock(spec=PipelineDeploymentResponse) - deployment.user = MagicMock(spec=UserResponse) - deployment.schedule = None - deployment.run_name_template = "my_test_run" - deployment.pipeline_configuration.finalize_substitutions.return_value = {} - deployment.project.id = uuid4() - deployment.id = uuid4() - deployment.pipeline.id = uuid4() - deployment.pipeline_configuration.tags = [] - - # Mock the client and zen_store to raise RuntimeError with IntegrityError message - # This simulates what happens in the REST zen store - original_error_message = ( - '(pymysql.err.IntegrityError) (1062, "Duplicate entry ' - "'my_test_run-6e23c0466cc4411c8b9f75f0c8a1a818' for key " - "'pipeline_run.unique_run_name_in_project'\")" - ) - mock_client.return_value.zen_store.get_or_create_run.side_effect = ( - RuntimeError(original_error_message) - ) - - # Test that our improved error message is raised - with pytest.raises(EntityExistsError) as exc_info: - run_utils.create_placeholder_run(deployment) - - error_message = str(exc_info.value) - - # Verify the improved error message contains helpful guidance - assert ( - "Pipeline run name 'my_test_run' already exists in this project" - in error_message - ) - assert "Each pipeline run must have a unique name" in error_message - assert "Change the 'run_name' in your config file" in error_message - assert "Use a dynamic run name with placeholders" in error_message - assert "Remove the 'run_name' from your config" in error_message - assert ( - "https://docs.zenml.io/concepts/steps_and_pipelines/yaml_configuration#run-name" - in error_message - ) - - -@patch("zenml.pipelines.run_utils.Client") -def test_create_placeholder_run_non_duplicate_name_error(mock_client): - """Test that create_placeholder_run re-raises non-duplicate-name EntityExistsErrors unchanged.""" - # Mock the deployment - deployment = MagicMock(spec=PipelineDeploymentResponse) - deployment.user = MagicMock(spec=UserResponse) - deployment.schedule = None - deployment.run_name_template = "my_test_run" - deployment.pipeline_configuration.finalize_substitutions.return_value = {} - deployment.project.id = uuid4() - deployment.id = uuid4() - deployment.pipeline.id = uuid4() - deployment.pipeline_configuration.tags = [] - - # Mock the client and zen_store to raise a different EntityExistsError - original_error_message = "Some other entity exists error" - mock_client.return_value.zen_store.get_or_create_run.side_effect = ( - EntityExistsError(original_error_message) - ) - - # Test that the original error message is preserved for non-duplicate-name errors - with pytest.raises(EntityExistsError) as exc_info: - run_utils.create_placeholder_run(deployment) - - error_message = str(exc_info.value) - - # Verify the original error message is preserved - assert error_message == original_error_message + # Verify the error message is exactly what zen_store raised + assert error_message == improved_error_message From 6dc98fe9fb77bc0c7f8f09f0d39b6e3950027015 Mon Sep 17 00:00:00 2001 From: Alex Strick van Linschoten Date: Mon, 26 May 2025 12:59:41 +0200 Subject: [PATCH 06/12] Fix docstring linter error in create_placeholder_run MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove EntityExistsError from Raises section since this function no longer explicitly raises exceptions - they are now handled in SQLZenStore. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- src/zenml/pipelines/run_utils.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/zenml/pipelines/run_utils.py b/src/zenml/pipelines/run_utils.py index b137d0213ab..0710cd59af9 100644 --- a/src/zenml/pipelines/run_utils.py +++ b/src/zenml/pipelines/run_utils.py @@ -63,10 +63,6 @@ def create_placeholder_run( Returns: The placeholder run or `None` if no run was created. - - Raises: - EntityExistsError: If a pipeline run with the same name already exists, - with an improved error message suggesting solutions. """ assert deployment.user From 0e54848cac8204e82232dd4e3194eecac56efc09 Mon Sep 17 00:00:00 2001 From: Alex Strick van Linschoten Date: Wed, 25 Jun 2025 14:44:04 +0200 Subject: [PATCH 07/12] Replace mocked test with integration test for duplicate pipeline runs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously the test only verified that mocking worked correctly. This adds a proper integration test that actually runs pipelines twice with the same name to verify the duplicate name detection behavior. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../functional/pipelines/test_pipeline_run.py | 37 ++++++++++++++ tests/unit/pipelines/test_run_utils.py | 48 ------------------- 2 files changed, 37 insertions(+), 48 deletions(-) diff --git a/tests/integration/functional/pipelines/test_pipeline_run.py b/tests/integration/functional/pipelines/test_pipeline_run.py index 505183ab16b..ff72f95d877 100644 --- a/tests/integration/functional/pipelines/test_pipeline_run.py +++ b/tests/integration/functional/pipelines/test_pipeline_run.py @@ -1,8 +1,12 @@ import os from unittest.mock import patch +import pytest +from sqlalchemy.exc import IntegrityError + from zenml import pipeline, step from zenml.constants import ENV_ZENML_PREVENT_CLIENT_SIDE_CACHING +from zenml.exceptions import EntityExistsError @step(enable_cache=False) @@ -29,6 +33,11 @@ def noop() -> None: pass +@step(enable_cache=False) +def simple_step_for_duplicate_test() -> int: + return 42 + + def test_pipeline_run_computes_clientside_cache(clean_client, mocker): """Tests that running a pipeline computes the cached steps client-side and only forwards the non-cached steps to the orchestrator. @@ -104,3 +113,31 @@ def full_cached_pipeline(): full_cached_pipeline() mock_submit_pipeline.assert_called() + + +def test_duplicate_pipeline_run_name_raises_improved_error(clean_client): + """Test that running a pipeline twice with the same name raises an improved error message.""" + + @pipeline + def test_pipeline(): + simple_step_for_duplicate_test() + + # First run should succeed + run_name = "duplicate_name_test_run" + first_run = test_pipeline.with_options(run_name=run_name)() + assert first_run.name == run_name + + # Second run with same name should raise an error + # This tests the actual behavior - currently raises IntegrityError at DB level + # When improved error handling is implemented, this can be updated to expect EntityExistsError + with pytest.raises((EntityExistsError, IntegrityError)) as exc_info: + test_pipeline.with_options(run_name=run_name)() + + error_message = str(exc_info.value) + + # Verify it's a duplicate name error (either improved or raw database error) + assert ( + "already exists" in error_message.lower() + or "unique constraint failed" in error_message.lower() + or f"Pipeline run name '{run_name}' already exists" in error_message + ) diff --git a/tests/unit/pipelines/test_run_utils.py b/tests/unit/pipelines/test_run_utils.py index b988ec3e353..3e6ce0916f0 100644 --- a/tests/unit/pipelines/test_run_utils.py +++ b/tests/unit/pipelines/test_run_utils.py @@ -11,16 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. See the License for the specific language governing # permissions and limitations under the License. -from unittest.mock import MagicMock, patch -from uuid import uuid4 - -import pytest - -from zenml.exceptions import EntityExistsError -from zenml.models import ( - PipelineDeploymentResponse, - UserResponse, -) from zenml.pipelines import run_utils @@ -30,41 +20,3 @@ def test_default_run_name(): run_utils.get_default_run_name(pipeline_name="my_pipeline") == "my_pipeline-{date}-{time}" ) - - -@patch("zenml.pipelines.run_utils.Client") -def test_create_placeholder_run_duplicate_name_error(mock_client): - """Test that create_placeholder_run passes through improved error message from zen_store.""" - # Mock the deployment - deployment = MagicMock(spec=PipelineDeploymentResponse) - deployment.user = MagicMock(spec=UserResponse) - deployment.schedule = None - deployment.run_name_template = "my_test_run" - deployment.pipeline_configuration.finalize_substitutions.return_value = {} - deployment.project.id = uuid4() - deployment.id = uuid4() - deployment.pipeline.id = uuid4() - deployment.pipeline_configuration.tags = [] - - # Mock the client and zen_store to raise EntityExistsError with improved message - improved_error_message = ( - "Pipeline run name 'my_test_run' already exists in this project. " - "Each pipeline run must have a unique name.\n\n" - "To fix this, you can:\n" - "1. Use a different run name\n" - '2. Use a dynamic run name with placeholders like: "my_test_run_{date}_{time}"\n' - "3. Remove the run name from your configuration to auto-generate unique names\n\n" - "For more information on run naming, see: https://docs.zenml.io/concepts/steps_and_pipelines/yaml_configuration#run-name" - ) - mock_client.return_value.zen_store.get_or_create_run.side_effect = ( - EntityExistsError(improved_error_message) - ) - - # Test that the error from zen_store is passed through unchanged - with pytest.raises(EntityExistsError) as exc_info: - run_utils.create_placeholder_run(deployment) - - error_message = str(exc_info.value) - - # Verify the error message is exactly what zen_store raised - assert error_message == improved_error_message From b86a00029c06f9a912d573fabec6f7c7855ebc88 Mon Sep 17 00:00:00 2001 From: Alex Strick van Linschoten Date: Wed, 25 Jun 2025 15:07:07 +0200 Subject: [PATCH 08/12] Fix SQLAlchemy autoflush issue in pipeline run creation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wrap logs processing in session.no_autoflush to prevent premature IntegrityError during _get_reference_schema_by_id calls. This ensures duplicate name errors are properly caught by the try/except block and converted to helpful EntityExistsError messages. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- src/zenml/zen_stores/sql_zen_store.py | 27 ++++++++++--------- .../functional/pipelines/test_pipeline_run.py | 6 ++++- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/src/zenml/zen_stores/sql_zen_store.py b/src/zenml/zen_stores/sql_zen_store.py index c47b6f2e5d7..945b1373e59 100644 --- a/src/zenml/zen_stores/sql_zen_store.py +++ b/src/zenml/zen_stores/sql_zen_store.py @@ -5696,20 +5696,21 @@ def _create_run( # Add logs entry for the run if exists if pipeline_run.logs is not None: - self._get_reference_schema_by_id( - resource=pipeline_run, - reference_schema=StackComponentSchema, - reference_id=pipeline_run.logs.artifact_store_id, - session=session, - reference_type="logs artifact store", - ) + with session.no_autoflush: + self._get_reference_schema_by_id( + resource=pipeline_run, + reference_schema=StackComponentSchema, + reference_id=pipeline_run.logs.artifact_store_id, + session=session, + reference_type="logs artifact store", + ) - log_entry = LogsSchema( - uri=pipeline_run.logs.uri, - pipeline_run_id=new_run.id, - artifact_store_id=pipeline_run.logs.artifact_store_id, - ) - session.add(log_entry) + log_entry = LogsSchema( + uri=pipeline_run.logs.uri, + pipeline_run_id=new_run.id, + artifact_store_id=pipeline_run.logs.artifact_store_id, + ) + session.add(log_entry) try: session.commit() diff --git a/tests/integration/functional/pipelines/test_pipeline_run.py b/tests/integration/functional/pipelines/test_pipeline_run.py index ff72f95d877..304f0395086 100644 --- a/tests/integration/functional/pipelines/test_pipeline_run.py +++ b/tests/integration/functional/pipelines/test_pipeline_run.py @@ -136,8 +136,12 @@ def test_pipeline(): error_message = str(exc_info.value) # Verify it's a duplicate name error (either improved or raw database error) + # The message should now come from _verify_name_uniqueness after our session.no_autoflush fix assert ( "already exists" in error_message.lower() - or "unique constraint failed" in error_message.lower() + or "existing pipeline run with the same name" in error_message.lower() or f"Pipeline run name '{run_name}' already exists" in error_message ) + + # Verify it's an EntityExistsError, not a raw IntegrityError + assert isinstance(exc_info.value, EntityExistsError) From e86c84a37da5dd9957d313850ba1897328b12312 Mon Sep 17 00:00:00 2001 From: Alex Strick van Linschoten Date: Wed, 25 Jun 2025 15:11:03 +0200 Subject: [PATCH 09/12] Clean up outdated comments in integration test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove "when implemented" and "should now" comments since the improved error handling is already working. Simplify test to only expect EntityExistsError now that the fix is in place. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../functional/pipelines/test_pipeline_run.py | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/tests/integration/functional/pipelines/test_pipeline_run.py b/tests/integration/functional/pipelines/test_pipeline_run.py index 304f0395086..62bb762ce76 100644 --- a/tests/integration/functional/pipelines/test_pipeline_run.py +++ b/tests/integration/functional/pipelines/test_pipeline_run.py @@ -2,7 +2,6 @@ from unittest.mock import patch import pytest -from sqlalchemy.exc import IntegrityError from zenml import pipeline, step from zenml.constants import ENV_ZENML_PREVENT_CLIENT_SIDE_CACHING @@ -127,21 +126,15 @@ def test_pipeline(): first_run = test_pipeline.with_options(run_name=run_name)() assert first_run.name == run_name - # Second run with same name should raise an error - # This tests the actual behavior - currently raises IntegrityError at DB level - # When improved error handling is implemented, this can be updated to expect EntityExistsError - with pytest.raises((EntityExistsError, IntegrityError)) as exc_info: + # Second run with same name should raise EntityExistsError with clear message + with pytest.raises(EntityExistsError) as exc_info: test_pipeline.with_options(run_name=run_name)() error_message = str(exc_info.value) - # Verify it's a duplicate name error (either improved or raw database error) - # The message should now come from _verify_name_uniqueness after our session.no_autoflush fix + # Verify it contains a clear duplicate name error message assert ( "already exists" in error_message.lower() or "existing pipeline run with the same name" in error_message.lower() or f"Pipeline run name '{run_name}' already exists" in error_message ) - - # Verify it's an EntityExistsError, not a raw IntegrityError - assert isinstance(exc_info.value, EntityExistsError) From 4ed09300a097302630bb620c4bc9679b86e8ea12 Mon Sep 17 00:00:00 2001 From: Alex Strick van Linschoten Date: Thu, 3 Jul 2025 17:05:27 +0200 Subject: [PATCH 10/12] Refactor duplicate pipeline run name error handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move user-friendly error message logic from _create_run to get_or_create_run method. This removes fragile database-specific error message parsing and follows the established pattern where get_or_create_run handles user-facing errors. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- src/zenml/zen_stores/sql_zen_store.py | 49 ++++++++++++++------------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/src/zenml/zen_stores/sql_zen_store.py b/src/zenml/zen_stores/sql_zen_store.py index 62afa15c5a5..0242968cfa7 100644 --- a/src/zenml/zen_stores/sql_zen_store.py +++ b/src/zenml/zen_stores/sql_zen_store.py @@ -5718,29 +5718,11 @@ def _create_run( try: session.commit() - except IntegrityError as e: + except IntegrityError: # We have to rollback the failed session first in order to # continue using it session.rollback() - # Check if this is a duplicate run name error - error_message = str(e).lower() - if "unique_run_name_in_project" in error_message or ( - "duplicate entry" in error_message - and pipeline_run.name in str(e) - ): - # Provide a user-friendly error message for duplicate run names - improved_message = ( - f"Pipeline run name '{pipeline_run.name}' already exists in this project. " - f"Each pipeline run must have a unique name.\n\n" - f"To fix this, you can:\n" - f"1. Use a different run name\n" - f'2. Use a dynamic run name with placeholders like: "{pipeline_run.name}_{{date}}_{{time}}"\n' - f"3. Remove the run name from your configuration to auto-generate unique names\n\n" - f"For more information on run naming, see: https://docs.zenml.io/concepts/steps_and_pipelines/yaml_configuration#run-name" - ) - raise EntityExistsError(improved_message) from e - # This can fail if the name is taken by a different run self._verify_name_uniqueness( resource=pipeline_run, @@ -6040,7 +6022,18 @@ def get_or_create_run( return self._create_run(pipeline_run, session=session), True except EntityExistsError as create_error: if not pipeline_run.orchestrator_run_id: - raise + # No orchestrator_run_id means this is likely a name conflict. + # Provide a user-friendly error message for duplicate run names. + improved_message = ( + f"Pipeline run name '{pipeline_run.name}' already exists in this project. " + f"Each pipeline run must have a unique name.\n\n" + f"To fix this, you can:\n" + f"1. Use a different run name\n" + f'2. Use a dynamic run name with placeholders like: "{pipeline_run.name}_{{date}}_{{time}}"\n' + f"3. Remove the run name from your configuration to auto-generate unique names\n\n" + f"For more information on run naming, see: https://docs.zenml.io/concepts/steps_and_pipelines/yaml_configuration#run-name" + ) + raise EntityExistsError(improved_message) from create_error # Creating the run failed because # - a run with the same deployment_id and orchestrator_run_id # exists. We now fetch and return that run. @@ -6059,10 +6052,18 @@ def get_or_create_run( ) except KeyError: # We should only get here if the run creation failed because - # of a name conflict. We raise the error that happened - # during creation in any case to forward the error message - # to the user. - raise create_error + # of a name conflict. Provide a user-friendly error message + # for duplicate run names. + improved_message = ( + f"Pipeline run name '{pipeline_run.name}' already exists in this project. " + f"Each pipeline run must have a unique name.\n\n" + f"To fix this, you can:\n" + f"1. Use a different run name\n" + f'2. Use a dynamic run name with placeholders like: "{pipeline_run.name}_{{date}}_{{time}}"\n' + f"3. Remove the run name from your configuration to auto-generate unique names\n\n" + f"For more information on run naming, see: https://docs.zenml.io/concepts/steps_and_pipelines/yaml_configuration#run-name" + ) + raise EntityExistsError(improved_message) from create_error def list_runs( self, From 4bd95f2347e05b8dbeb0761d285f1d2d4a0e3ed6 Mon Sep 17 00:00:00 2001 From: Alex Strick van Linschoten Date: Thu, 3 Jul 2025 17:16:00 +0200 Subject: [PATCH 11/12] Extract duplicate error message logic into helper method MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove code duplication by creating _get_duplicate_run_name_error_message helper method that generates the user-friendly error message for duplicate pipeline run names. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- src/zenml/zen_stores/sql_zen_store.py | 41 ++++++++++++++++++--------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/src/zenml/zen_stores/sql_zen_store.py b/src/zenml/zen_stores/sql_zen_store.py index 0242968cfa7..df4a8ca2624 100644 --- a/src/zenml/zen_stores/sql_zen_store.py +++ b/src/zenml/zen_stores/sql_zen_store.py @@ -5664,6 +5664,27 @@ def _get_regular_output_artifact_node( pipeline_run_id=pipeline_run_id, status=ExecutionStatus(run.status) ) + def _get_duplicate_run_name_error_message( + self, pipeline_run_name: str + ) -> str: + """Generate a user-friendly error message for duplicate pipeline run names. + + Args: + pipeline_run_name: The name of the pipeline run that already exists. + + Returns: + A formatted error message with helpful suggestions. + """ + return ( + f"Pipeline run name '{pipeline_run_name}' already exists in this project. " + f"Each pipeline run must have a unique name.\n\n" + f"To fix this, you can:\n" + f"1. Use a different run name\n" + f'2. Use a dynamic run name with placeholders like: "{pipeline_run_name}_{{date}}_{{time}}"\n' + f"3. Remove the run name from your configuration to auto-generate unique names\n\n" + f"For more information on run naming, see: https://docs.zenml.io/concepts/steps_and_pipelines/yaml_configuration#run-name" + ) + def _create_run( self, pipeline_run: PipelineRunRequest, session: Session ) -> PipelineRunResponse: @@ -6025,13 +6046,9 @@ def get_or_create_run( # No orchestrator_run_id means this is likely a name conflict. # Provide a user-friendly error message for duplicate run names. improved_message = ( - f"Pipeline run name '{pipeline_run.name}' already exists in this project. " - f"Each pipeline run must have a unique name.\n\n" - f"To fix this, you can:\n" - f"1. Use a different run name\n" - f'2. Use a dynamic run name with placeholders like: "{pipeline_run.name}_{{date}}_{{time}}"\n' - f"3. Remove the run name from your configuration to auto-generate unique names\n\n" - f"For more information on run naming, see: https://docs.zenml.io/concepts/steps_and_pipelines/yaml_configuration#run-name" + self._get_duplicate_run_name_error_message( + pipeline_run.name + ) ) raise EntityExistsError(improved_message) from create_error # Creating the run failed because @@ -6055,13 +6072,9 @@ def get_or_create_run( # of a name conflict. Provide a user-friendly error message # for duplicate run names. improved_message = ( - f"Pipeline run name '{pipeline_run.name}' already exists in this project. " - f"Each pipeline run must have a unique name.\n\n" - f"To fix this, you can:\n" - f"1. Use a different run name\n" - f'2. Use a dynamic run name with placeholders like: "{pipeline_run.name}_{{date}}_{{time}}"\n' - f"3. Remove the run name from your configuration to auto-generate unique names\n\n" - f"For more information on run naming, see: https://docs.zenml.io/concepts/steps_and_pipelines/yaml_configuration#run-name" + self._get_duplicate_run_name_error_message( + pipeline_run.name + ) ) raise EntityExistsError(improved_message) from create_error From 7b05e07589c5cda7346aaec62dda1b8005e686ab Mon Sep 17 00:00:00 2001 From: Alex Strick van Linschoten Date: Thu, 3 Jul 2025 17:16:43 +0200 Subject: [PATCH 12/12] Update src/zenml/pipelines/run_utils.py --- src/zenml/pipelines/run_utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/zenml/pipelines/run_utils.py b/src/zenml/pipelines/run_utils.py index 22572816c33..27e33c7b64c 100644 --- a/src/zenml/pipelines/run_utils.py +++ b/src/zenml/pipelines/run_utils.py @@ -87,7 +87,6 @@ def create_placeholder_run( tags=deployment.pipeline_configuration.tags, logs=logs, ) - run, _ = Client().zen_store.get_or_create_run(run_request) return run