Skip to content

Commit fdb090e

Browse files
committed
Add Trino integration tests and update dependencies
- Introduced new tests for registering tables and schema existence in Trino. - Added Trino connection fixtures to support integration testing. - Updated `pyproject.toml` to include the Trino dependency. - Enhanced the Makefile with a new command to run Trino integration tests.
1 parent f194931 commit fdb090e

File tree

7 files changed

+253
-40
lines changed

7 files changed

+253
-40
lines changed

Makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@ test-integration-rebuild: ## Rebuild integration Docker services from scratch
103103
docker compose -f dev/docker-compose-integration.yml rm -f
104104
docker compose -f dev/docker-compose-integration.yml build --no-cache
105105

106+
test-integration-trino:
107+
sh ./dev/run-trino.sh
108+
$(TEST_RUNNER) pytest tests/ -m integration_trino $(PYTEST_ARGS)
109+
106110
test-s3: ## Run tests marked with @pytest.mark.s3
107111
sh ./dev/run-minio.sh
108112
$(TEST_RUNNER) pytest tests/ -m s3 $(PYTEST_ARGS)

dev/run-trino.sh

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#!/bin/bash
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
#
20+
21+
set -ex
22+
23+
if [ $(docker ps -q --filter "name=pyiceberg-trino" --filter "status=running" ) ]; then
24+
echo "Trino service running"
25+
else
26+
docker compose -f dev/docker-compose-trino.yml kill
27+
docker compose -f dev/docker-compose-trino.yml up -d
28+
while [ -z $(docker ps -q --filter "name=pyiceberg-trino" --filter "status=running" ) ]
29+
do
30+
echo "Waiting for Trino"
31+
sleep 1
32+
done
33+
fi

poetry.lock

Lines changed: 128 additions & 25 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ deptry = ">=0.14,<0.24"
104104
docutils = "!=0.21.post1" # https://github.com/python-poetry/poetry/issues/9248#issuecomment-2026240520
105105
mypy-boto3-glue = ">=1.28.18"
106106
mypy-boto3-dynamodb = ">=1.28.18"
107+
trino = {extras = ["sqlalchemy"], version = "^0.335.0"}
107108

108109
[tool.poetry.group.docs.dependencies]
109110
# for mkdocs
@@ -326,6 +327,7 @@ markers = [
326327
"s3: marks a test as requiring access to s3 compliant storage (use with --aws-access-key-id, --aws-secret-access-key, and --endpoint args)",
327328
"adls: marks a test as requiring access to adls compliant storage (use with --adls.account-name, --adls.account-key, and --adls.endpoint args)",
328329
"integration: marks integration tests against Apache Spark",
330+
"integration_trino: marks integration tests against Trino",
329331
"gcs: marks a test as requiring access to gcs compliant storage (use with --gs.token, --gs.project, and --gs.endpoint)",
330332
"benchmark: collection of tests to validate read/write performance before and after a change"
331333
]

tests/conftest.py

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,18 +35,12 @@
3535
from pathlib import Path
3636
from random import choice, randint
3737
from tempfile import TemporaryDirectory
38-
from typing import (
39-
TYPE_CHECKING,
40-
Any,
41-
Dict,
42-
Generator,
43-
List,
44-
Optional,
45-
)
38+
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional
4639

4740
import boto3
4841
import pytest
4942
from moto import mock_aws
43+
from sqlalchemy import Connection
5044

5145
from pyiceberg.catalog import Catalog, load_catalog
5246
from pyiceberg.catalog.noop import NoopCatalog
@@ -141,6 +135,18 @@ def pytest_addoption(parser: pytest.Parser) -> None:
141135
"--gcs.oauth2.token", action="store", default="anon", help="The GCS authentication method for tests marked gcs"
142136
)
143137
parser.addoption("--gcs.project-id", action="store", default="test", help="The GCP project for tests marked gcs")
138+
parser.addoption(
139+
"--trino.rest.endpoint",
140+
action="store",
141+
default="trino://test@localhost:8082/warehouse_rest",
142+
help="The Trino REST endpoint URL for tests marked as integration_trino",
143+
)
144+
parser.addoption(
145+
"--trino.hive.endpoint",
146+
action="store",
147+
default="trino://test@localhost:8082/warehouse_hive",
148+
help="The Trino Hive endpoint URL for tests marked as integration_trino",
149+
)
144150

145151

146152
@pytest.fixture(scope="session")
@@ -2436,6 +2442,28 @@ def bound_reference_uuid() -> BoundReference[str]:
24362442
return BoundReference(field=NestedField(1, "field", UUIDType(), required=False), accessor=Accessor(position=0, inner=None))
24372443

24382444

2445+
@pytest.fixture(scope="session")
2446+
def trino_hive_conn(request: pytest.FixtureRequest) -> Generator[Connection, None, None]:
2447+
from sqlalchemy import create_engine
2448+
2449+
trino_endpoint = request.config.getoption("--trino.hive.endpoint")
2450+
engine = create_engine(trino_endpoint)
2451+
connection = engine.connect()
2452+
yield connection
2453+
connection.close()
2454+
2455+
2456+
@pytest.fixture(scope="session")
2457+
def trino_rest_conn(request: pytest.FixtureRequest) -> Generator[Connection, None, None]:
2458+
from sqlalchemy import create_engine
2459+
2460+
trino_endpoint = request.config.getoption("--trino.rest.endpoint")
2461+
engine = create_engine(trino_endpoint)
2462+
connection = engine.connect()
2463+
yield connection
2464+
connection.close()
2465+
2466+
24392467
@pytest.fixture(scope="session")
24402468
def session_catalog() -> Catalog:
24412469
return load_catalog(

tests/integration/test_register_table.py

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,14 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717
import pytest
18+
from sqlalchemy import Connection, inspect
1819

1920
from pyiceberg.catalog import Catalog
2021
from pyiceberg.exceptions import NoSuchTableError, TableAlreadyExistsError
2122
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
2223
from pyiceberg.schema import Schema
2324
from pyiceberg.table import Table
24-
from pyiceberg.types import (
25-
BooleanType,
26-
DateType,
27-
IntegerType,
28-
NestedField,
29-
StringType,
30-
)
25+
from pyiceberg.types import BooleanType, DateType, IntegerType, NestedField, StringType
3126

3227
TABLE_SCHEMA = Schema(
3328
NestedField(field_id=1, name="foo", field_type=BooleanType(), required=False),
@@ -86,3 +81,32 @@ def test_register_table_existing(
8681
# Assert that registering the table again raises TableAlreadyExistsError
8782
with pytest.raises(TableAlreadyExistsError):
8883
catalog.register_table(("default", "register_table_existing"), metadata_location=tbl.metadata_location)
84+
85+
86+
@pytest.mark.integration_trino
87+
@pytest.mark.parametrize(
88+
"catalog, trino_conn",
89+
[
90+
(pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("trino_hive_conn")),
91+
(pytest.lazy_fixture("session_catalog"), pytest.lazy_fixture("trino_rest_conn")),
92+
],
93+
)
94+
def test_register_table_existing_in_trino(
95+
catalog: Catalog,
96+
trino_conn: Connection,
97+
) -> None:
98+
"""Test the registration of a table in the catalog that already exists in Trino.
99+
This test verifies that a table can be registered in the catalog with an existing
100+
metadata location and properly reflected in Trino.
101+
"""
102+
identifier = "default.register_table_trino"
103+
location = "s3a://warehouse/default/register_table_trino"
104+
tbl = _create_table(catalog, identifier, 2, location)
105+
assert catalog.table_exists(identifier=identifier)
106+
assert "register_table_trino" in inspect(trino_conn).get_table_names("default")
107+
catalog.drop_table(identifier=identifier)
108+
assert not catalog.table_exists(identifier=identifier)
109+
assert "register_table_trino" not in inspect(trino_conn).get_table_names("default")
110+
catalog.register_table(("default", "register_table_trino"), metadata_location=tbl.metadata_location)
111+
assert catalog.table_exists(identifier=identifier)
112+
assert "register_table_trino" in inspect(trino_conn).get_table_names("default")

tests/integration/test_rest_catalog.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
# pylint:disable=redefined-outer-name
1818

1919
import pytest
20+
from sqlalchemy import Connection
2021

2122
from pyiceberg.catalog.rest import RestCatalog
2223

@@ -61,3 +62,21 @@ def test_create_namespace_if_already_existing(catalog: RestCatalog) -> None:
6162
catalog.create_namespace_if_not_exists(TEST_NAMESPACE_IDENTIFIER)
6263

6364
assert catalog.namespace_exists(TEST_NAMESPACE_IDENTIFIER)
65+
66+
67+
@pytest.mark.integration_trino
68+
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")])
69+
def test_schema_exists_in_trino(trino_rest_conn: Connection, catalog: RestCatalog) -> None:
70+
"""Verifies that an Iceberg namespace correctly appears as a schema in Trino.
71+
This test ensures the synchronization between Iceberg's namespace concept and
72+
Trino's schema concept, confirming that after creating a namespace in the Iceberg
73+
catalog, it becomes visible as a schema in the Trino environment.
74+
"""
75+
from sqlalchemy import inspect
76+
77+
if not catalog.namespace_exists(TEST_NAMESPACE_IDENTIFIER):
78+
catalog.create_namespace(TEST_NAMESPACE_IDENTIFIER)
79+
catalog.create_namespace_if_not_exists(TEST_NAMESPACE_IDENTIFIER)
80+
81+
assert catalog.namespace_exists(TEST_NAMESPACE_IDENTIFIER)
82+
assert TEST_NAMESPACE_IDENTIFIER.lower() in inspect(trino_rest_conn).get_schema_names()

0 commit comments

Comments
 (0)