Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions docs/v3/how-to-guides/deployments/prefect-yaml.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,31 @@ Each time a step runs, the following actions take place in order:

To update a deployment, make any desired changes to the `prefect.yaml` file, and run `prefect deploy`. Running just this command will prompt you to select a deployment interactively, or you may specify the deployment to update with `--name your-deployment`.

### Rename a deployment

If you need to rename a deployment, use the `replaces` field to indicate which existing deployment should be updated:

```yaml
deployments:
- name: send-emails-v2 # The new name
replaces: send-emails # The old name being replaced
entrypoint: flows.py:send_email
work_pool:
name: my-work-pool
```

When you run `prefect deploy`:
- The existing deployment named `send-emails` is updated in place
- Its name changes to `send-emails-v2`
- No new deployment is created, and no orphaned deployment remains
- All run history stays attached because the deployment's ID is preserved

After a successful deploy, you can remove the `replaces` field from your YAML — it becomes a no-op once the old name no longer exists.

<Note>
If `replaces` points to a name that doesn't exist, the deployment is created normally and a warning is logged.
</Note>

## Further reading

Now that you are familiar with creating deployments, you can explore infrastructure options for running your deployments:
Expand Down
2 changes: 2 additions & 0 deletions src/prefect/cli/deploy/_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class DeploymentConfig(BaseModel):
triggers: Optional[List[Dict[str, Any]]] = None
sla: Optional[List[SlaTypes]] = None

replaces: Optional[str] = None # The name of an existing deployment to replace


class PrefectYamlModel(BaseModel):
model_config = ConfigDict(populate_by_name=True, extra="ignore")
Expand Down
6 changes: 6 additions & 0 deletions src/prefect/client/orchestration/_deployments/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def create_deployment(
branch: str | None = None,
base: UUID | None = None,
root: UUID | None = None,
replaces: str | None = None,
) -> UUID:
"""
Create a deployment.
Expand Down Expand Up @@ -123,6 +124,7 @@ def create_deployment(
branch=branch,
base=base,
root=root,
replaces=replaces,
)

if work_pool_name is not None:
Expand All @@ -146,6 +148,7 @@ def create_deployment(
"branch",
"base",
"root",
"replaces",
]

for field in exclude_if_none:
Expand Down Expand Up @@ -779,6 +782,7 @@ async def create_deployment(
branch: str | None = None,
base: UUID | None = None,
root: UUID | None = None,
replaces: str | None = None,
) -> UUID:
"""
Create a deployment.
Expand Down Expand Up @@ -830,6 +834,7 @@ async def create_deployment(
branch=branch,
base=base,
root=root,
replaces=replaces,
)

if work_pool_name is not None:
Expand All @@ -853,6 +858,7 @@ async def create_deployment(
"branch",
"base",
"root",
"replaces",
]

for field in exclude_if_none:
Expand Down
5 changes: 5 additions & 0 deletions src/prefect/client/schemas/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,11 @@ def convert_to_strings(
default=None, description="The root deployment of the deployment."
)

replaces: Optional[str] = Field(
default=None,
description="The name of an existing deployment that this deployment replaces. Used for renaming deployments.",
)

def check_valid_configuration(self, base_job_template: dict[str, Any]) -> None:
"""Check that the combination of base_job_template defaults
and job_variables conforms to the specified schema.
Expand Down
3 changes: 2 additions & 1 deletion src/prefect/server/api/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ async def create_deployment(
)
deployment_dict["work_queue_id"] = work_queue.id

replaces = deployment_dict.pop("replaces", None)
deployment = schemas.core.Deployment(**deployment_dict)
# check to see if relevant blocks exist, allowing us throw a useful error message
# for debugging
Expand Down Expand Up @@ -199,7 +200,7 @@ async def create_deployment(

right_now = now("UTC")
model = await models.deployments.create_deployment(
session=session, deployment=deployment
session=session, deployment=deployment, replaces=replaces
)

if model.created >= right_now:
Expand Down
73 changes: 54 additions & 19 deletions src/prefect/server/models/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ async def create_deployment(
db: PrefectDBInterface,
session: AsyncSession,
deployment: schemas.core.Deployment | schemas.actions.DeploymentCreate,
replaces: str | None = None,
) -> Optional[orm_models.Deployment]:
"""Upserts a deployment.

