-
Notifications
You must be signed in to change notification settings - Fork 342
[Append Scan] Introduce an AbstractTableScan
with default methods
#2230
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1630,16 +1630,17 @@ def _parse_row_filter(expr: Union[str, BooleanExpression]) -> BooleanExpression: | |
return parser.parse(expr) if isinstance(expr, str) else expr | ||
|
||
|
||
S = TypeVar("S", bound="TableScan", covariant=True) | ||
A = TypeVar("A", bound="AbstractTableScan", covariant=True) | ||
|
||
|
||
class AbstractTableScan(ABC): | ||
"""A base class for all table scans.""" | ||
|
||
class TableScan(ABC): | ||
table_metadata: TableMetadata | ||
io: FileIO | ||
row_filter: BooleanExpression | ||
selected_fields: Tuple[str, ...] | ||
case_sensitive: bool | ||
snapshot_id: Optional[int] | ||
options: Properties | ||
limit: Optional[int] | ||
|
||
|
@@ -1650,7 +1651,6 @@ def __init__( | |
row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, | ||
selected_fields: Tuple[str, ...] = ("*",), | ||
case_sensitive: bool = True, | ||
snapshot_id: Optional[int] = None, | ||
options: Properties = EMPTY_DICT, | ||
limit: Optional[int] = None, | ||
): | ||
|
@@ -1659,10 +1659,108 @@ def __init__( | |
self.row_filter = _parse_row_filter(row_filter) | ||
self.selected_fields = selected_fields | ||
self.case_sensitive = case_sensitive | ||
self.snapshot_id = snapshot_id | ||
self.options = options | ||
self.limit = limit | ||
|
||
@abstractmethod | ||
def projection(self) -> Schema: ... | ||
|
||
@abstractmethod | ||
def plan_files(self) -> Iterable[ScanTask]: ... | ||
|
||
@abstractmethod | ||
def to_arrow(self) -> pa.Table: ... | ||
|
||
def select(self: A, *field_names: str) -> A: | ||
if "*" in self.selected_fields: | ||
return self.update(selected_fields=field_names) | ||
return self.update(selected_fields=tuple(set(self.selected_fields).intersection(set(field_names)))) | ||
|
||
def filter(self: A, expr: Union[str, BooleanExpression]) -> A: | ||
return self.update(row_filter=And(self.row_filter, _parse_row_filter(expr))) | ||
|
||
def with_case_sensitive(self: A, case_sensitive: bool = True) -> A: | ||
return self.update(case_sensitive=case_sensitive) | ||
|
||
def update(self: A, **overrides: Any) -> A: | ||
"""Create a copy of this table scan with updated fields.""" | ||
from inspect import signature | ||
|
||
# Extract those attributes that are constructor parameters. We don't use self.__dict__ as the kwargs to the | ||
# constructors because it may contain additional attributes that are not part of the constructor signature. | ||
params = signature(type(self).__init__).parameters.keys() - {"self"} # Skip "self" parameter | ||
kwargs = {param: getattr(self, param) for param in params} # Assume parameters are attributes | ||
|
||
return type(self)(**{**kwargs, **overrides}) | ||
|
||
def to_pandas(self, **kwargs: Any) -> pd.DataFrame: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Highlighting this change. I've added default implementations based on `to_arrow()`. This technically changes the [inline code lost in extraction]. The motivation here was to reduce duplication between [inline code lost in extraction]. |
||
"""Read a Pandas DataFrame eagerly from this Iceberg table scan. | ||
|
||
Returns: | ||
pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table scan | ||
""" | ||
return self.to_arrow().to_pandas(**kwargs) | ||
|
||
def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Highlighting this also - (Again, the motivation here was to reduce duplication) |
||
"""Shorthand for loading this table scan in DuckDB. | ||
|
||
Returns: | ||
DuckDBPyConnection: In memory DuckDB connection with the Iceberg table scan. | ||
""" | ||
import duckdb | ||
|
||
con = connection or duckdb.connect(database=":memory:") | ||
con.register(table_name, self.to_arrow()) | ||
|
||
return con | ||
|
||
def to_ray(self) -> ray.data.dataset.Dataset: | ||
"""Read a Ray Dataset eagerly from this Iceberg table scan. | ||
|
||
Returns: | ||
ray.data.dataset.Dataset: Materialized Ray Dataset from the Iceberg table scan | ||
""" | ||
import ray | ||
|
||
return ray.data.from_arrow(self.to_arrow()) | ||
|
||
def to_polars(self) -> pl.DataFrame: | ||
"""Read a Polars DataFrame from this Iceberg table scan. | ||
|
||
Returns: | ||
pl.DataFrame: Materialized Polars Dataframe from the Iceberg table scan | ||
""" | ||
import polars as pl | ||
|
||
result = pl.from_arrow(self.to_arrow()) | ||
if isinstance(result, pl.Series): | ||
result = result.to_frame() | ||
|
||
return result | ||
|
||
|
||
S = TypeVar("S", bound="TableScan", covariant=True) | ||
|
||
|
||
class TableScan(AbstractTableScan, ABC): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Methods on this class like |
||
"""A base class for table scans targeting a single snapshot.""" | ||
|
||
snapshot_id: Optional[int] | ||
|
||
def __init__( | ||
self, | ||
table_metadata: TableMetadata, | ||
io: FileIO, | ||
row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, | ||
selected_fields: Tuple[str, ...] = ("*",), | ||
case_sensitive: bool = True, | ||
snapshot_id: Optional[int] = None, | ||
options: Properties = EMPTY_DICT, | ||
limit: Optional[int] = None, | ||
): | ||
super().__init__(table_metadata, io, row_filter, selected_fields, case_sensitive, options, limit) | ||
self.snapshot_id = snapshot_id | ||
|
||
def snapshot(self) -> Optional[Snapshot]: | ||
if self.snapshot_id: | ||
return self.table_metadata.snapshot_by_id(self.snapshot_id) | ||
|
@@ -1688,29 +1786,6 @@ def projection(self) -> Schema: | |
|
||
return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive) | ||
|
||
@abstractmethod | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Pointing out that these removed methods have just been moved to `AbstractTableScan`. |
||
def plan_files(self) -> Iterable[ScanTask]: ... | ||
|
||
@abstractmethod | ||
def to_arrow(self) -> pa.Table: ... | ||
|
||
@abstractmethod | ||
def to_pandas(self, **kwargs: Any) -> pd.DataFrame: ... | ||
|
||
@abstractmethod | ||
def to_polars(self) -> pl.DataFrame: ... | ||
|
||
def update(self: S, **overrides: Any) -> S: | ||
"""Create a copy of this table scan with updated fields.""" | ||
from inspect import signature | ||
|
||
# Extract those attributes that are constructor parameters. We don't use self.__dict__ as the kwargs to the | ||
# constructors because it may contain additional attributes that are not part of the constructor signature. | ||
params = signature(type(self).__init__).parameters.keys() - {"self"} # Skip "self" parameter | ||
kwargs = {param: getattr(self, param) for param in params} # Assume parameters are attributes | ||
|
||
return type(self)(**{**kwargs, **overrides}) | ||
|
||
def use_ref(self: S, name: str) -> S: | ||
if self.snapshot_id: | ||
raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}") | ||
|
@@ -1719,17 +1794,6 @@ def use_ref(self: S, name: str) -> S: | |
|
||
raise ValueError(f"Cannot scan unknown ref={name}") | ||
|
||
def select(self: S, *field_names: str) -> S: | ||
if "*" in self.selected_fields: | ||
return self.update(selected_fields=field_names) | ||
return self.update(selected_fields=tuple(set(self.selected_fields).intersection(set(field_names)))) | ||
|
||
def filter(self: S, expr: Union[str, BooleanExpression]) -> S: | ||
return self.update(row_filter=And(self.row_filter, _parse_row_filter(expr))) | ||
|
||
def with_case_sensitive(self: S, case_sensitive: bool = True) -> S: | ||
return self.update(case_sensitive=case_sensitive) | ||
|
||
@abstractmethod | ||
def count(self) -> int: ... | ||
|
||
|
@@ -2002,51 +2066,6 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: | |
batches, | ||
).cast(target_schema) | ||
|
||
def to_pandas(self, **kwargs: Any) -> pd.DataFrame: | ||
"""Read a Pandas DataFrame eagerly from this Iceberg table. | ||
|
||
Returns: | ||
pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table | ||
""" | ||
return self.to_arrow().to_pandas(**kwargs) | ||
|
||
def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection: | ||
"""Shorthand for loading the Iceberg Table in DuckDB. | ||
|
||
Returns: | ||
DuckDBPyConnection: In memory DuckDB connection with the Iceberg table. | ||
""" | ||
import duckdb | ||
|
||
con = connection or duckdb.connect(database=":memory:") | ||
con.register(table_name, self.to_arrow()) | ||
|
||
return con | ||
|
||
def to_ray(self) -> ray.data.dataset.Dataset: | ||
"""Read a Ray Dataset eagerly from this Iceberg table. | ||
|
||
Returns: | ||
ray.data.dataset.Dataset: Materialized Ray Dataset from the Iceberg table | ||
""" | ||
import ray | ||
|
||
return ray.data.from_arrow(self.to_arrow()) | ||
|
||
def to_polars(self) -> pl.DataFrame: | ||
"""Read a Polars DataFrame from this Iceberg table. | ||
|
||
Returns: | ||
pl.DataFrame: Materialized Polars Dataframe from the Iceberg table | ||
""" | ||
import polars as pl | ||
|
||
result = pl.from_arrow(self.to_arrow()) | ||
if isinstance(result, pl.Series): | ||
result = result.to_frame() | ||
|
||
return result | ||
|
||
def count(self) -> int: | ||
from pyiceberg.io.pyarrow import ArrowScan | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GitHub's diff here got messed up - `TableScan` and `S` remain, just below.