From d2a2f574efc315a46624cda2244b63d4484c8f7d Mon Sep 17 00:00:00 2001 From: Peng-Jui Wang Date: Tue, 15 Jul 2025 00:22:04 -0700 Subject: [PATCH] add storage_options support for custom aiobotocore sessions --- mkdocs/docs/configuration.md | 1 + pyiceberg/io/__init__.py | 1 + pyiceberg/io/fsspec.py | 9 ++++++++- tests/io/test_fsspec.py | 29 +++++++++++++++++++++++++++++ 4 files changed, 39 insertions(+), 1 deletion(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index bc514e39af..3916666709 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -127,6 +127,7 @@ For the FileIO there are several configuration options available: | s3.request-timeout | 60.0 | Configure socket read timeouts on Windows and macOS, in seconds. | | s3.force-virtual-addressing | False | Whether to use virtual addressing of buckets. If true, then virtual addressing is always enabled. If false, then virtual addressing is only enabled if endpoint_override is empty. This can be used for non-AWS backends that only support virtual hosted-style access. | | s3.retry-strategy-impl | None | Ability to set a custom S3 retry strategy. A full path to a class needs to be given that extends the [S3RetryStrategy](https://github.com/apache/arrow/blob/639201bfa412db26ce45e73851432018af6c945e/python/pyarrow/_s3fs.pyx#L110) base class. | +| storage_options | {"session": } | Configure storage options including custom aiobotocore session and other s3fs parameters. Only implemented for `FsspecFileIO`. | diff --git a/pyiceberg/io/__init__.py b/pyiceberg/io/__init__.py index 0b03a7dd4f..12fc914611 100644 --- a/pyiceberg/io/__init__.py +++ b/pyiceberg/io/__init__.py @@ -69,6 +69,7 @@ S3_ROLE_SESSION_NAME = "s3.role-session-name" S3_FORCE_VIRTUAL_ADDRESSING = "s3.force-virtual-addressing" S3_RETRY_STRATEGY_IMPL = "s3.retry-strategy-impl" +STORAGE_OPTIONS = "storage_options" HDFS_HOST = "hdfs.host" HDFS_PORT = "hdfs.port" HDFS_USER = "hdfs.user" diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py index 6febff0ae6..51ad8301d5 100644 --- a/pyiceberg/io/fsspec.py +++ b/pyiceberg/io/fsspec.py @@ -72,6 +72,7 @@ S3_SECRET_ACCESS_KEY, S3_SESSION_TOKEN, S3_SIGNER, + STORAGE_OPTIONS, S3_SIGNER_ENDPOINT, S3_SIGNER_ENDPOINT_DEFAULT, S3_SIGNER_URI, @@ -163,7 +164,13 @@ def _s3(properties: Properties) -> AbstractFileSystem: if request_timeout := properties.get(S3_REQUEST_TIMEOUT): config_kwargs["read_timeout"] = float(request_timeout) - fs = S3FileSystem(client_kwargs=client_kwargs, config_kwargs=config_kwargs) + s3_kwargs = {"client_kwargs": client_kwargs, "config_kwargs": config_kwargs} + + if storage_options := properties.get(STORAGE_OPTIONS): + if isinstance(storage_options, dict): + s3_kwargs.update(storage_options) + + fs = S3FileSystem(**s3_kwargs) for event_name, event_function in register_events.items(): fs.s3.meta.events.unregister(event_name, unique_id=1925) diff --git a/tests/io/test_fsspec.py b/tests/io/test_fsspec.py index 854172727d..46e9af2eee 100644 --- a/tests/io/test_fsspec.py +++ b/tests/io/test_fsspec.py @@ -20,6 +20,7 @@ import tempfile import uuid from unittest import mock +from unittest.mock import Mock import pytest from botocore.awsrequest import AWSRequest @@ -290,6 +291,34 @@ def test_fsspec_unified_session_properties() -> None: ) +def test_fsspec_s3_storage_options_session() -> None: + mock_session = Mock() + + session_properties: Properties = { + "s3.endpoint": "http://localhost:9000", + "storage_options": {"session": mock_session}, + **UNIFIED_AWS_SESSION_PROPERTIES, + } + + with mock.patch("s3fs.S3FileSystem") as mock_s3fs: + s3_fileio = FsspecFileIO(properties=session_properties) + filename = str(uuid.uuid4()) + + s3_fileio.new_input(location=f"s3://warehouse/{filename}") + + mock_s3fs.assert_called_with( + client_kwargs={ + "endpoint_url": "http://localhost:9000", + "aws_access_key_id": "client.access-key-id", + "aws_secret_access_key": "client.secret-access-key", + "region_name": "client.region", + "aws_session_token": "client.session-token", + }, + config_kwargs={}, + session=mock_session, + ) + + @pytest.mark.adls def test_fsspec_new_input_file_adls(adls_fsspec_fileio: FsspecFileIO) -> None: """Test creating a new input file from an fsspec file-io"""