Add delete file index to pyiceberg and support equality delete reads #2255

Open · wants to merge 3 commits into base: main
147 changes: 133 additions & 14 deletions pyiceberg/io/pyarrow.py
@@ -45,6 +45,7 @@
Any,
Callable,
Dict,
FrozenSet,
Generic,
Iterable,
Iterator,
@@ -978,18 +979,23 @@ def _get_file_format(file_format: FileFormat, **kwargs: Dict[str, Any]) -> ds.Fi
raise ValueError(f"Unsupported file format: {file_format}")


def _read_deletes(io: FileIO, data_file: DataFile) -> Dict[str, pa.ChunkedArray]:
def _read_deletes(io: FileIO, data_file: DataFile) -> Union[Dict[str, pa.ChunkedArray], pa.Table]:
if data_file.file_format == FileFormat.PARQUET:
with io.new_input(data_file.file_path).open() as fi:
delete_fragment = _get_file_format(
data_file.file_format, dictionary_columns=("file_path",), pre_buffer=True, buffer_size=ONE_MEGABYTE
).make_fragment(fi)
table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table()
table = table.unify_dictionaries()
return {
file.as_py(): table.filter(pc.field("file_path") == file).column("pos")
for file in table.column("file_path").chunks[0].dictionary
}
if data_file.content == DataFileContent.POSITION_DELETES:
table = table.unify_dictionaries()
return {
file.as_py(): table.filter(pc.field("file_path") == file).column("pos")
for file in table.column("file_path").chunks[0].dictionary
}
elif data_file.content == DataFileContent.EQUALITY_DELETES:
return table
else:
raise ValueError(f"Unsupported delete file content: {data_file.content}")
elif data_file.file_format == FileFormat.PUFFIN:
with io.new_input(data_file.file_path).open() as fi:
payload = fi.read()
@@ -1445,7 +1451,7 @@ def _task_to_record_batches(
bound_row_filter: BooleanExpression,
projected_schema: Schema,
projected_field_ids: Set[int],
positional_deletes: Optional[List[ChunkedArray]],
deletes: Optional[List[Union[pa.ChunkedArray, pa.Table]]],
case_sensitive: bool,
name_mapping: Optional[NameMapping] = None,
partition_spec: Optional[PartitionSpec] = None,
@@ -1479,9 +1485,20 @@
schema=physical_schema,
# This will push down the query to Arrow.
# But in case there are positional deletes, we have to apply them first
filter=pyarrow_filter if not positional_deletes else None,
filter=pyarrow_filter if not deletes else None,
columns=[col.name for col in file_project_schema.columns],
)
positional_deletes = None
equality_delete_groups = None
if deletes:
positional_deletes = [d for d in deletes if isinstance(d, pa.ChunkedArray)]
equality_deletes = [d for d in deletes if isinstance(d, pa.Table)]

# preprocess equality deletes to be applied
if equality_deletes:
task_eq_files = [df for df in task.delete_files if df.content == DataFileContent.EQUALITY_DELETES]
# concatenate equality delete tables with the same set of field IDs to reduce the number of anti joins
equality_delete_groups = _group_deletes_by_equality_ids(task_eq_files, equality_deletes)

next_index = 0
batches = fragment_scanner.to_batches()
@@ -1499,6 +1516,17 @@
if current_batch.num_rows == 0:
continue

if equality_delete_groups:
table = pa.Table.from_batches([current_batch])
for equality_ids, combined_table in equality_delete_groups.items():
table = _apply_equality_deletes(table, combined_table, list(equality_ids), file_schema)
if table.num_rows == 0:
break
if table.num_rows > 0:
current_batch = table.combine_chunks().to_batches()[0]
else:
continue

# Apply the user filter
if pyarrow_filter is not None:
# Temporary fix until PyArrow 21 is released ( https://github.com/apache/arrow/pull/46057 )
@@ -1528,22 +1556,64 @@ def _task_to_record_batches(
yield result_batch


def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Dict[str, List[ChunkedArray]]:
deletes_per_file: Dict[str, List[ChunkedArray]] = {}
unique_deletes = set(itertools.chain.from_iterable([task.delete_files for task in tasks]))
if len(unique_deletes) > 0:
def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Dict[str, List[Union[pa.ChunkedArray, pa.Table]]]:
deletes_per_file: Dict[str, List[Union[pa.ChunkedArray, pa.Table]]] = {}

positional_deletes = {
df
for task in tasks
for df in task.delete_files
if df.content == DataFileContent.POSITION_DELETES and df.file_format != FileFormat.PUFFIN
}
if positional_deletes:
executor = ExecutorFactory.get_or_create()
deletes_per_files: Iterator[Dict[str, ChunkedArray]] = executor.map(
lambda args: _read_deletes(*args),
[(io, delete_file) for delete_file in unique_deletes],
[(io, delete_file) for delete_file in positional_deletes],
)
for delete in deletes_per_files:
for file, arr in delete.items():
if file in deletes_per_file:
deletes_per_file[file].append(arr)
else:
deletes_per_file[file] = [arr]
deletion_vectors = {
df
for task in tasks
for df in task.delete_files
if df.content == DataFileContent.POSITION_DELETES and df.file_format == FileFormat.PUFFIN
}
if deletion_vectors:
executor = ExecutorFactory.get_or_create()
dv_results: Iterator[Dict[str, ChunkedArray]] = executor.map(
lambda args: _read_deletes(*args),
[(io, delete_file) for delete_file in deletion_vectors],
)
for delete in dv_results:
for file, arr in delete.items():
# Deletion vectors replace all position deletes for a file
deletes_per_file[file] = [arr]

equality_delete_tasks = []
for task in tasks:
equality_deletes = [df for df in task.delete_files if df.content == DataFileContent.EQUALITY_DELETES]
if equality_deletes:
for delete_file in equality_deletes:
# associate each data file with its equality delete files
equality_delete_tasks.append((task.file.file_path, delete_file))

if equality_delete_tasks:
executor = ExecutorFactory.get_or_create()
# Process equality delete tasks in parallel, like position deletes
equality_delete_results = executor.map(
lambda args: (args[0], _read_deletes(io, args[1])),
equality_delete_tasks,
)

for file_path, equality_delete_table in equality_delete_results:
if file_path not in deletes_per_file:
deletes_per_file[file_path] = []
deletes_per_file[file_path].append(equality_delete_table)
return deletes_per_file
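As a rough illustration of the two shapes collected above (the file path and values below are made up, not taken from the PR): position deletes are keyed by data file path and carry the row positions to drop, while equality deletes stay whole Arrow tables and are appended under the data file path they were matched to.

import pyarrow as pa

# Hypothetical contents of deletes_per_file after _read_all_delete_files.
# Position deletes: row positions to drop from the named data file.
positions = pa.chunked_array([[0, 17, 42]])

# Equality deletes: a table of column values; any data row matching one of
# these rows on the delete file's equality_ids is removed later.
eq_deletes = pa.table({"id": [2, 4], "category": ["b", "c"]})

deletes_per_file = {
    "s3://bucket/data/file-a.parquet": [positions, eq_deletes],
}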


@@ -1679,7 +1749,7 @@ def batches_for_task(task: FileScanTask) -> List[pa.RecordBatch]:
break

def _record_batches_from_scan_tasks_and_deletes(
self, tasks: Iterable[FileScanTask], deletes_per_file: Dict[str, List[ChunkedArray]]
self, tasks: Iterable[FileScanTask], deletes_per_file: Dict[str, List[Union[pa.ChunkedArray, pa.Table]]]
) -> Iterator[pa.RecordBatch]:
total_row_count = 0
for task in tasks:
@@ -2799,3 +2869,52 @@ def _get_field_from_arrow_table(arrow_table: pa.Table, field_path: str) -> pa.Ar
field_array = arrow_table[path_parts[0]]
# Navigate into the struct using the remaining path parts
return pc.struct_field(field_array, path_parts[1:])


def _group_deletes_by_equality_ids(
task_eq_files: List[DataFile], equality_delete_tables: List[pa.Table]
) -> Dict[FrozenSet[int], pa.Table]:
"""Concatenate equality delete tables by their equality IDs to reduce the number of anti joins."""
from collections import defaultdict

equality_delete_groups: Dict[FrozenSet[int], pa.Table] = {}
group_map = defaultdict(list)

# Group the tables by their equality IDs
for delete_file, delete_table in zip(task_eq_files, equality_delete_tables):
if delete_file.equality_ids is not None:
key = frozenset(delete_file.equality_ids)
group_map[key].append(delete_table)

# Concat arrow tables in the same groups
for equality_ids, delete_tables in group_map.items():
if delete_tables:
equality_delete_groups[equality_ids] = pa.concat_tables(delete_tables) if len(delete_tables) > 1 else delete_tables[0]
return equality_delete_groups


def _apply_equality_deletes(
data_table: pa.Table, delete_table: pa.Table, equality_ids: List[int], data_schema: Optional[Schema]
) -> pa.Table:
"""Apply equality deletes to a data table.

Filter out rows from the data table that match the conditions in the equality delete table.
Args:
data_table: A PyArrow table which has data to filter
delete_table: A PyArrow table containing the equality deletes
equality_ids: A List of field IDs to use for equality comparison
data_schema: The schema of the PyArrow table
Returns:
A filtered PyArrow table with matching rows removed
"""
if len(delete_table) == 0:
return data_table
if data_schema is None:
raise ValueError("Schema is required for applying equality deletes")

# Resolve the correct columns to be used in the anti join
equality_columns = [data_schema.find_field(fid).name for fid in equality_ids]

# Use PyArrow's join function with left anti join type
result = data_table.join(delete_table.select(equality_columns), keys=equality_columns, join_type="left anti")
return result
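To make the anti join concrete, here is a minimal, self-contained sketch of what _apply_equality_deletes does; the column names and values are illustrative and not taken from the PR.

import pyarrow as pa

# Data rows and equality-delete rows (hypothetical values).
data = pa.table({
    "id": [1, 2, 3, 4],
    "category": ["a", "b", "a", "c"],
    "value": [10.0, 20.0, 30.0, 40.0],
})
deletes = pa.table({"id": [2, 4], "category": ["b", "c"]})

# Keep only data rows that have no match in the delete table on (id, category).
equality_columns = ["id", "category"]
filtered = data.join(
    deletes.select(equality_columns),
    keys=equality_columns,
    join_type="left anti",
)
# Rows (id=2, category='b') and (id=4, category='c') are dropped; the join
# does not guarantee the order of the remaining rows.

Grouping delete tables that share the same equality_ids (via _group_deletes_by_equality_ids) means each record batch pays for one such join per distinct ID set instead of one per delete file.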
41 changes: 14 additions & 27 deletions pyiceberg/table/__init__.py
@@ -41,7 +41,6 @@
)

from pydantic import Field
from sortedcontainers import SortedList

import pyiceberg.expressions.parser as parser
from pyiceberg.expressions import (
Expand All @@ -64,7 +63,6 @@
)
from pyiceberg.io import FileIO, load_file_io
from pyiceberg.manifest import (
POSITIONAL_DELETE_SCHEMA,
DataFile,
DataFileContent,
ManifestContent,
@@ -78,6 +76,7 @@
PartitionSpec,
)
from pyiceberg.schema import Schema
from pyiceberg.table.delete_file_index import DeleteFileIndex
from pyiceberg.table.inspect import InspectTable
from pyiceberg.table.locations import LocationProvider, load_location_provider
from pyiceberg.table.metadata import (
@@ -1793,29 +1792,20 @@ def _min_sequence_number(manifests: List[ManifestFile]) -> int:
return INITIAL_SEQUENCE_NUMBER


def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_entries: SortedList[ManifestEntry]) -> Set[DataFile]:
"""Check if the delete file is relevant for the data file.

Using the column metrics to see if the filename is in the lower and upper bound.
def _match_deletes_to_data_file(data_entry: ManifestEntry, delete_file_index: DeleteFileIndex) -> Set[DataFile]:
"""Check if delete files are relevant for the data file.

Args:
data_entry (ManifestEntry): The manifest entry path of the datafile.
positional_delete_entries (List[ManifestEntry]): All the candidate positional deletes manifest entries.
data_entry (ManifestEntry): The manifest entry of the data file.
delete_file_index (DeleteFileIndex): Index containing all delete files.

Returns:
A set of files that are relevant for the data file.
A set of delete files that are relevant for the data file.
"""
relevant_entries = positional_delete_entries[positional_delete_entries.bisect_right(data_entry) :]

if len(relevant_entries) > 0:
evaluator = _InclusiveMetricsEvaluator(POSITIONAL_DELETE_SCHEMA, EqualTo("file_path", data_entry.data_file.file_path))
return {
positional_delete_entry.data_file
for positional_delete_entry in relevant_entries
if evaluator.eval(positional_delete_entry.data_file)
}
else:
return set()
candidate_deletes = delete_file_index.for_data_file(
data_entry.sequence_number or 0, data_entry.data_file, partition_key=data_entry.data_file.partition
)
return set(candidate_deletes)
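The DeleteFileIndex itself is added in pyiceberg/table/delete_file_index.py and does not appear in this diff. As a mental model only, a stripped-down index keyed by sequence number could look like the sketch below; the real index also receives the schema, partition specs, and partition keys (see the calls above and in plan_files), which this sketch omits.

from typing import List, Tuple

from pyiceberg.manifest import DataFile, DataFileContent, ManifestEntry


class SimpleDeleteFileIndex:
    """Illustrative only; not the implementation added by this PR."""

    def __init__(self) -> None:
        self._deletes: List[Tuple[int, DataFile]] = []

    def add_delete_file(self, entry: ManifestEntry) -> None:
        self._deletes.append((entry.sequence_number or 0, entry.data_file))

    def for_data_file(self, data_sequence_number: int, data_file: DataFile) -> List[DataFile]:
        # data_file is unused in this simplified sketch; the real index can
        # also prune candidates using the data file itself (e.g. its partition).
        matches: List[DataFile] = []
        for delete_sequence_number, delete_file in self._deletes:
            if delete_file.content == DataFileContent.POSITION_DELETES:
                # Iceberg spec: position deletes apply to data files whose
                # sequence number is <= the delete file's sequence number.
                if delete_sequence_number >= data_sequence_number:
                    matches.append(delete_file)
            elif delete_file.content == DataFileContent.EQUALITY_DELETES:
                # Iceberg spec: equality deletes apply only to strictly
                # older data files.
                if delete_sequence_number > data_sequence_number:
                    matches.append(delete_file)
        return matches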


class DataScan(TableScan):
@@ -1921,7 +1911,7 @@ def plan_files(self) -> Iterable[FileScanTask]:
min_sequence_number = _min_sequence_number(manifests)

data_entries: List[ManifestEntry] = []
positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER)
delete_file_index = DeleteFileIndex(self.table_metadata.schema(), self.table_metadata.specs())

executor = ExecutorFactory.get_or_create()
for manifest_entry in chain(
@@ -1942,19 +1932,16 @@
data_file = manifest_entry.data_file
if data_file.content == DataFileContent.DATA:
data_entries.append(manifest_entry)
elif data_file.content == DataFileContent.POSITION_DELETES:
positional_delete_entries.add(manifest_entry)
elif data_file.content == DataFileContent.EQUALITY_DELETES:
raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568")
elif data_file.content in (DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES):
delete_file_index.add_delete_file(manifest_entry, partition_key=data_file.partition)
else:
raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}")

return [
FileScanTask(
data_entry.data_file,
delete_files=_match_deletes_to_data_file(
data_entry,
positional_delete_entries,
delete_file_index,
),
residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for(
data_entry.data_file.partition