class RemovePartitionSpecsUpdate(IcebergBaseModel):
    """Table update that removes one or more partition specs from the table metadata.

    This model must be listed in the `TableUpdate` discriminated union (keyed on `action`)
    so that it can be parsed from, and dispatched as, a table update.
    """

    # The Iceberg REST catalog specification names this action in the plural form.
    # The singular "remove-partition-spec" is not a recognised action.
    action: Literal["remove-partition-specs"] = Field(default="remove-partition-specs")
    # Ids of the partition specs to remove; serialized under the "spec-ids" alias.
    spec_ids: List[int] = Field(alias="spec-ids")


@_apply_table_update.register(RemovePartitionSpecsUpdate)
def _(update: RemovePartitionSpecsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
    """Apply a RemovePartitionSpecsUpdate to the table metadata.

    Args:
        update: The update listing the partition spec ids to remove.
        base_metadata: The table metadata to remove the partition specs from.
        context: The update context on which the applied update is recorded.

    Returns:
        A copy of the metadata without the removed partition specs.

    Raises:
        ValueError: If a requested spec id does not exist, or if the default
            partition spec is among the ids to remove.
    """
    # Build the lookup once instead of scanning the spec list for every requested id.
    existing_spec_ids = {spec.spec_id for spec in base_metadata.partition_specs}
    for remove_spec_id in update.spec_ids:
        if remove_spec_id not in existing_spec_ids:
            raise ValueError(f"Partition spec with id {remove_spec_id} does not exist")

    spec_ids_to_remove = set(update.spec_ids)
    if base_metadata.default_spec_id in spec_ids_to_remove:
        raise ValueError(f"Cannot remove default partition spec: {base_metadata.default_spec_id}")

    partition_specs = [spec for spec in base_metadata.partition_specs if spec.spec_id not in spec_ids_to_remove]

    # Record the update only after all validation has passed.
    context.add_update(update)
    return base_metadata.model_copy(update={"partition_specs": partition_specs})
# These tests use AddPartitionSpecUpdate and RemovePartitionSpecsUpdate,
# both imported from pyiceberg.table.update.


def test_remove_partition_spec_update(table_v2: Table) -> None:
    """Removing a non-default partition spec drops exactly that spec."""
    base_metadata = table_v2.metadata
    new_spec = PartitionSpec(PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="y"), spec_id=1)
    metadata_with_new_spec = update_table_metadata(base_metadata, (AddPartitionSpecUpdate(spec=new_spec),))

    assert len(metadata_with_new_spec.partition_specs) == 2

    update = RemovePartitionSpecsUpdate(spec_ids=[1])
    updated_metadata = update_table_metadata(
        metadata_with_new_spec,
        (update,),
    )

    assert len(updated_metadata.partition_specs) == 1
    # Checking only the count would still pass if the wrong spec had been removed.
    assert [spec.spec_id for spec in updated_metadata.partition_specs] == [0]
    # The input metadata must not be modified by applying the update.
    assert len(metadata_with_new_spec.partition_specs) == 2


def test_remove_partition_spec_update_spec_does_not_exist(table_v2: Table) -> None:
    """Removing an unknown partition spec id raises a ValueError."""
    update = RemovePartitionSpecsUpdate(
        spec_ids=[123],
    )
    with pytest.raises(ValueError, match="Partition spec with id 123 does not exist"):
        update_table_metadata(table_v2.metadata, (update,))


def test_remove_partition_spec_update_default_spec(table_v2: Table) -> None:
    """Removing the default partition spec raises a ValueError."""
    update = RemovePartitionSpecsUpdate(
        spec_ids=[0],
    )
    with pytest.raises(ValueError, match="Cannot remove default partition spec: 0"):
        update_table_metadata(table_v2.metadata, (update,))