diff --git a/dev/provision.py b/dev/provision.py index 44086caf20..6572cadb29 100644 --- a/dev/provision.py +++ b/dev/provision.py @@ -342,3 +342,52 @@ (array(), map(), array(struct(1))) """ ) + + spark.sql( + f""" + CREATE OR REPLACE TABLE {catalog_name}.default.test_table_read_from_snapshots ( + dt date, + number integer, + letter string + ) + USING iceberg + TBLPROPERTIES ( + 'format-version'='2' + ); + """ + ) + + spark.sql( + f""" + INSERT INTO {catalog_name}.default.test_table_read_from_snapshots + VALUES (CAST('2022-03-01' AS date), 1, 'a') + """ + ) + + spark.sql( + f""" + INSERT INTO {catalog_name}.default.test_table_read_from_snapshots + VALUES (CAST('2022-03-01' AS date), 2, 'b') + """ + ) + + spark.sql( + f""" + DELETE FROM {catalog_name}.default.test_table_read_from_snapshots + WHERE number = 2 + """ + ) + + spark.sql( + f""" + INSERT INTO {catalog_name}.default.test_table_read_from_snapshots + VALUES (CAST('2022-03-02' AS date), 3, 'c') + """ + ) + + spark.sql( + f""" + INSERT INTO {catalog_name}.default.test_table_read_from_snapshots + VALUES (CAST('2022-03-02' AS date), 4, 'd') + """ + ) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index c57f0d1297..af9d767a22 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -16,6 +16,7 @@ # under the License. from __future__ import annotations +import collections import itertools import uuid import warnings @@ -1235,6 +1236,24 @@ def scan( limit=limit, ) + def incremental_changelog_scan( + self, + row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, + selected_fields: Tuple[str, ...] = ("*",), + case_sensitive: bool = True, + options: Properties = EMPTY_DICT, + limit: Optional[int] = None, + ) -> IncrementalChangelogScan: + return IncrementalChangelogScan( + table_metadata=self.metadata, + io=self.io, + row_filter=row_filter, + selected_fields=selected_fields, + case_sensitive=case_sensitive, + options=options, + limit=limit, + ) + @property def format_version(self) -> TableVersion: return self.metadata.format_version @@ -1587,15 +1606,43 @@ def __init__( self.length = length or data_file.file_size_in_bytes +class ChangelogOperation(Enum): + Insert = 1 + Delete = 2 + UpdateBefore = 3 + UpdateAfter = 4 + + +@dataclass(init=False) +class ChangelogScanTask(ScanTask): + file: DataFile + change_type: ChangelogOperation + change_ordinal: int + change_snapshot_id: int + + def __init__( + self, + data_file: DataFile, + change_type: ChangelogOperation, + change_ordinal: int, + change_snapshot_id: int, + ) -> None: + self.file = data_file + self.change_type = change_type + self.change_ordinal = change_ordinal + self.change_snapshot_id = change_snapshot_id + + def _open_manifest( io: FileIO, manifest: ManifestFile, partition_filter: Callable[[DataFile], bool], metrics_evaluator: Callable[[DataFile], bool], + discard_deleted: bool = True, ) -> List[ManifestEntry]: return [ manifest_entry - for manifest_entry in manifest.fetch_manifest_entry(io, discard_deleted=True) + for manifest_entry in manifest.fetch_manifest_entry(io, discard_deleted=discard_deleted) if partition_filter(manifest_entry.data_file) and metrics_evaluator(manifest_entry.data_file) ] @@ -1778,6 +1825,107 @@ def to_ray(self) -> ray.data.dataset.Dataset: return ray.data.from_arrow(self.to_arrow()) +class IncrementalChangelogScan(BaseIncrementalScan): + def __init__( + self, + table_metadata: TableMetadata, + io: FileIO, + row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, + selected_fields: Tuple[str, ...] = ("*",), + case_sensitive: bool = True, + options: Properties = EMPTY_DICT, + limit: Optional[int] = None, + to_snapshot_id: Optional[int] = None, + from_snapshot_id_exclusive: Optional[int] = None, + ): + super().__init__( + table_metadata, + io, + row_filter, + selected_fields, + case_sensitive, + options, + limit, + to_snapshot_id, + from_snapshot_id_exclusive, + ) + + def use_ref(self: S, name: str) -> S: + raise NotImplementedError("Not implemented for IncrementalChangelogScan yet.") + + def _do_plan_files(self, from_snapshot_id_exclusive: int, to_snapshot_id: int) -> Iterable[ChangelogScanTask]: + snapshots_between = collections.deque() + for snapshot in ancestors_between(to_snapshot_id, from_snapshot_id_exclusive, self.table_metadata): + if snapshot.summary.operation != Operation.REPLACE: # type: ignore + for manifest_file in self.table_metadata.snapshot_by_id(snapshot.snapshot_id).manifests(self.io): + if manifest_file.content == ManifestContent.DELETES: + raise "Delete files are currently not supported in changelog scans" + snapshots_between.appendleft(snapshot) + + if not snapshots_between: + return iter([]) + + snapshot_ids = {snapshot.snapshot_id for snapshot in snapshots_between} + snapshot_ordinal = {snapshot.snapshot_id: ordinal for ordinal, snapshot in enumerate(snapshots_between)} + + # step 1: filter manifests using partition summaries + # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id + + manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) + + manifests = { + manifest_file + for snapshot_id in snapshot_ids + for manifest_file in self.table_metadata.snapshot_by_id(snapshot_id).manifests(self.io) # type: ignore + if manifest_evaluators[manifest_file.partition_spec_id](manifest_file) + if manifest_file.added_snapshot_id in snapshot_ids + } + + # step 2: filter the data files in each manifest + # this filter depends on the partition spec used to write the manifest file + + partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator) + metrics_evaluator = _InclusiveMetricsEvaluator( + self.table_metadata.schema(), + self.row_filter, + self.case_sensitive, + self.options.get("include_empty_files") == "true", + ).eval + + min_data_sequence_number = _min_data_file_sequence_number(manifests) # type: ignore + + changelog_entries: List[ManifestEntry] = [] + + executor = ExecutorFactory.get_or_create() + for manifest_entry in chain( + *executor.map( + lambda args: _open_manifest(*args), + [ + (self.io, manifest, partition_evaluators[manifest.partition_spec_id], metrics_evaluator, False) + for manifest in manifests + if self._check_sequence_number(min_data_sequence_number, manifest) + ], + ) + ): + if manifest_entry.snapshot_id in snapshot_ids: + changelog_entries.append(manifest_entry) + + for entry in changelog_entries: + if entry.status == ManifestEntryStatus.ADDED: + operation = ChangelogOperation.Insert + elif entry.status == ManifestEntryStatus.DELETED: + operation = ChangelogOperation.Delete + else: + raise f"Unexpected entry status: {entry.status}" + + yield ChangelogScanTask( + data_file=entry.data_file, + change_snapshot_id=entry.snapshot_id, + change_type=operation, + change_ordinal=snapshot_ordinal[entry.snapshot_id] + ) + + class MoveOperation(Enum): First = 1 Before = 2 diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index 2a10e37ba9..b97fea6959 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -423,6 +423,25 @@ def test_scan_branch(catalog: Catalog) -> None: assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12] +@pytest.mark.integration +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('session_catalog_hive'), pytest.lazy_fixture('session_catalog')]) +def test_incremental_changelog_scan(catalog: Catalog) -> None: + test_table = catalog.load_table("default.test_table_read_from_snapshots") + + scan = test_table.incremental_changelog_scan().from_snapshot_exclusive(test_table.history()[0].snapshot_id) + assert len(list(scan.plan_files())) == 4 + + scan = test_table.incremental_changelog_scan().to_snapshot(test_table.history()[4].snapshot_id) + assert len(list(scan.plan_files())) == 5 + + scan = ( + test_table.incremental_changelog_scan() + .from_snapshot_exclusive(test_table.history()[0].snapshot_id) + .to_snapshot(test_table.history()[2].snapshot_id) + ) + assert len(list(scan.plan_files())) == 2 + + @pytest.mark.integration @pytest.mark.parametrize('catalog', [pytest.lazy_fixture('session_catalog_hive'), pytest.lazy_fixture('session_catalog')]) def test_filter_on_new_column(catalog: Catalog) -> None: