
Commit a0e1a11

add uuid partitions test with trino

Add Trino as an alternative tool for testing UUID partitions, since Spark with Java Iceberg 1.9.2 does not support them yet. <!--related to #2002-->

1 parent 5d43add · commit a0e1a11
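
The new test verifies PyIceberg-written UUID partitions by reading the table back through Trino over SQLAlchemy rather than through Spark. Below is a minimal sketch of that read path, assuming the `trino` Python client's SQLAlchemy dialect and a hypothetical local endpoint; in the test itself the connections come from the `trino_hive_conn` / `trino_rest_conn` fixtures, whose definitions are not part of this commit.

```python
# Sketch only: host, port, user, and catalog name are assumptions, not from this commit.
from sqlalchemy import create_engine, text

# The trino:// dialect ships with the `trino` Python client
# (installed via `pip install 'trino[sqlalchemy]'`).
engine = create_engine("trino://admin@localhost:8080/iceberg")

with engine.connect() as conn:
    # Read back a table written by PyIceberg; the table name is illustrative.
    rows = conn.execute(text("SELECT * FROM default.test_uuid_partitioning_identity")).fetchall()
    print(rows)
```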

File tree

1 file changed: +63 −12 lines

tests/integration/test_writes/test_writes.py

Lines changed: 63 additions & 12 deletions
```diff
@@ -38,6 +38,8 @@
 from pydantic_core import ValidationError
 from pyspark.sql import SparkSession
 from pytest_mock.plugin import MockerFixture
+from sqlalchemy import Connection
+from sqlalchemy.sql.expression import text

 from pyiceberg.catalog import Catalog, load_catalog
 from pyiceberg.catalog.hive import HiveCatalog
@@ -50,18 +52,8 @@
 from pyiceberg.table import TableProperties
 from pyiceberg.table.refs import MAIN_BRANCH
 from pyiceberg.table.sorting import SortDirection, SortField, SortOrder
-from pyiceberg.transforms import DayTransform, HourTransform, IdentityTransform, Transform
-from pyiceberg.types import (
-    DateType,
-    DecimalType,
-    DoubleType,
-    IntegerType,
-    ListType,
-    LongType,
-    NestedField,
-    StringType,
-    UUIDType,
-)
+from pyiceberg.transforms import BucketTransform, DayTransform, HourTransform, IdentityTransform, Transform
+from pyiceberg.types import DateType, DecimalType, DoubleType, IntegerType, ListType, LongType, NestedField, StringType, UUIDType
 from utils import _create_table


@@ -1865,6 +1857,7 @@ def test_read_write_decimals(session_catalog: Catalog) -> None:
     assert tbl.scan().to_arrow() == arrow_table


+@pytest.mark.skip("UUID BucketTransform is not supported in Spark Iceberg 1.9.2 yet")
 @pytest.mark.integration
 @pytest.mark.parametrize(
     "transform",
@@ -1918,6 +1911,64 @@ def test_uuid_partitioning(session_catalog: Catalog, spark: SparkSession, transf
     assert lhs == rhs


+@pytest.mark.integration_trino
+@pytest.mark.integration
+@pytest.mark.parametrize(
+    "transform",
+    [IdentityTransform(), BucketTransform(32)],
+)
+@pytest.mark.parametrize(
+    "catalog, trino_conn",
+    [
+        (pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("trino_hive_conn")),
+        (pytest.lazy_fixture("session_catalog"), pytest.lazy_fixture("trino_rest_conn")),
+    ],
+)
+def test_uuid_partitioning_with_trino(catalog: Catalog, trino_conn: Connection, transform: Transform) -> None:  # type: ignore
+    identifier = f"default.test_uuid_partitioning_{str(transform).replace('[32]', '')}"
+
+    schema = Schema(NestedField(field_id=1, name="uuid", field_type=UUIDType(), required=True))
+
+    try:
+        catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    partition_spec = PartitionSpec(
+        PartitionField(source_id=1, field_id=1000, transform=transform, name=f"uuid_{str(transform).replace('[32]', '')}")
+    )
+
+    import pyarrow as pa
+
+    arr_table = pa.Table.from_pydict(
+        {
+            "uuid": [
+                uuid.UUID("00000000-0000-0000-0000-000000000000").bytes,
+                uuid.UUID("11111111-1111-1111-1111-111111111111").bytes,
+            ],
+        },
+        schema=pa.schema(
+            [
+                # Uuid not yet supported, so we have to stick with `binary(16)`
+                # https://github.com/apache/arrow/issues/46468
+                pa.field("uuid", pa.binary(16), nullable=False),
+            ]
+        ),
+    )
+
+    tbl = catalog.create_table(
+        identifier=identifier,
+        schema=schema,
+        partition_spec=partition_spec,
+    )
+
+    tbl.append(arr_table)
+    rows = trino_conn.execute(text(f"SELECT * FROM {identifier}")).fetchall()
+    lhs = sorted([r[0] for r in rows])
+    rhs = sorted([u.as_py() for u in tbl.scan().to_arrow()["uuid"].combine_chunks()])
+    assert lhs == rhs
+
+
 @pytest.mark.integration
 def test_avro_compression_codecs(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
     identifier = "default.test_avro_compression_codecs"
```
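
As the inline comment in the new test notes, Arrow does not yet expose a usable UUID type in this path (https://github.com/apache/arrow/issues/46468), so UUID values travel as fixed-width `binary(16)`. A self-contained sketch of that workaround and its round-trip, independent of the test suite:

```python
import uuid

import pyarrow as pa

values = [
    uuid.UUID("00000000-0000-0000-0000-000000000000"),
    uuid.UUID("11111111-1111-1111-1111-111111111111"),
]

# Each UUID is stored as its canonical 16-byte representation.
arr = pa.array([v.bytes for v in values], type=pa.binary(16))

# Round-trip back to uuid.UUID objects to show the representation is lossless.
assert [uuid.UUID(bytes=b.as_py()) for b in arr] == values
```

If pyarrow later gains a first-class UUID type, the `binary(16)` schema in the test could presumably be swapped for it.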
