
feat: add support for hive 4.0.1 #2217

Open · wants to merge 11 commits into main
21 changes: 16 additions & 5 deletions dev/Dockerfile
@@ -40,7 +40,6 @@ WORKDIR ${SPARK_HOME}
 ENV SPARK_VERSION=3.5.6
 ENV ICEBERG_SPARK_RUNTIME_VERSION=3.5_2.12
 ENV ICEBERG_VERSION=1.9.1
-ENV PYICEBERG_VERSION=0.9.1
 
 RUN curl --retry 5 -s -C - https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \
   && tar xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz --directory /opt/spark --strip-components 1 \
@@ -55,18 +54,30 @@ RUN curl --retry 5 -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-
 RUN curl --retry 5 -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar \
   -Lo /opt/spark/jars/iceberg-aws-bundle-${ICEBERG_VERSION}.jar
 
-COPY spark-defaults.conf /opt/spark/conf
+COPY dev/spark-defaults.conf /opt/spark/conf
 ENV PATH="/opt/spark/sbin:/opt/spark/bin:${PATH}"
 
 RUN chmod u+x /opt/spark/sbin/* && \
   chmod u+x /opt/spark/bin/*
 
 RUN pip3 install -q ipython
 
-RUN pip3 install "pyiceberg[s3fs,hive,pyarrow]==${PYICEBERG_VERSION}"
+# Copy the local pyiceberg source code and install locally
+COPY pyiceberg/ /tmp/pyiceberg/pyiceberg
+COPY pyproject.toml /tmp/pyiceberg/
+COPY build-module.py /tmp/pyiceberg/
+COPY vendor/ /tmp/pyiceberg/vendor
+COPY README.md /tmp/pyiceberg/
+COPY NOTICE /tmp/pyiceberg/
 
-COPY entrypoint.sh .
-COPY provision.py .
+# Install pyiceberg from the copied source
+RUN cd /tmp/pyiceberg && pip3 install ".[s3fs,hive,pyarrow]"
+
+# Clean up
+RUN rm -rf /tmp/pyiceberg
+
+COPY dev/entrypoint.sh ${SPARK_HOME}/
+COPY dev/provision.py ${SPARK_HOME}/
 
 ENTRYPOINT ["./entrypoint.sh"]
 CMD ["notebook"]
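The net effect of this Dockerfile change is that the integration image installs pyiceberg from the local working tree instead of a pinned PyPI release. A minimal hedged check, run inside the freshly built container:

```python
# Confirm the installed pyiceberg comes from the copied source tree; the
# version string is whatever the working tree's pyproject.toml declares.
from importlib.metadata import version

print(version("pyiceberg"))
```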
4 changes: 3 additions & 1 deletion dev/docker-compose-integration.yml
@@ -19,7 +19,9 @@ services:
   spark-iceberg:
     image: python-integration
     container_name: pyiceberg-spark
-    build: .
+    build:
+      context: ..
+      dockerfile: dev/Dockerfile
     networks:
       iceberg_net:
     depends_on:
2 changes: 1 addition & 1 deletion dev/hive/Dockerfile
@@ -23,7 +23,7 @@ ENV AWS_SDK_BUNDLE=1.12.753
 RUN curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -Lo /tmp/hadoop-aws-${HADOOP_VERSION}.jar
 RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_BUNDLE}/aws-java-sdk-bundle-${AWS_SDK_BUNDLE}.jar -Lo /tmp/aws-java-sdk-bundle-${AWS_SDK_BUNDLE}.jar
 
-FROM apache/hive:4.0.0
+FROM apache/hive:4.0.1
 
 ENV HADOOP_VERSION=3.3.6
 ENV AWS_SDK_BUNDLE=1.12.753
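With the metastore image bumped from 4.0.0 to 4.0.1, clients connect the same way as before. A hedged connection sketch; the thrift URI is an assumption based on the usual integration setup:

```python
# Sketch only: load the Hive catalog against the upgraded 4.0.1 metastore.
from pyiceberg.catalog.hive import HiveCatalog

catalog = HiveCatalog("hive", uri="thrift://localhost:9083")
print(catalog.list_namespaces())
```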
34 changes: 26 additions & 8 deletions pyiceberg/catalog/hive.py
@@ -38,6 +38,10 @@
     CheckLockRequest,
     EnvironmentContext,
     FieldSchema,
+    GetTableRequest,
+    GetTableResult,
+    GetTablesRequest,
+    GetTablesResult,
     InvalidOperationException,
     LockComponent,
     LockLevel,
@@ -389,8 +393,11 @@ def _create_hive_table(self, open_client: Client, hive_table: HiveTable) -> None
 
     def _get_hive_table(self, open_client: Client, database_name: str, table_name: str) -> HiveTable:
         try:
-            return open_client.get_table(dbname=database_name, tbl_name=table_name)
-        except NoSuchObjectException as e:
+            get_table_result: GetTableResult = open_client.get_table_req(
+                req=GetTableRequest(dbName=database_name, tblName=table_name)
+            )
+            return get_table_result.table
+        except IndexError as e:
             raise NoSuchTableError(f"Table does not exists: {table_name}") from e
 
     def create_table(
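The regenerated Hive 4 stubs use request/response wrappers, so `_get_hive_table` now goes through `get_table_req`. A minimal sketch of the call shape, assuming `client` is a connected `ThriftHiveMetastore.Client` and the database/table names are illustrative:

```python
# Minimal sketch (assumed names) of the request/response call that replaces get_table().
from hive_metastore.ttypes import GetTableRequest

result = client.get_table_req(req=GetTableRequest(dbName="default", tblName="events"))
hive_table = result.table  # per this PR, a missing table surfaces as IndexError
```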
Expand Down Expand Up @@ -435,7 +442,10 @@ def create_table(

with self._client as open_client:
self._create_hive_table(open_client, tbl)
hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
try:
hive_table = self._get_hive_table(open_client, database_name, table_name)
except IndexError as e:
raise NoSuchObjectException("get_table failed: unknown result") from e

return self._convert_hive_into_iceberg(hive_table)

Expand Down Expand Up @@ -465,7 +475,10 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
tbl = self._convert_iceberg_into_hive(staged_table)
with self._client as open_client:
self._create_hive_table(open_client, tbl)
hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
try:
hive_table = self._get_hive_table(open_client, database_name, table_name)
except IndexError as e:
raise NoSuchObjectException("get_table failed: unknown result") from e

return self._convert_hive_into_iceberg(hive_table)

@@ -656,7 +669,10 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
         to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
         try:
             with self._client as open_client:
-                tbl = open_client.get_table(dbname=from_database_name, tbl_name=from_table_name)
+                try:
+                    tbl = self._get_hive_table(open_client, from_database_name, from_table_name)
+                except IndexError as e:
+                    raise NoSuchObjectException("get_table failed: unknown result") from e
                 tbl.dbName = to_database_name
                 tbl.tableName = to_table_name
                 open_client.alter_table_with_environment_context(
@@ -726,11 +742,13 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
         """
         database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
         with self._client as open_client:
+            all_table_names = open_client.get_all_tables(db_name=database_name)
+            get_tables_result: GetTablesResult = open_client.get_table_objects_by_name_req(
+                req=GetTablesRequest(dbName=database_name, tblNames=all_table_names)
+            )
             return [
                 (database_name, table.tableName)
-                for table in open_client.get_table_objects_by_name(
-                    dbname=database_name, tbl_names=open_client.get_all_tables(db_name=database_name)
-                )
+                for table in get_tables_result.tables
                 if table.parameters.get(TABLE_TYPE, "").lower() == ICEBERG
             ]
 
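A hedged sketch of what `list_tables` now does at the Thrift level; `client` is assumed to be a connected `ThriftHiveMetastore.Client`, and `TABLE_TYPE`/`ICEBERG` resolve to `"table_type"`/`"iceberg"` in this codebase:

```python
# Batched lookup behind list_tables: one request fetches all table objects,
# then the Iceberg filter is applied client-side.
from hive_metastore.ttypes import GetTablesRequest

names = client.get_all_tables(db_name="default")
result = client.get_table_objects_by_name_req(req=GetTablesRequest(dbName="default", tblNames=names))
iceberg_tables = [t.tableName for t in result.tables if t.parameters.get("table_type", "").lower() == "iceberg"]
```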
54 changes: 35 additions & 19 deletions tests/catalog/test_hive.py
@@ -31,6 +31,10 @@
     AlreadyExistsException,
     EnvironmentContext,
     FieldSchema,
+    GetTableRequest,
+    GetTableResult,
+    GetTablesRequest,
+    GetTablesResult,
     InvalidOperationException,
     LockResponse,
     LockState,
@@ -280,7 +284,7 @@ def test_create_table(
 
     catalog._client = MagicMock()
     catalog._client.__enter__().create_table.return_value = None
-    catalog._client.__enter__().get_table.return_value = hive_table
+    catalog._client.__enter__().get_table_req.return_value = GetTableResult(table=hive_table)
     catalog._client.__enter__().get_database.return_value = hive_database
     catalog.create_table(("default", "table"), schema=table_schema_with_all_types, properties={"owner": "javaberg"})
 
Expand Down Expand Up @@ -459,7 +463,7 @@ def test_create_table_with_given_location_removes_trailing_slash(

catalog._client = MagicMock()
catalog._client.__enter__().create_table.return_value = None
catalog._client.__enter__().get_table.return_value = hive_table
catalog._client.__enter__().get_table_req.return_value = GetTableResult(table=hive_table)
catalog._client.__enter__().get_database.return_value = hive_database
catalog.create_table(
("default", "table"), schema=table_schema_with_all_types, properties={"owner": "javaberg"}, location=f"{location}/"
Expand Down Expand Up @@ -633,7 +637,7 @@ def test_create_v1_table(table_schema_simple: Schema, hive_database: HiveDatabas

catalog._client = MagicMock()
catalog._client.__enter__().create_table.return_value = None
catalog._client.__enter__().get_table.return_value = hive_table
catalog._client.__enter__().get_table_req.return_value = GetTableResult(table=hive_table)
catalog._client.__enter__().get_database.return_value = hive_database
catalog.create_table(
("default", "table"), schema=table_schema_simple, properties={"owner": "javaberg", "format-version": "1"}
Expand Down Expand Up @@ -684,10 +688,10 @@ def test_load_table(hive_table: HiveTable) -> None:
catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)

catalog._client = MagicMock()
catalog._client.__enter__().get_table.return_value = hive_table
catalog._client.__enter__().get_table_req.return_value = GetTableResult(table=hive_table)
table = catalog.load_table(("default", "new_tabl2e"))

catalog._client.__enter__().get_table.assert_called_with(dbname="default", tbl_name="new_tabl2e")
catalog._client.__enter__().get_table_req.assert_called_with(req=GetTableRequest(dbName="default", tblName="new_tabl2e"))

expected = TableMetadataV2(
location="s3://bucket/test/location",
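A side note on why `assert_called_with` can match a freshly constructed request: the thrift-generated types used here implement value equality rather than identity comparison. A small illustration:

```python
# Thrift-generated types implement field-by-field __eq__, so a freshly built
# request compares equal to the one the catalog constructed internally.
from hive_metastore.ttypes import GetTableRequest

assert GetTableRequest(dbName="default", tblName="t") == GetTableRequest(dbName="default", tblName="t")
```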
@@ -784,11 +788,11 @@ def test_load_table_from_self_identifier(hive_table: HiveTable) -> None:
     catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
 
     catalog._client = MagicMock()
-    catalog._client.__enter__().get_table.return_value = hive_table
+    catalog._client.__enter__().get_table_req.return_value = GetTableResult(table=hive_table)
     intermediate = catalog.load_table(("default", "new_tabl2e"))
     table = catalog.load_table(intermediate.name())
 
-    catalog._client.__enter__().get_table.assert_called_with(dbname="default", tbl_name="new_tabl2e")
+    catalog._client.__enter__().get_table_req.assert_called_with(req=GetTableRequest(dbName="default", tblName="new_tabl2e"))
 
     expected = TableMetadataV2(
         location="s3://bucket/test/location",
@@ -889,7 +893,10 @@ def test_rename_table(hive_table: HiveTable) -> None:
     renamed_table.tableName = "new_tabl3e"
 
     catalog._client = MagicMock()
-    catalog._client.__enter__().get_table.side_effect = [hive_table, renamed_table]
+    catalog._client.__enter__().get_table_req.side_effect = [
+        GetTableResult(table=hive_table),
+        GetTableResult(table=renamed_table),
+    ]
     catalog._client.__enter__().alter_table_with_environment_context.return_value = None
 
     from_identifier = ("default", "new_tabl2e")
@@ … @@
 
     assert table.name() == to_identifier
 
-    calls = [call(dbname="default", tbl_name="new_tabl2e"), call(dbname="default", tbl_name="new_tabl3e")]
-    catalog._client.__enter__().get_table.assert_has_calls(calls)
+    expected_calls = [
+        call(req=GetTableRequest(dbName="default", tblName="new_tabl2e")),
+        call(req=GetTableRequest(dbName="default", tblName="new_tabl3e")),
+    ]
+    catalog._client.__enter__().get_table_req.assert_has_calls(expected_calls)
     catalog._client.__enter__().alter_table_with_environment_context.assert_called_with(
         dbname="default",
         tbl_name="new_tabl2e",
@@ -912,25 +922,31 @@ def test_rename_table_from_self_identifier(hive_table: HiveTable) -> None:
     catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
 
     catalog._client = MagicMock()
-    catalog._client.__enter__().get_table.return_value = hive_table
+    catalog._client.__enter__().get_table_req.return_value = GetTableResult(table=hive_table)
 
     from_identifier = ("default", "new_tabl2e")
     from_table = catalog.load_table(from_identifier)
-    catalog._client.__enter__().get_table.assert_called_with(dbname="default", tbl_name="new_tabl2e")
+    catalog._client.__enter__().get_table_req.assert_called_with(req=GetTableRequest(dbName="default", tblName="new_tabl2e"))
 
     renamed_table = copy.deepcopy(hive_table)
     renamed_table.dbName = "default"
     renamed_table.tableName = "new_tabl3e"
 
-    catalog._client.__enter__().get_table.side_effect = [hive_table, renamed_table]
+    catalog._client.__enter__().get_table_req.side_effect = [
+        GetTableResult(table=hive_table),
+        GetTableResult(table=renamed_table),
+    ]
     catalog._client.__enter__().alter_table_with_environment_context.return_value = None
     to_identifier = ("default", "new_tabl3e")
     table = catalog.rename_table(from_table.name(), to_identifier)
 
     assert table.name() == to_identifier
 
-    calls = [call(dbname="default", tbl_name="new_tabl2e"), call(dbname="default", tbl_name="new_tabl3e")]
-    catalog._client.__enter__().get_table.assert_has_calls(calls)
+    expected_calls = [
+        call(req=GetTableRequest(dbName="default", tblName="new_tabl2e")),
+        call(req=GetTableRequest(dbName="default", tblName="new_tabl3e")),
+    ]
+    catalog._client.__enter__().get_table_req.assert_has_calls(expected_calls)
     catalog._client.__enter__().alter_table_with_environment_context.assert_called_with(
         dbname="default",
         tbl_name="new_tabl2e",
@@ -1013,13 +1029,13 @@ def test_list_tables(hive_table: HiveTable) -> None:
 
     catalog._client = MagicMock()
     catalog._client.__enter__().get_all_tables.return_value = ["table1", "table2", "table3", "table4"]
-    catalog._client.__enter__().get_table_objects_by_name.return_value = [tbl1, tbl2, tbl3, tbl4]
+    catalog._client.__enter__().get_table_objects_by_name_req.return_value = GetTablesResult(tables=[tbl1, tbl2, tbl3, tbl4])
 
     got_tables = catalog.list_tables("database")
     assert got_tables == [("database", "table1"), ("database", "table2")]
     catalog._client.__enter__().get_all_tables.assert_called_with(db_name="database")
-    catalog._client.__enter__().get_table_objects_by_name.assert_called_with(
-        dbname="database", tbl_names=["table1", "table2", "table3", "table4"]
+    catalog._client.__enter__().get_table_objects_by_name_req.assert_called_with(
+        req=GetTablesRequest(dbName="database", tblNames=["table1", "table2", "table3", "table4"])
     )
 
 
@@ -1049,7 +1065,7 @@ def test_drop_table_from_self_identifier(hive_table: HiveTable) -> None:
     catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
 
     catalog._client = MagicMock()
-    catalog._client.__enter__().get_table.return_value = hive_table
+    catalog._client.__enter__().get_table_req.return_value = GetTableResult(table=hive_table)
     table = catalog.load_table(("default", "new_tabl2e"))
 
     catalog._client.__enter__().get_all_databases.return_value = ["namespace1", "namespace2"]
40 changes: 40 additions & 0 deletions vendor/Makefile
@@ -0,0 +1,40 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Makefile for generating vendor packages

.PHONY: all clean fb303 hive-metastore

all: fb303 hive-metastore

# FB303 Thrift client generation
fb303:
	rm -f /tmp/fb303.thrift
	rm -rf fb303
	curl -s https://raw.githubusercontent.com/apache/thrift/master/contrib/fb303/if/fb303.thrift > /tmp/fb303.thrift
	rm -rf /tmp/gen-py/
	thrift -gen py -o /tmp/ /tmp/fb303.thrift
	mv /tmp/gen-py/fb303 fb303

# Hive Metastore Thrift definition generation
hive-metastore:
	rm -rf /tmp/hive
	mkdir -p /tmp/hive/share/fb303/if/
	curl -s https://raw.githubusercontent.com/apache/thrift/master/contrib/fb303/if/fb303.thrift > /tmp/hive/share/fb303/if/fb303.thrift
	curl -s https://raw.githubusercontent.com/apache/hive/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift > /tmp/hive/hive_metastore.thrift
	thrift -gen py -o /tmp/hive /tmp/hive/hive_metastore.thrift
	rm -rf hive_metastore
	mv /tmp/hive/gen-py/hive_metastore hive_metastore
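After `make all`, a quick hedged smoke test that the regenerated packages import cleanly (module layout per this vendor/ directory; run from the vendor/ root):

```python
# Smoke test for the regenerated vendor packages, including the
# request/response types this PR relies on.
from fb303 import FacebookService  # noqa: F401
from hive_metastore import ThriftHiveMetastore  # noqa: F401
from hive_metastore.ttypes import GetTableRequest, GetTablesRequest

print(GetTableRequest(dbName="default", tblName="t"))
```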
40 changes: 27 additions & 13 deletions vendor/README.md
@@ -18,28 +18,42 @@

 Some packages we want to maintain in the repository itself, because there is no good 3rd party alternative.
 
-## FB303 Thrift client
+## Quick Setup
+
+Generate all vendor packages:
+
+```bash
+make all
+```
+
+Generate individual packages:
+
+```bash
+make fb303            # FB303 Thrift client only
+make hive-metastore   # Hive Metastore Thrift definitions only
+```
+
+## Packages
+
+### FB303 Thrift client
+
 fb303 is a base Thrift service and a common set of functionality for querying stats, options, and other information from a service.
 
+**Generate with Make:**
 ```bash
-rm -f /tmp/fb303.thrift
-rm -rf fb303
-curl -s https://raw.githubusercontent.com/apache/thrift/master/contrib/fb303/if/fb303.thrift > /tmp/fb303.thrift
-rm -rf /tmp/gen-py/
-thrift -gen py -o /tmp/ /tmp/fb303.thrift
-mv /tmp/gen-py/fb303 fb303
+make fb303
 ```
 
 # Hive Metastore Thrift definition
 
 The Thrift definition requires the fb303 service as a dependency.
 
+**Generate with Make:**
 ```bash
-rm -rf /tmp/hive
-mkdir -p /tmp/hive/share/fb303/if/
-curl -s https://raw.githubusercontent.com/apache/thrift/master/contrib/fb303/if/fb303.thrift > /tmp/hive/share/fb303/if/fb303.thrift
-curl -s https://raw.githubusercontent.com/apache/hive/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift > /tmp/hive/hive_metastore.thrift
-thrift -gen py -o /tmp/hive /tmp/hive/hive_metastore.thrift
-mv /tmp/hive/gen-py/hive_metastore hive_metastore
+make hive-metastore
 ```
 
+## Requirements
+
+- Apache Thrift compiler (`thrift`)
+- `curl` for downloading Thrift definitions
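For completeness, a hedged sketch of exercising the generated fb303 client against a running metastore; host and port are assumptions based on the docker-compose setup in this PR:

```python
# The Hive metastore implements the fb303 base service, so the generated
# client can query basic service info; localhost:9083 is assumed.
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport
from fb303 import FacebookService

transport = TTransport.TBufferedTransport(TSocket.TSocket("localhost", 9083))
transport.open()
client = FacebookService.Client(TBinaryProtocol.TBinaryProtocol(transport))
print(client.getName(), client.getVersion())
transport.close()
```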