Expand All @@ -128,12 +129,35 @@ async def create_deployment(
# even under concurrent upserts for the same (flow_id, name).
upsert_start = now("UTC")

# Snapshot existing deployment field values before upsert for change detection
# If `replaces` is set, look up the deployment to be renamed so we can
# update it in-place (preserving its ID, run history, etc.) rather than
# creating a new row.
replaces_deployment_id: UUID | None = None
if replaces:
replaces_id_result = await session.execute(
sa.select(db.Deployment.id).where(
sa.and_(
db.Deployment.flow_id == deployment.flow_id,
db.Deployment.name == replaces,
)
)
)
replaces_deployment_id = replaces_id_result.scalar_one_or_none()
if replaces_deployment_id is None:
logger.warning(
f"Deployment '{deployment.name}' has 'replaces: {replaces}' "
f"but no deployment with that name exists for this flow. "
f"Creating new deployment."
)

# Snapshot existing deployment field values before upsert for change detection.
# When using `replaces`, snapshot the deployment being renamed.
lookup_name = replaces if replaces_deployment_id else deployment.name
existing_result = await session.execute(
sa.select(db.Deployment).where(
sa.and_(
db.Deployment.flow_id == deployment.flow_id,
db.Deployment.name == deployment.name,
db.Deployment.name == lookup_name,
)
)
)
Expand All @@ -156,7 +180,7 @@ async def create_deployment(

schedules = deployment.schedules
insert_values = deployment.model_dump_for_orm(
exclude_unset=True, exclude={"schedules", "version_info"}
exclude_unset=True, exclude={"schedules", "version_info", "replaces"}
)

requested_concurrency_limit = insert_values.pop("concurrency_limit", "unset")
Expand All @@ -177,32 +201,43 @@ async def create_deployment(
"job_variables",
"concurrency_limit",
"version_info",
"replaces",
},
)
if job_variables:
conflict_update_fields["infra_overrides"] = job_variables

insert_stmt = (
db.queries.insert(db.Deployment)
.values(**insert_values)
.on_conflict_do_update(
index_elements=db.orm.deployment_unique_upsert_columns,
set_={**conflict_update_fields},
if replaces_deployment_id:
# Rename in-place: update the existing deployment row rather than upserting.
# All run history, schedules, and concurrency limits remain attached via the ID.
await session.execute(
sa.update(db.Deployment)
.where(db.Deployment.id == replaces_deployment_id)
.values(**conflict_update_fields)
)
deployment_id = replaces_deployment_id
else:
insert_stmt = (
db.queries.insert(db.Deployment)
.values(**insert_values)
.on_conflict_do_update(
index_elements=db.orm.deployment_unique_upsert_columns,
set_={**conflict_update_fields},
)
)
)

await session.execute(insert_stmt)
await session.execute(insert_stmt)

# Get the id of the deployment we just created or updated
result = await session.execute(
sa.select(db.Deployment.id).where(
sa.and_(
db.Deployment.flow_id == deployment.flow_id,
db.Deployment.name == deployment.name,
# Get the id of the deployment we just created or updated
result = await session.execute(
sa.select(db.Deployment.id).where(
sa.and_(
db.Deployment.flow_id == deployment.flow_id,
db.Deployment.name == deployment.name,
)
)
)
)
deployment_id = result.scalar_one_or_none()
deployment_id = result.scalar_one_or_none()

if not deployment_id:
return None
Expand Down
4 changes: 4 additions & 0 deletions src/prefect/server/schemas/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ class DeploymentCreate(ActionBaseModel):
version_info: Optional[schemas.core.VersionInfo] = Field(
default=None, description="A description of this version of the deployment."
)
replaces: Optional[str] = Field(
default=None,
description="The name of an existing deployment that this deployment replaces. Used for renaming deployments.",
)

def check_valid_configuration(self, base_job_template: dict[str, Any]) -> None:
"""
Expand Down
75 changes: 75 additions & 0 deletions tests/server/orchestration/api/test_deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -1217,6 +1217,81 @@ async def test_create_deployment_with_small_parameters_succeeds(
assert response.status_code == 201


class TestCreateDeploymentReplaces:
async def test_replaces_renames_in_place_preserving_id_and_history(
self,
session,
hosted_api_client,
flow,
):
"""replaces renames the existing deployment in place: same ID, run history
intact, new fields applied, old name gone."""
# Create original deployment
original = DeploymentCreate(
name="old-name",
flow_id=flow.id,
tags=["v1"],
description="original",
).model_dump(mode="json")
response = await hosted_api_client.post("/deployments/", json=original)
assert response.status_code == status.HTTP_201_CREATED
original_id = response.json()["id"]

# Create a flow run under the original deployment
run_response = await hosted_api_client.post(
f"/deployments/{original_id}/create_flow_run", json={}
)
assert run_response.status_code == status.HTTP_201_CREATED
run_id = run_response.json()["id"]

# Rename via replaces, also updating fields
replacement = DeploymentCreate(
name="new-name",
flow_id=flow.id,
replaces="old-name",
tags=["v2"],
description="updated",
).model_dump(mode="json")
response = await hosted_api_client.post("/deployments/", json=replacement)
assert response.status_code in (
status.HTTP_200_OK,
status.HTTP_201_CREATED,
)
data = response.json()

# Same ID — renamed in place
assert data["id"] == original_id
assert data["name"] == "new-name"
assert data["tags"] == ["v2"]
assert data["description"] == "updated"

# Old name no longer exists
old_response = await hosted_api_client.get(
f"/deployments/name/{flow.name}/old-name"
)
assert old_response.status_code == status.HTTP_404_NOT_FOUND

# Flow run is still linked to the (renamed) deployment
run_detail = await hosted_api_client.get(f"/flow_runs/{run_id}")
assert run_detail.status_code == status.HTTP_200_OK
assert run_detail.json()["deployment_id"] == original_id

async def test_replaces_nonexistent_falls_back_to_create(
self,
hosted_api_client,
flow,
):
"""replaces pointing to a missing deployment creates a new one and warns."""
data = DeploymentCreate(
name="brand-new",
flow_id=flow.id,
replaces="does-not-exist",
).model_dump(mode="json")
response = await hosted_api_client.post("/deployments/", json=data)
assert response.status_code == status.HTTP_201_CREATED
assert response.json()["name"] == "brand-new"


class TestReadDeployment:
async def test_read_deployment(
self,
Expand Down
Loading