
Commit 2d7d089

Add support for Bodo DataFrame (#2167)
# Rationale for this change

Adds support for the Bodo DataFrame library, a drop-in replacement for Pandas that accelerates and scales Python code automatically by applying query, compiler, and HPC optimizations.

# Are these changes tested?

Added an integration test.

# Are there any user-facing changes?

Adds a `Table.to_bodo()` function. Example code:

```python
df = table.to_bodo()  # equivalent to `bodo.pandas.read_iceberg_table(table)`
df = df[df["trip_distance"] >= 10.0]
df = df[["VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"]]
print(df)
```
1 parent 86bf71c commit 2d7d089

File tree

8 files changed

+871
-570
lines changed

mkdocs/docs/api.md

Lines changed: 46 additions & 0 deletions
````diff
@@ -1523,6 +1523,52 @@ print(ray_dataset.take(2))
 ]
 ```
 
+### Bodo
+
+PyIceberg interfaces closely with Bodo Dataframes (see [Bodo Iceberg Quick Start](https://docs.bodo.ai/latest/quick_start/quickstart_local_iceberg/)),
+which provides a drop-in replacement for Pandas that applies query, compiler and HPC optimizations automatically.
+Bodo accelerates and scales Python code from single laptops to large clusters without code rewrites.
+
+<!-- prettier-ignore-start -->
+
+!!! note "Requirements"
+    This requires [`bodo` to be installed](index.md).
+
+    ```python
+    pip install pyiceberg['bodo']
+    ```
+<!-- prettier-ignore-end -->
+
+A table can be read easily into a Bodo Dataframe to perform Pandas operations:
+
+```python
+df = table.to_bodo()  # equivalent to `bodo.pandas.read_iceberg_table(table)`
+df = df[df["trip_distance"] >= 10.0]
+df = df[["VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"]]
+print(df)
+```
+
+This creates a lazy query, optimizes it, and runs it on all available cores (print triggers execution):
+
+```python
+        VendorID tpep_pickup_datetime tpep_dropoff_datetime
+0              2  2023-01-01 00:27:12   2023-01-01 00:49:56
+1              2  2023-01-01 00:09:29   2023-01-01 00:29:23
+2              1  2023-01-01 00:13:30   2023-01-01 00:44:00
+3              2  2023-01-01 00:41:41   2023-01-01 01:19:32
+4              2  2023-01-01 00:22:39   2023-01-01 01:30:45
+...          ...                  ...                   ...
+245478         2  2023-01-31 22:32:57   2023-01-31 23:01:48
+245479         2  2023-01-31 22:03:26   2023-01-31 22:46:13
+245480         2  2023-01-31 23:25:56   2023-02-01 00:05:42
+245481         2  2023-01-31 23:18:00   2023-01-31 23:46:00
+245482         2  2023-01-31 23:18:00   2023-01-31 23:41:00
+
+[245483 rows x 3 columns]
+```
+
+Bodo is optimized to take advantage of Iceberg features such as hidden partitioning and various statistics for efficient reads.
+
 ### Daft
 
 PyIceberg interfaces closely with Daft Dataframes (see also: [Daft integration with Iceberg](https://docs.daft.ai/en/stable/io/iceberg/)) which provides a full lazily optimized query engine interface on top of PyIceberg tables.
````
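The docs above note that `to_bodo()` builds a lazy query that only executes when the result is actually needed (here, at `print`). As an illustration of that execution model (a toy sketch only, not Bodo's implementation), a deferred pipeline can be as simple as recording operations and replaying them on demand:

```python
# Illustrative sketch of lazy execution (NOT Bodo's API): operations are
# recorded as a plan, and nothing touches the data until collect() runs.
class LazyFrame:
    def __init__(self, rows, ops=()):
        self._rows = rows      # source data (stand-in for an Iceberg scan)
        self._ops = list(ops)  # deferred operations, applied on demand

    def filter(self, predicate):
        # Record the filter; do not execute it yet.
        return LazyFrame(self._rows, self._ops + [("filter", predicate)])

    def select(self, columns):
        # Record the projection; still no execution.
        return LazyFrame(self._rows, self._ops + [("select", columns)])

    def collect(self):
        # Execution is triggered here, once, over the whole recorded plan.
        rows = self._rows
        for kind, arg in self._ops:
            if kind == "filter":
                rows = [r for r in rows if arg(r)]
            else:  # "select"
                rows = [{c: r[c] for c in arg} for r in rows]
        return rows


trips = [
    {"VendorID": 1, "trip_distance": 2.5},
    {"VendorID": 2, "trip_distance": 12.0},
]
plan = LazyFrame(trips).filter(lambda r: r["trip_distance"] >= 10.0).select(["VendorID"])
print(plan.collect())  # [{'VendorID': 2}]
```

A real engine would additionally optimize the recorded plan (e.g. push the filter into the Iceberg scan) before executing it in parallel.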

mkdocs/docs/index.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -52,6 +52,7 @@ You can mix and match optional dependencies depending on your needs:
 | pandas | Installs both PyArrow and Pandas |
 | duckdb | Installs both PyArrow and DuckDB |
 | ray | Installs PyArrow, Pandas, and Ray |
+| bodo | Installs Bodo |
 | daft | Installs Daft |
 | polars | Installs Polars |
 | s3fs | S3FS as a FileIO implementation to interact with the object store |
```

poetry.lock

Lines changed: 779 additions & 561 deletions
Some generated files are not rendered by default.

pyiceberg/table/__init__.py

Lines changed: 11 additions & 0 deletions
```diff
@@ -137,6 +137,7 @@
 from pyiceberg.utils.properties import property_as_bool
 
 if TYPE_CHECKING:
+    import bodo.pandas as bd
     import daft
     import pandas as pd
     import polars as pl
@@ -1485,6 +1486,16 @@ def to_daft(self) -> daft.DataFrame:
 
         return daft.read_iceberg(self)
 
+    def to_bodo(self) -> bd.DataFrame:
+        """Read a bodo DataFrame lazily from this Iceberg table.
+
+        Returns:
+            bd.DataFrame: Unmaterialized Bodo Dataframe created from the Iceberg table
+        """
+        import bodo.pandas as bd
+
+        return bd.read_iceberg_table(self)
+
     def to_polars(self) -> pl.LazyFrame:
         """Lazily read from this Apache Iceberg table.
```
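The `to_bodo()` addition follows PyIceberg's optional-dependency pattern: the import appears under `TYPE_CHECKING` for annotations only, and is repeated inside the method body, so the package stays importable without `bodo` installed and the import cost is paid only when the method is called. A minimal sketch of the same pattern, using the stdlib `sqlite3` as a stand-in for the optional library:

```python
# Sketch of the optional-dependency pattern used in the diff. `sqlite3`
# stands in for the optional library so the example is runnable anywhere.
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    import sqlite3  # evaluated by type checkers only, never at runtime


class Table:
    def to_sqlite(self) -> "sqlite3.Connection":
        # Deferred import: importing this module never requires sqlite3;
        # only calling the method does.
        import sqlite3

        return sqlite3.connect(":memory:")


conn = Table().to_sqlite()
print(type(conn).__name__)  # Connection
```

The string annotation (`"sqlite3.Connection"`, or `bd.DataFrame` under `from __future__ import annotations`) is what lets the signature reference a module that may not be installed.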

pyproject.toml

Lines changed: 6 additions & 0 deletions
```diff
@@ -78,6 +78,7 @@ gcsfs = { version = ">=2023.1.0", optional = true }
 huggingface-hub = { version = ">=0.24.0", optional = true }
 psycopg2-binary = { version = ">=2.9.6", optional = true }
 sqlalchemy = { version = "^2.0.18", optional = true }
+bodo = { version = ">=2025.7.4", optional = true }
 daft = { version = ">=0.5.0", optional = true }
 cachetools = ">=5.5,<7.0"
 pyiceberg-core = { version = "^0.5.1", optional = true }
@@ -298,6 +299,7 @@ pyarrow = ["pyarrow", "pyiceberg-core"]
 pandas = ["pandas", "pyarrow"]
 duckdb = ["duckdb", "pyarrow"]
 ray = ["ray", "pyarrow", "pandas"]
+bodo = ["bodo"]
 daft = ["daft"]
 polars = ["polars"]
 snappy = ["python-snappy"]
@@ -485,6 +487,10 @@ ignore_missing_imports = true
 module = "daft.*"
 ignore_missing_imports = true
 
+[[tool.mypy.overrides]]
+module = "bodo.*"
+ignore_missing_imports = true
+
 [[tool.mypy.overrides]]
 module = "pyparsing.*"
 ignore_missing_imports = true
```

tests/integration/test_reads.py

Lines changed: 10 additions & 0 deletions
```diff
@@ -339,6 +339,16 @@ def test_daft_nan_rewritten(catalog: Catalog) -> None:
     assert math.isnan(df.to_pydict()["col_numeric"][0])
 
 
+@pytest.mark.integration
+@pytest.mark.filterwarnings("ignore")
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
+def test_bodo_nan(catalog: Catalog) -> None:
+    table_test_null_nan_rewritten = catalog.load_table("default.test_null_nan_rewritten")
+    df = table_test_null_nan_rewritten.to_bodo()
+    assert len(df) == 3
+    assert math.isnan(df.col_numeric.iloc[0])
+
+
 @pytest.mark.integration
 @pytest.mark.filterwarnings("ignore")
 @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
```

tests/integration/test_writes/test_partitioned_writes.py

Lines changed: 8 additions & 4 deletions
```diff
@@ -546,27 +546,31 @@ def test_summaries_with_null(spark: SparkSession, session_catalog: Catalog, arro
         "total-data-files": "6",
         "total-records": "6",
     }
+    assert "removed-files-size" in summaries[5]
+    assert "total-files-size" in summaries[5]
     assert summaries[5] == {
-        "removed-files-size": "16174",
+        "removed-files-size": summaries[5]["removed-files-size"],
         "changed-partition-count": "2",
         "total-equality-deletes": "0",
         "deleted-data-files": "4",
         "total-position-deletes": "0",
         "total-delete-files": "0",
         "deleted-records": "4",
-        "total-files-size": "8884",
+        "total-files-size": summaries[5]["total-files-size"],
         "total-data-files": "2",
         "total-records": "2",
     }
+    assert "added-files-size" in summaries[6]
+    assert "total-files-size" in summaries[6]
     assert summaries[6] == {
         "changed-partition-count": "2",
         "added-data-files": "2",
         "total-equality-deletes": "0",
         "added-records": "2",
         "total-position-deletes": "0",
-        "added-files-size": "8087",
+        "added-files-size": summaries[6]["added-files-size"],
         "total-delete-files": "0",
-        "total-files-size": "16971",
+        "total-files-size": summaries[6]["total-files-size"],
         "total-data-files": "4",
         "total-records": "4",
     }
```
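These test changes replace hardcoded file sizes with values echoed from the summary under test, after first asserting that the keys exist. The effect, sketched below with made-up data, is that the dict equality still pins the exact key set and every deterministic value while tolerating run-to-run variation in file sizes:

```python
# Sketch of the assertion pattern from the diff, with invented sample data.
# File sizes vary across runs (compression, metadata), so the expected dict
# echoes the observed value back for those keys only.
summary = {"deleted-records": "4", "removed-files-size": "16174"}  # observed

assert "removed-files-size" in summary  # presence is checked explicitly first
assert summary == {
    "deleted-records": "4",  # deterministic: compared exactly
    "removed-files-size": summary["removed-files-size"],  # echoed back
}

# The full-dict comparison still catches an unexpected extra or missing key:
assert summary != {"deleted-records": "4"}
print("ok")
```

The echoed entries are trivially equal, but keeping them in the expected dict preserves the strict "no other keys" guarantee of `==` on dicts.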

tests/integration/test_writes/test_writes.py

Lines changed: 10 additions & 5 deletions
```diff
@@ -309,15 +309,17 @@ def test_summaries_partial_overwrite(spark: SparkSession, session_catalog: Catal
     assert file_size > 0
 
     # APPEND
+    assert "added-files-size" in summaries[0]
+    assert "total-files-size" in summaries[0]
     assert summaries[0] == {
         "added-data-files": "3",
-        "added-files-size": "2618",
+        "added-files-size": summaries[0]["added-files-size"],
         "added-records": "5",
         "changed-partition-count": "3",
         "total-data-files": "3",
         "total-delete-files": "0",
         "total-equality-deletes": "0",
-        "total-files-size": "2618",
+        "total-files-size": summaries[0]["total-files-size"],
         "total-position-deletes": "0",
         "total-records": "5",
     }
@@ -344,18 +346,21 @@ def test_summaries_partial_overwrite(spark: SparkSession, session_catalog: Catal
     # }
     files = tbl.inspect.data_files()
     assert len(files) == 3
+    assert "added-files-size" in summaries[1]
+    assert "removed-files-size" in summaries[1]
+    assert "total-files-size" in summaries[1]
     assert summaries[1] == {
         "added-data-files": "1",
-        "added-files-size": "875",
+        "added-files-size": summaries[1]["added-files-size"],
         "added-records": "2",
         "changed-partition-count": "1",
         "deleted-data-files": "1",
         "deleted-records": "3",
-        "removed-files-size": "882",
+        "removed-files-size": summaries[1]["removed-files-size"],
         "total-data-files": "3",
         "total-delete-files": "0",
         "total-equality-deletes": "0",
-        "total-files-size": "2611",
+        "total-files-size": summaries[1]["total-files-size"],
         "total-position-deletes": "0",
         "total-records": "4",
     }
```
