From c531e0fbfd185a141a4aeb9d6e5f475d8f400bae Mon Sep 17 00:00:00 2001 From: geruh Date: Thu, 24 Jul 2025 18:27:46 -0700 Subject: [PATCH 01/21] allow passing pyarrow fs props --- mkdocs/docs/configuration.md | 27 ++++ pyiceberg/io/pyarrow.py | 282 ++++++++++++++++++++++++++--------- tests/io/test_pyarrow.py | 74 +++++++++ 3 files changed, 316 insertions(+), 67 deletions(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index f4fbe0c8d8..2f71ddbd4f 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -105,6 +105,33 @@ You can also set the FileIO explicitly: For the FileIO there are several configuration options available: +### PyArrow FileSystem Extra Properties + +When using `PyArrowFileIO`, any properties with filesystem specific prefixes that are not explicitly handled by PyIceberg will be passed to the underlying PyArrow filesystem implementations. + +To use these properties, follow the format: + +```txt +{fs_scheme}.{parameter_name} +``` + +- {fs_scheme} is the filesystem scheme (e.g., s3, hdfs, gcs). +- {parameter_name} must match the name expected by the PyArrow filesystem. +- Property values must use the correct type expected by the underlying filesystem (e.g., string, integer, boolean). + +Below are examples of supported prefixes and how the properties are passed through: + + + +| Property Prefix | FileSystem | Example | Description | +|-----------------|------------------------------------------------------------------------------------------------------|-----------------------------|-----------------------------------------------------| +| `s3.` | [S3FileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html) | `s3.load_frequency=900` | Passed as `load_frequency=900` to S3FileSystem | +| `hdfs.` | [HadoopFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.HadoopFileSystem.html) | `hdfs.replication=3` | Passed as `replication=3` to HadoopFileSystem | +| `gcs.` | [GcsFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.GcsFileSystem.html) | `gcs.project_id=test` | Passed as `project_id='test'` to GcsFileSystem | +| `adls.` | [AzureFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.AzureFileSystem.html) | `adls.blob_cache-size=1024` | Passed as `blob_cache_size=1024` to AzureFileSystem | +| `oss.` | [S3FileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html) | `oss.connect_timeout=30.0` | Passed as `connect_timeout=30.0` to S3FileSystem | +| `file.` | [LocalFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.LocalFileSystem.html) | `file.use_mmap=true` | Passed as `use_mmap=True` to LocalFileSystem | + ### S3 diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 2797371028..aa0f2a822e 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -423,29 +423,67 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste def _initialize_oss_fs(self) -> FileSystem: from pyarrow.fs import S3FileSystem - client_kwargs: Dict[str, Any] = { - "endpoint_override": self.properties.get(S3_ENDPOINT), - "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID), - "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY), - "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN), - "region": get_first_property_value(self.properties, S3_REGION, 
AWS_REGION), - "force_virtual_addressing": property_as_bool(self.properties, S3_FORCE_VIRTUAL_ADDRESSING, True), + # Mapping from PyIceberg properties to S3FileSystem parameter names + property_mapping = { + S3_ENDPOINT: "endpoint_override", + S3_ACCESS_KEY_ID: "access_key", + AWS_ACCESS_KEY_ID: "access_key", + S3_SECRET_ACCESS_KEY: "secret_key", + AWS_SECRET_ACCESS_KEY: "secret_key", + S3_SESSION_TOKEN: "session_token", + AWS_SESSION_TOKEN: "session_token", + S3_REGION: "region", + AWS_REGION: "region", + S3_PROXY_URI: "proxy_options", + S3_CONNECT_TIMEOUT: "connect_timeout", + S3_REQUEST_TIMEOUT: "request_timeout", + S3_FORCE_VIRTUAL_ADDRESSING: "force_virtual_addressing", } - if proxy_uri := self.properties.get(S3_PROXY_URI): - client_kwargs["proxy_options"] = proxy_uri + # Properties that need special handling + special_properties = { + S3_CONNECT_TIMEOUT, + S3_REQUEST_TIMEOUT, + S3_FORCE_VIRTUAL_ADDRESSING, + S3_ROLE_SESSION_NAME, + S3_RESOLVE_REGION, + AWS_ROLE_SESSION_NAME, + } - if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT): - client_kwargs["connect_timeout"] = float(connect_timeout) + client_kwargs: Dict[str, Any] = {} - if request_timeout := self.properties.get(S3_REQUEST_TIMEOUT): - client_kwargs["request_timeout"] = float(request_timeout) + for prop_name, prop_value in self.properties.items(): + if prop_value is None: + continue + + # Skip properties that need special handling + if prop_name in special_properties: + continue - if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN): - client_kwargs["role_arn"] = role_arn + # Map known property names to S3FileSystem parameter names + if prop_name in property_mapping: + param_name = property_mapping[prop_name] + client_kwargs[param_name] = prop_value - if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME): - client_kwargs["session_name"] = session_name + # Pass through any other s3.* properties to S3FileSystem + elif prop_name.startswith("s3."): + param_name = prop_name.split(".", 1)[1] + client_kwargs[param_name] = prop_value + + # Handle properties that need first value resolution + if S3_ACCESS_KEY_ID in self.properties or AWS_ACCESS_KEY_ID in self.properties: + client_kwargs["access_key"] = get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID) + + if S3_SECRET_ACCESS_KEY in self.properties or AWS_SECRET_ACCESS_KEY in self.properties: + client_kwargs["secret_key"] = get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY) + + if S3_SESSION_TOKEN in self.properties or AWS_SESSION_TOKEN in self.properties: + client_kwargs["session_token"] = get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN) + + if S3_REGION in self.properties or AWS_REGION in self.properties: + client_kwargs["region"] = get_first_property_value(self.properties, S3_REGION, AWS_REGION) + + client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, S3_FORCE_VIRTUAL_ADDRESSING, True) return S3FileSystem(**client_kwargs) @@ -467,16 +505,71 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem: else: bucket_region = provided_region - client_kwargs: Dict[str, Any] = { - "endpoint_override": self.properties.get(S3_ENDPOINT), - "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID), - "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY), - "session_token": 
get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN), - "region": bucket_region, + # Properties that need special handling + property_mapping = { + S3_ENDPOINT: "endpoint_override", + S3_ACCESS_KEY_ID: "access_key", + AWS_ACCESS_KEY_ID: "access_key", + S3_SECRET_ACCESS_KEY: "secret_key", + AWS_SECRET_ACCESS_KEY: "secret_key", + S3_SESSION_TOKEN: "session_token", + AWS_SESSION_TOKEN: "session_token", + S3_PROXY_URI: "proxy_options", + S3_CONNECT_TIMEOUT: "connect_timeout", + S3_REQUEST_TIMEOUT: "request_timeout", + S3_ROLE_ARN: "role_arn", + AWS_ROLE_ARN: "role_arn", + S3_ROLE_SESSION_NAME: "session_name", + AWS_ROLE_SESSION_NAME: "session_name", + S3_FORCE_VIRTUAL_ADDRESSING: "force_virtual_addressing", + S3_RETRY_STRATEGY_IMPL: "retry_strategy", + } + + # Properties that need special handling and should not be passed directly + special_properties = { + S3_RESOLVE_REGION, + S3_REGION, + AWS_REGION, + S3_RETRY_STRATEGY_IMPL, + S3_CONNECT_TIMEOUT, + S3_REQUEST_TIMEOUT, } - if proxy_uri := self.properties.get(S3_PROXY_URI): - client_kwargs["proxy_options"] = proxy_uri + client_kwargs: Dict[str, Any] = {} + + client_kwargs["region"] = bucket_region + for prop_name, prop_value in self.properties.items(): + if prop_value is None: + continue + + # Skip properties that need special handling + if prop_name in special_properties: + continue + + if prop_name in property_mapping: + param_name = property_mapping[prop_name] + client_kwargs[param_name] = prop_value + + # Pass through any other s3.* properties that might be used by S3FileSystem + elif prop_name.startswith("s3."): + param_name = prop_name.split(".", 1)[1] + client_kwargs[param_name] = prop_value + + # Handle properties that need first value resolution + if S3_ACCESS_KEY_ID in self.properties or AWS_ACCESS_KEY_ID in self.properties: + client_kwargs["access_key"] = get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID) + + if S3_SECRET_ACCESS_KEY in self.properties or AWS_SECRET_ACCESS_KEY in self.properties: + client_kwargs["secret_key"] = get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY) + + if S3_SESSION_TOKEN in self.properties or AWS_SESSION_TOKEN in self.properties: + client_kwargs["session_token"] = get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN) + + if S3_ROLE_ARN in self.properties or AWS_ROLE_ARN in self.properties: + client_kwargs["role_arn"] = get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN) + + if S3_ROLE_SESSION_NAME in self.properties or AWS_ROLE_SESSION_NAME in self.properties: + client_kwargs["session_name"] = get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME) if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT): client_kwargs["connect_timeout"] = float(connect_timeout) @@ -484,15 +577,7 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem: if request_timeout := self.properties.get(S3_REQUEST_TIMEOUT): client_kwargs["request_timeout"] = float(request_timeout) - if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN): - client_kwargs["role_arn"] = role_arn - - if session_name := get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME): - client_kwargs["session_name"] = session_name - - if self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING) is not None: - client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, S3_FORCE_VIRTUAL_ADDRESSING, False) - + # 
Handle retry strategy special case if (retry_strategy_impl := self.properties.get(S3_RETRY_STRATEGY_IMPL)) and ( retry_instance := _import_retry_strategy(retry_strategy_impl) ): @@ -512,59 +597,111 @@ def _initialize_azure_fs(self) -> FileSystem: from pyarrow.fs import AzureFileSystem - client_kwargs: Dict[str, str] = {} - - if account_name := self.properties.get(ADLS_ACCOUNT_NAME): - client_kwargs["account_name"] = account_name - - if account_key := self.properties.get(ADLS_ACCOUNT_KEY): - client_kwargs["account_key"] = account_key - - if blob_storage_authority := self.properties.get(ADLS_BLOB_STORAGE_AUTHORITY): - client_kwargs["blob_storage_authority"] = blob_storage_authority + # Mapping from PyIceberg properties to AzureFileSystem parameter names + property_mapping = { + ADLS_ACCOUNT_NAME: "account_name", + ADLS_ACCOUNT_KEY: "account_key", + ADLS_BLOB_STORAGE_AUTHORITY: "blob_storage_authority", + ADLS_DFS_STORAGE_AUTHORITY: "dfs_storage_authority", + ADLS_BLOB_STORAGE_SCHEME: "blob_storage_scheme", + ADLS_DFS_STORAGE_SCHEME: "dfs_storage_scheme", + ADLS_SAS_TOKEN: "sas_token", + } - if dfs_storage_authority := self.properties.get(ADLS_DFS_STORAGE_AUTHORITY): - client_kwargs["dfs_storage_authority"] = dfs_storage_authority + client_kwargs: Dict[str, Any] = {} - if blob_storage_scheme := self.properties.get(ADLS_BLOB_STORAGE_SCHEME): - client_kwargs["blob_storage_scheme"] = blob_storage_scheme + for prop_name, prop_value in self.properties.items(): + if prop_value is None: + continue - if dfs_storage_scheme := self.properties.get(ADLS_DFS_STORAGE_SCHEME): - client_kwargs["dfs_storage_scheme"] = dfs_storage_scheme + # Map known property names to AzureFileSystem parameter names + if prop_name in property_mapping: + param_name = property_mapping[prop_name] + client_kwargs[param_name] = prop_value - if sas_token := self.properties.get(ADLS_SAS_TOKEN): - client_kwargs["sas_token"] = sas_token + # Pass through any other adls.* properties that might be used by AzureFileSystem + elif prop_name.startswith("adls."): + param_name = prop_name.split(".", 1)[1] + client_kwargs[param_name] = prop_value return AzureFileSystem(**client_kwargs) def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem: from pyarrow.fs import HadoopFileSystem - hdfs_kwargs: Dict[str, Any] = {} if netloc: return HadoopFileSystem.from_uri(f"{scheme}://{netloc}") - if host := self.properties.get(HDFS_HOST): - hdfs_kwargs["host"] = host - if port := self.properties.get(HDFS_PORT): - # port should be an integer type - hdfs_kwargs["port"] = int(port) - if user := self.properties.get(HDFS_USER): - hdfs_kwargs["user"] = user - if kerb_ticket := self.properties.get(HDFS_KERB_TICKET): - hdfs_kwargs["kerb_ticket"] = kerb_ticket + + # Mapping from PyIceberg properties to S3FileSystem parameter names + property_mapping = { + HDFS_HOST: "host", + HDFS_PORT: "port", + HDFS_USER: "user", + HDFS_KERB_TICKET: "kerb_ticket", + } + + hdfs_kwargs: Dict[str, Any] = {} + + for prop_name, prop_value in self.properties.items(): + if prop_value is None: + continue + + # Map known property names to HadoopFileSystem parameter names + if prop_name in property_mapping: + param_name = property_mapping[prop_name] + + if param_name == "port": + hdfs_kwargs[param_name] = int(prop_value) + else: + hdfs_kwargs[param_name] = prop_value + + # Pass through any other hdfs.* properties used to be used by HadoopFileSystem + elif prop_name.startswith("hdfs."): + param_name = prop_name.split(".", 1)[1] + hdfs_kwargs[param_name] = prop_value 
return HadoopFileSystem(**hdfs_kwargs) def _initialize_gcs_fs(self) -> FileSystem: from pyarrow.fs import GcsFileSystem + # Mapping from PyIceberg properties to GcsFileSystem parameter names + property_mapping = { + GCS_TOKEN: "access_token", + GCS_TOKEN_EXPIRES_AT_MS: "credential_token_expiration", + GCS_DEFAULT_LOCATION: "default_bucket_location", + GCS_SERVICE_HOST: "endpoint_override", + } + + # Properties that need special handling + special_properties = { + GCS_TOKEN_EXPIRES_AT_MS, + GCS_SERVICE_HOST, + } + gcs_kwargs: Dict[str, Any] = {} - if access_token := self.properties.get(GCS_TOKEN): - gcs_kwargs["access_token"] = access_token + + for prop_name, prop_value in self.properties.items(): + if prop_value is None: + continue + + # Skip properties that need special handling + if prop_name in special_properties: + continue + + # Map known property names to GcsFileSystem parameter names + if prop_name in property_mapping: + param_name = property_mapping[prop_name] + gcs_kwargs[param_name] = prop_value + + # Pass through any other gcs.* properties that might be used by GcsFileSystem + elif prop_name.startswith("gcs."): + param_name = prop_name.split(".", 1)[1] + gcs_kwargs[param_name] = prop_value + if expiration := self.properties.get(GCS_TOKEN_EXPIRES_AT_MS): gcs_kwargs["credential_token_expiration"] = millis_to_datetime(int(expiration)) - if bucket_location := self.properties.get(GCS_DEFAULT_LOCATION): - gcs_kwargs["default_bucket_location"] = bucket_location + if endpoint := self.properties.get(GCS_SERVICE_HOST): url_parts = urlparse(endpoint) gcs_kwargs["scheme"] = url_parts.scheme @@ -573,7 +710,18 @@ def _initialize_gcs_fs(self) -> FileSystem: return GcsFileSystem(**gcs_kwargs) def _initialize_local_fs(self) -> FileSystem: - return PyArrowLocalFileSystem() + local_kwargs: Dict[str, Any] = {} + + for prop_name, prop_value in self.properties.items(): + if prop_value is None: + continue + + # Pass through any other file.* properties that might be used by PyArrowLocalFileSystem + elif prop_name.startswith("file."): + param_name = prop_name.split(".", 1)[1] + local_kwargs[param_name] = prop_value + + return PyArrowLocalFileSystem(**local_kwargs) def new_input(self, location: str) -> PyArrowFile: """Get a PyArrowFile instance to read bytes from the file at the given location. 
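As a quick illustration of the behaviour the documentation above describes (and that the new tests below assert), here is a minimal usage sketch — the endpoint, credentials, and bucket are placeholders. Iceberg-recognized keys such as `s3.endpoint` are mapped to their `S3FileSystem` equivalents, while any other `s3.*` key (here `s3.load_frequency`, a native `S3FileSystem` parameter) is forwarded verbatim as a keyword argument:

```python
from pyiceberg.io.pyarrow import PyArrowFileIO

# Recognized keys (s3.endpoint, s3.access-key-id, ...) are translated to the
# matching S3FileSystem arguments; any other s3.* key is passed through as-is,
# so s3.load_frequency becomes S3FileSystem(load_frequency=900).
file_io = PyArrowFileIO(
    properties={
        "s3.endpoint": "http://localhost:9000",   # placeholder endpoint
        "s3.access-key-id": "admin",              # placeholder credentials
        "s3.secret-access-key": "password",
        "s3.load_frequency": 900,                 # native S3FileSystem parameter
    }
)

# Opening an input file resolves the scheme and constructs the filesystem with
# the merged keyword arguments; an unknown parameter name would surface as a
# TypeError raised by S3FileSystem, which is what the tests below check.
input_file = file_io.new_input("s3://bucket/path/to/file")
```

The same convention applies to the other prefixes listed in the documentation table above (`hdfs.`, `gcs.`, `adls.`, `oss.`, `file.`).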
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 4f121ba3bc..259b97d98e 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -412,6 +412,69 @@ def test_pyarrow_unified_session_properties() -> None: ) +def test_pyarrow_s3_filesystem_specific_properties() -> None: + pyarrow_file_io = PyArrowFileIO( + { + "s3.endpoint": "http://localhost:9000", + "s3.access-key-id": "user", + "s3.secret-access-key": "pass", + "s3.load_frequency": 900, + } + ) + + # Test that valid PyArrow properties work without error + pyarrow_file_io.new_input("s3://bucket/path/to/file") + + # Test that invalid PyArrow properties raise TypeError + with pytest.raises(TypeError) as exc_info: + pyarrow_file_io = PyArrowFileIO( + { + "s3.endpoint": "http://localhost:9000", + "s3.access-key-id": "admin", + "s3.secret-access-key": "password", + "s3.unknown_property": "val", + } + ) + pyarrow_file_io.new_input("s3://bucket/path/to/file") + + assert "got an unexpected keyword argument 'unknown_property'" in str(exc_info.value) + + +def test_pyarrow_gcs_filesystem_specific_properties() -> None: + pyarrow_file_io = PyArrowFileIO( + { + "gcs.project_id": "test-project", + } + ) + + # Test that valid PyArrow properties work without error + pyarrow_file_io.new_input("gs://warehouse/path/to/file") + + # Test that invalid PyArrow properties raise TypeError + with pytest.raises(TypeError) as exc_info: + pyarrow_file_io = PyArrowFileIO({"gcs.project_id": "test-project", "gcs.unknown_property": "val"}) + pyarrow_file_io.new_input("gs://warehouse/path/to/file") + + assert "got an unexpected keyword argument 'unknown_property'" in str(exc_info.value) + + +@skip_if_pyarrow_too_old +def test_pyarrow_adls_filesystem_specific_properties() -> None: + pyarrow_file_io = PyArrowFileIO({"adls.account-name": "user", "adls.account-key": "pass", "adls.blob_cache_size": 1024}) + + # Test that valid PyArrow properties work without error + pyarrow_file_io.new_input("abfss://test/file") + + # Test that invalid PyArrow properties raise TypeError + with pytest.raises(TypeError) as exc_info: + pyarrow_file_io = PyArrowFileIO( + {"adls.account-name": "testaccount", "adls.account-key": "testkey", "adls.unknown_property": "val"} + ) + pyarrow_file_io.new_input("abfss://test/file") + + assert "got an unexpected keyword argument 'unknown_property'" in str(exc_info.value) + + def test_schema_to_pyarrow_schema_include_field_ids(table_schema_nested: Schema) -> None: actual = schema_to_pyarrow(table_schema_nested) expected = """foo: large_string @@ -2609,6 +2672,17 @@ def _s3_region_map(bucket: str) -> str: assert pyarrow_file_io.new_input(f"oss://{bucket_region[0]}/path/to/file")._filesystem.region == user_provided_region +def test_pyarrow_filesystem_properties() -> None: + pyarrow_file_io = PyArrowFileIO({"s3.load_frequency": 200}) + pyarrow_file_io.new_input("s3://bucket/path/to/file") + + with pytest.raises(TypeError) as exc_info: + pyarrow_file_io = PyArrowFileIO({"s3.unknown_property": "val"}) + pyarrow_file_io.new_input("s3://bucket/path/to/file") + + assert "got an unexpected keyword argument 'unknown_property'" in str(exc_info.value) + + def test_pyarrow_io_multi_fs() -> None: pyarrow_file_io = PyArrowFileIO({"s3.region": "ap-southeast-1"}) From 2940125b3eade3a4c32b2b4b21c179c3aa371831 Mon Sep 17 00:00:00 2001 From: geruh Date: Tue, 29 Jul 2025 15:59:04 -0700 Subject: [PATCH 02/21] refactor and fix failing tests --- pyiceberg/io/pyarrow.py | 210 +++++++++++++++------------------------ tests/io/test_pyarrow.py | 142 
+++++++++++++++++++++----- 2 files changed, 199 insertions(+), 153 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index aa0f2a822e..a2d706dafc 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -83,23 +83,35 @@ ) from pyiceberg.expressions.visitors import visit as boolean_expression_visit from pyiceberg.io import ( + ADLS_ACCOUNT_HOST, ADLS_ACCOUNT_KEY, ADLS_ACCOUNT_NAME, ADLS_BLOB_STORAGE_AUTHORITY, ADLS_BLOB_STORAGE_SCHEME, + ADLS_CLIENT_ID, + ADLS_CLIENT_SECRET, + ADLS_CONNECTION_STRING, ADLS_DFS_STORAGE_AUTHORITY, ADLS_DFS_STORAGE_SCHEME, ADLS_SAS_TOKEN, + ADLS_TENANT_ID, AWS_ACCESS_KEY_ID, AWS_REGION, AWS_ROLE_ARN, AWS_ROLE_SESSION_NAME, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, + GCS_ACCESS, + GCS_CACHE_TIMEOUT, + GCS_CONSISTENCY, GCS_DEFAULT_LOCATION, + GCS_PROJECT_ID, + GCS_REQUESTER_PAYS, GCS_SERVICE_HOST, + GCS_SESSION_KWARGS, GCS_TOKEN, GCS_TOKEN_EXPIRES_AT_MS, + GCS_VERSION_AWARE, HDFS_HOST, HDFS_KERB_TICKET, HDFS_PORT, @@ -118,6 +130,8 @@ S3_ROLE_SESSION_NAME, S3_SECRET_ACCESS_KEY, S3_SESSION_TOKEN, + S3_SIGNER_ENDPOINT, + S3_SIGNER_URI, FileIO, InputFile, InputStream, @@ -397,6 +411,32 @@ def parse_location(location: str) -> Tuple[str, str, str]: else: return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}" + def _process_basic_properties( + self, property_mapping: Dict[str, str], special_properties: Set[str], prefix: str + ) -> Dict[str, Any]: + """Process basic property mappings and prefix passthrough logic.""" + client_kwargs: Dict[str, Any] = {} + + for prop_name, prop_value in self.properties.items(): + if prop_value is None: + continue + + # Skip properties that need special handling + if prop_name in special_properties: + continue + + # Map known property names to filesystem parameter names + if prop_name in property_mapping: + param_name = property_mapping[prop_name] + client_kwargs[param_name] = prop_value + + # Pass through any other {prefix}.* properties + elif prop_name.startswith(f"{prefix}."): + param_name = prop_name.split(".", 1)[1] + client_kwargs[param_name] = prop_value + + return client_kwargs + def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem: """Initialize FileSystem for different scheme.""" if scheme in {"oss"}: @@ -426,51 +466,26 @@ def _initialize_oss_fs(self) -> FileSystem: # Mapping from PyIceberg properties to S3FileSystem parameter names property_mapping = { S3_ENDPOINT: "endpoint_override", - S3_ACCESS_KEY_ID: "access_key", - AWS_ACCESS_KEY_ID: "access_key", - S3_SECRET_ACCESS_KEY: "secret_key", - AWS_SECRET_ACCESS_KEY: "secret_key", - S3_SESSION_TOKEN: "session_token", - AWS_SESSION_TOKEN: "session_token", - S3_REGION: "region", - AWS_REGION: "region", S3_PROXY_URI: "proxy_options", S3_CONNECT_TIMEOUT: "connect_timeout", S3_REQUEST_TIMEOUT: "request_timeout", - S3_FORCE_VIRTUAL_ADDRESSING: "force_virtual_addressing", } # Properties that need special handling special_properties = { + S3_ACCESS_KEY_ID, + S3_SECRET_ACCESS_KEY, + S3_SESSION_TOKEN, S3_CONNECT_TIMEOUT, S3_REQUEST_TIMEOUT, S3_FORCE_VIRTUAL_ADDRESSING, S3_ROLE_SESSION_NAME, S3_RESOLVE_REGION, - AWS_ROLE_SESSION_NAME, + S3_REGION, } - client_kwargs: Dict[str, Any] = {} + client_kwargs = self._process_basic_properties(property_mapping, special_properties, "s3") - for prop_name, prop_value in self.properties.items(): - if prop_value is None: - continue - - # Skip properties that need special handling - if prop_name in special_properties: - continue - - # Map known property names to S3FileSystem parameter 
names - if prop_name in property_mapping: - param_name = property_mapping[prop_name] - client_kwargs[param_name] = prop_value - - # Pass through any other s3.* properties to S3FileSystem - elif prop_name.startswith("s3."): - param_name = prop_name.split(".", 1)[1] - client_kwargs[param_name] = prop_value - - # Handle properties that need first value resolution if S3_ACCESS_KEY_ID in self.properties or AWS_ACCESS_KEY_ID in self.properties: client_kwargs["access_key"] = get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID) @@ -505,57 +520,35 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem: else: bucket_region = provided_region - # Properties that need special handling + # Mapping from PyIceberg properties to S3FileSystem parameter names property_mapping = { S3_ENDPOINT: "endpoint_override", - S3_ACCESS_KEY_ID: "access_key", - AWS_ACCESS_KEY_ID: "access_key", - S3_SECRET_ACCESS_KEY: "secret_key", - AWS_SECRET_ACCESS_KEY: "secret_key", - S3_SESSION_TOKEN: "session_token", - AWS_SESSION_TOKEN: "session_token", S3_PROXY_URI: "proxy_options", S3_CONNECT_TIMEOUT: "connect_timeout", S3_REQUEST_TIMEOUT: "request_timeout", - S3_ROLE_ARN: "role_arn", - AWS_ROLE_ARN: "role_arn", - S3_ROLE_SESSION_NAME: "session_name", - AWS_ROLE_SESSION_NAME: "session_name", - S3_FORCE_VIRTUAL_ADDRESSING: "force_virtual_addressing", S3_RETRY_STRATEGY_IMPL: "retry_strategy", } - # Properties that need special handling and should not be passed directly + # Properties that need special handling special_properties = { + S3_ACCESS_KEY_ID, + S3_SECRET_ACCESS_KEY, + S3_SESSION_TOKEN, + S3_ROLE_ARN, + S3_ROLE_SESSION_NAME, S3_RESOLVE_REGION, S3_REGION, - AWS_REGION, S3_RETRY_STRATEGY_IMPL, S3_CONNECT_TIMEOUT, S3_REQUEST_TIMEOUT, + S3_SIGNER_ENDPOINT, + S3_SIGNER_URI, + S3_FORCE_VIRTUAL_ADDRESSING, } - client_kwargs: Dict[str, Any] = {} - + client_kwargs = self._process_basic_properties(property_mapping, special_properties, "s3") client_kwargs["region"] = bucket_region - for prop_name, prop_value in self.properties.items(): - if prop_value is None: - continue - - # Skip properties that need special handling - if prop_name in special_properties: - continue - - if prop_name in property_mapping: - param_name = property_mapping[prop_name] - client_kwargs[param_name] = prop_value - # Pass through any other s3.* properties that might be used by S3FileSystem - elif prop_name.startswith("s3."): - param_name = prop_name.split(".", 1)[1] - client_kwargs[param_name] = prop_value - - # Handle properties that need first value resolution if S3_ACCESS_KEY_ID in self.properties or AWS_ACCESS_KEY_ID in self.properties: client_kwargs["access_key"] = get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID) @@ -577,6 +570,9 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem: if request_timeout := self.properties.get(S3_REQUEST_TIMEOUT): client_kwargs["request_timeout"] = float(request_timeout) + if self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING) is not None: + client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, S3_FORCE_VIRTUAL_ADDRESSING, False) + # Handle retry strategy special case if (retry_strategy_impl := self.properties.get(S3_RETRY_STRATEGY_IMPL)) and ( retry_instance := _import_retry_strategy(retry_strategy_impl) @@ -606,24 +602,17 @@ def _initialize_azure_fs(self) -> FileSystem: ADLS_BLOB_STORAGE_SCHEME: "blob_storage_scheme", ADLS_DFS_STORAGE_SCHEME: "dfs_storage_scheme", ADLS_SAS_TOKEN: "sas_token", + ADLS_CLIENT_ID: 
"client_id", + ADLS_CLIENT_SECRET: "client_secret", + ADLS_TENANT_ID: "tenant_id", } - client_kwargs: Dict[str, Any] = {} - - for prop_name, prop_value in self.properties.items(): - if prop_value is None: - continue - - # Map known property names to AzureFileSystem parameter names - if prop_name in property_mapping: - param_name = property_mapping[prop_name] - client_kwargs[param_name] = prop_value - - # Pass through any other adls.* properties that might be used by AzureFileSystem - elif prop_name.startswith("adls."): - param_name = prop_name.split(".", 1)[1] - client_kwargs[param_name] = prop_value + special_properties = { + ADLS_CONNECTION_STRING, + ADLS_ACCOUNT_HOST, + } + client_kwargs = self._process_basic_properties(property_mapping, special_properties, "adls") return AzureFileSystem(**client_kwargs) def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem: @@ -632,7 +621,7 @@ def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem: if netloc: return HadoopFileSystem.from_uri(f"{scheme}://{netloc}") - # Mapping from PyIceberg properties to S3FileSystem parameter names + # Mapping from PyIceberg properties to HadoopFileSystem parameter names property_mapping = { HDFS_HOST: "host", HDFS_PORT: "port", @@ -640,25 +629,11 @@ def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem: HDFS_KERB_TICKET: "kerb_ticket", } - hdfs_kwargs: Dict[str, Any] = {} + hdfs_kwargs = self._process_basic_properties(property_mapping, set(), "hdfs") - for prop_name, prop_value in self.properties.items(): - if prop_value is None: - continue - - # Map known property names to HadoopFileSystem parameter names - if prop_name in property_mapping: - param_name = property_mapping[prop_name] - - if param_name == "port": - hdfs_kwargs[param_name] = int(prop_value) - else: - hdfs_kwargs[param_name] = prop_value - - # Pass through any other hdfs.* properties used to be used by HadoopFileSystem - elif prop_name.startswith("hdfs."): - param_name = prop_name.split(".", 1)[1] - hdfs_kwargs[param_name] = prop_value + # Handle port conversion to int + if "port" in hdfs_kwargs: + hdfs_kwargs["port"] = int(hdfs_kwargs["port"]) return HadoopFileSystem(**hdfs_kwargs) @@ -668,36 +643,23 @@ def _initialize_gcs_fs(self) -> FileSystem: # Mapping from PyIceberg properties to GcsFileSystem parameter names property_mapping = { GCS_TOKEN: "access_token", - GCS_TOKEN_EXPIRES_AT_MS: "credential_token_expiration", GCS_DEFAULT_LOCATION: "default_bucket_location", - GCS_SERVICE_HOST: "endpoint_override", + GCS_PROJECT_ID: "project_id", } # Properties that need special handling special_properties = { GCS_TOKEN_EXPIRES_AT_MS, GCS_SERVICE_HOST, + GCS_ACCESS, + GCS_CONSISTENCY, + GCS_CACHE_TIMEOUT, + GCS_REQUESTER_PAYS, + GCS_SESSION_KWARGS, + GCS_VERSION_AWARE, } - gcs_kwargs: Dict[str, Any] = {} - - for prop_name, prop_value in self.properties.items(): - if prop_value is None: - continue - - # Skip properties that need special handling - if prop_name in special_properties: - continue - - # Map known property names to GcsFileSystem parameter names - if prop_name in property_mapping: - param_name = property_mapping[prop_name] - gcs_kwargs[param_name] = prop_value - - # Pass through any other gcs.* properties that might be used by GcsFileSystem - elif prop_name.startswith("gcs."): - param_name = prop_name.split(".", 1)[1] - gcs_kwargs[param_name] = prop_value + gcs_kwargs = self._process_basic_properties(property_mapping, special_properties, "gcs") if expiration := 
self.properties.get(GCS_TOKEN_EXPIRES_AT_MS): gcs_kwargs["credential_token_expiration"] = millis_to_datetime(int(expiration)) @@ -710,17 +672,7 @@ def _initialize_gcs_fs(self) -> FileSystem: return GcsFileSystem(**gcs_kwargs) def _initialize_local_fs(self) -> FileSystem: - local_kwargs: Dict[str, Any] = {} - - for prop_name, prop_value in self.properties.items(): - if prop_value is None: - continue - - # Pass through any other file.* properties that might be used by PyArrowLocalFileSystem - elif prop_name.startswith("file."): - param_name = prop_name.split(".", 1)[1] - local_kwargs[param_name] = prop_value - + local_kwargs = self._process_basic_properties({}, set(), "file") return PyArrowLocalFileSystem(**local_kwargs) def new_input(self, location: str) -> PyArrowFile: diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 259b97d98e..236e5b3b16 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -412,18 +412,29 @@ def test_pyarrow_unified_session_properties() -> None: ) -def test_pyarrow_s3_filesystem_specific_properties() -> None: +def test_s3_pyarrow_specific_properties() -> None: pyarrow_file_io = PyArrowFileIO( { "s3.endpoint": "http://localhost:9000", "s3.access-key-id": "user", "s3.secret-access-key": "pass", "s3.load_frequency": 900, + "s3.region": "us-east-1", } ) # Test that valid PyArrow properties work without error - pyarrow_file_io.new_input("s3://bucket/path/to/file") + with patch("pyarrow.fs.S3FileSystem") as mock_s3fs: + pyarrow_file_io._initialize_s3_fs(None) + + # Verify that properties are passed through correctly + mock_s3fs.assert_called_with( + endpoint_override="http://localhost:9000", + access_key="user", + secret_key="pass", + load_frequency=900, + region="us-east-1", + ) # Test that invalid PyArrow properties raise TypeError with pytest.raises(TypeError) as exc_info: @@ -435,44 +446,108 @@ def test_pyarrow_s3_filesystem_specific_properties() -> None: "s3.unknown_property": "val", } ) - pyarrow_file_io.new_input("s3://bucket/path/to/file") + pyarrow_file_io._initialize_s3_fs(None) assert "got an unexpected keyword argument 'unknown_property'" in str(exc_info.value) -def test_pyarrow_gcs_filesystem_specific_properties() -> None: +def test_iceberg_cred_properties_take_precedence() -> None: + session_properties: Properties = { + "s3.access-key-id": "explicit-access-key", + "s3.secret-access-key": "explicit-secret-key", + "s3.region": "us-east-1", + # These should be ignored because explicit properties take precedence + "s3.access_key": "passed-access-key", + "s3.secret_key": "passed-secret-key", + } + + with patch("pyarrow.fs.S3FileSystem") as mock_s3fs: + s3_fileio = PyArrowFileIO(properties=session_properties) + + s3_fileio._initialize_s3_fs(None) + + # Assert that explicit properties are used from above + mock_s3fs.assert_called_with( + access_key="explicit-access-key", + secret_key="explicit-secret-key", + region="us-east-1", + ) + + +def test_hdfs_pyarrow_specific_properties() -> None: + hdfs_properties: Properties = { + "hdfs.host": "localhost", + "hdfs.port": "9000", + "hdfs.user": "user", + "hdfs.kerberos_ticket": "test", + # non iceberg properties + "hdfs.replication": 3, + "hdfs.block_size": 134217728, + } + + with patch("pyarrow.fs.HadoopFileSystem") as mock_hdfs: + hdfs_fileio = PyArrowFileIO(properties=hdfs_properties) + hdfs_fileio._initialize_hdfs_fs("hdfs", None) + + mock_hdfs.assert_called_with( + host="localhost", + port=9000, + user="user", + kerb_ticket="test", + replication=3, + block_size=134217728, + ) + + 
+def test_local_filesystem_pyarrow_specific_properties() -> None: + local_properties: Properties = {"file.buffer_size": 8192, "file.use_mmap": True} + + with patch("pyiceberg.io.pyarrow.PyArrowLocalFileSystem") as mock_local: + local_fileio = PyArrowFileIO(properties=local_properties) + local_fileio._initialize_local_fs() + + mock_local.assert_called_with( + buffer_size=8192, + use_mmap=True, + ) + + +def test_gcs_pyarrow_specific_properties() -> None: pyarrow_file_io = PyArrowFileIO( { - "gcs.project_id": "test-project", + "gcs.project-id": "project", + "gcs.oauth2.token": "test", + "gcs.default-bucket-location": "loc", } ) - # Test that valid PyArrow properties work without error - pyarrow_file_io.new_input("gs://warehouse/path/to/file") + with patch("pyarrow.fs.GcsFileSystem") as mock_gcs: + pyarrow_file_io._initialize_gcs_fs() - # Test that invalid PyArrow properties raise TypeError - with pytest.raises(TypeError) as exc_info: - pyarrow_file_io = PyArrowFileIO({"gcs.project_id": "test-project", "gcs.unknown_property": "val"}) - pyarrow_file_io.new_input("gs://warehouse/path/to/file") - - assert "got an unexpected keyword argument 'unknown_property'" in str(exc_info.value) + mock_gcs.assert_called_with( + project_id="project", + access_token="test", + default_bucket_location="loc", + ) @skip_if_pyarrow_too_old -def test_pyarrow_adls_filesystem_specific_properties() -> None: - pyarrow_file_io = PyArrowFileIO({"adls.account-name": "user", "adls.account-key": "pass", "adls.blob_cache_size": 1024}) +def test_pyarrow_adls_pyarrow_specific_properties() -> None: + pyarrow_file_io = PyArrowFileIO( + {"adls.account-name": "user", "adls.account-key": "pass", "adls.sas-token": "testsas", "adls.client_id": "client"} + ) # Test that valid PyArrow properties work without error - pyarrow_file_io.new_input("abfss://test/file") - - # Test that invalid PyArrow properties raise TypeError - with pytest.raises(TypeError) as exc_info: - pyarrow_file_io = PyArrowFileIO( - {"adls.account-name": "testaccount", "adls.account-key": "testkey", "adls.unknown_property": "val"} + with patch("pyarrow.fs.AzureFileSystem") as mock_azure: + pyarrow_file_io._initialize_azure_fs() + + # Verify that properties are passed through correctly + mock_azure.assert_called_with( + account_name="user", + account_key="pass", + sas_token="testsas", + client_id="client", ) - pyarrow_file_io.new_input("abfss://test/file") - - assert "got an unexpected keyword argument 'unknown_property'" in str(exc_info.value) def test_schema_to_pyarrow_schema_include_field_ids(table_schema_nested: Schema) -> None: @@ -2712,3 +2787,22 @@ def test_retry_strategy_not_found() -> None: io = PyArrowFileIO(properties={S3_RETRY_STRATEGY_IMPL: "pyiceberg.DoesNotExist"}) with pytest.warns(UserWarning, match="Could not initialize S3 retry strategy: pyiceberg.DoesNotExist"): io.new_input("s3://bucket/path/to/file") + + +def test_hdfs_filesystem_properties_with_netloc() -> None: + """Test that HDFS filesystem uses from_uri when netloc is provided.""" + hdfs_properties: Properties = { + "hdfs.host": "localhost", + "hdfs.port": "9000", + "hdfs.user": "testuser", + } + + with patch("pyarrow.fs.HadoopFileSystem") as mock_hdfs: + hdfs_fileio = PyArrowFileIO(properties=hdfs_properties) + filename = str(uuid.uuid4()) + + # When netloc is provided, it should use from_uri instead of properties + hdfs_fileio.new_input(location=f"hdfs://testcluster:8020/{filename}") + + # Verify that from_uri is called instead of constructor with properties + 
mock_hdfs.from_uri.assert_called_with("hdfs://testcluster:8020") From 5bd8cccdd67a4c8a76765b8b716a7f484be59279 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sat, 2 Aug 2025 16:35:56 -0700 Subject: [PATCH 03/21] add properties util functions --- pyiceberg/utils/properties.py | 53 +++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/pyiceberg/utils/properties.py b/pyiceberg/utils/properties.py index 2b228f6e41..a4ec0a70b7 100644 --- a/pyiceberg/utils/properties.py +++ b/pyiceberg/utils/properties.py @@ -17,6 +17,7 @@ from typing import ( Any, + Callable, Dict, Optional, ) @@ -83,3 +84,55 @@ def get_header_properties( ) -> Properties: header_prefix_len = len(HEADER_PREFIX) return {key[header_prefix_len:]: value for key, value in properties.items() if key.startswith(HEADER_PREFIX)} + + +def properties_with_prefix( + properties: Properties, + prefix: str, +) -> Properties: + """ + Returns subset of provided map with keys matching the provided prefix. Matching is + case-sensitive and the matching prefix is removed from the keys in returned map. + + Args: + properties: input map + prefix: prefix to choose keys from input map + + Returns: + subset of input map with keys starting with provided prefix and prefix trimmed out + """ + if not properties: + return {} + + return { + key[len(prefix):]: value + for key, value in properties.items() + if key.startswith(prefix) + } + + +def filter_properties( + properties: Properties, + key_predicate: Callable[[str], bool], +) -> Properties: + """ + Filter the properties map by the provided key predicate. + + Args: + properties: input map + key_predicate: predicate to choose keys from input map + + Returns: + subset of input map with keys satisfying the predicate + """ + if not properties: + return {} + + if key_predicate is None: + raise ValueError("Invalid key predicate: None") + + return { + key: value + for key, value in properties.items() + if key_predicate(key) + } From 9d44d8045e5b9b009256ee3b373cd54ecab9267c Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sat, 2 Aug 2025 16:36:07 -0700 Subject: [PATCH 04/21] refactor for copilot --- pyiceberg/io/pyarrow.py | 46 ++++++++++++++++++++--------------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index a2d706dafc..6bb0214857 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -411,6 +411,29 @@ def parse_location(location: str) -> Tuple[str, str, str]: else: return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}" + def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem: + """Initialize FileSystem for different scheme.""" + if scheme in {"oss"}: + return self._initialize_oss_fs() + + elif scheme in {"s3", "s3a", "s3n"}: + return self._initialize_s3_fs(netloc) + + elif scheme in {"hdfs", "viewfs"}: + return self._initialize_hdfs_fs(scheme, netloc) + + elif scheme in {"gs", "gcs"}: + return self._initialize_gcs_fs() + + elif scheme in {"abfs", "abfss", "wasb", "wasbs"}: + return self._initialize_azure_fs() + + elif scheme in {"file"}: + return self._initialize_local_fs() + + else: + raise ValueError(f"Unrecognized filesystem type in URI: {scheme}") + def _process_basic_properties( self, property_mapping: Dict[str, str], special_properties: Set[str], prefix: str ) -> Dict[str, Any]: @@ -437,29 +460,6 @@ def _process_basic_properties( return client_kwargs - def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem: - """Initialize FileSystem 
for different scheme.""" - if scheme in {"oss"}: - return self._initialize_oss_fs() - - elif scheme in {"s3", "s3a", "s3n"}: - return self._initialize_s3_fs(netloc) - - elif scheme in {"hdfs", "viewfs"}: - return self._initialize_hdfs_fs(scheme, netloc) - - elif scheme in {"gs", "gcs"}: - return self._initialize_gcs_fs() - - elif scheme in {"abfs", "abfss", "wasb", "wasbs"}: - return self._initialize_azure_fs() - - elif scheme in {"file"}: - return self._initialize_local_fs() - - else: - raise ValueError(f"Unrecognized filesystem type in URI: {scheme}") - def _initialize_oss_fs(self) -> FileSystem: from pyarrow.fs import S3FileSystem From f96537026124e041a2b705c18e8d79742dbf6d50 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sat, 2 Aug 2025 20:36:43 -0700 Subject: [PATCH 05/21] refactor oss --- pyiceberg/io/pyarrow.py | 73 +++++++++++++++++------------------ pyiceberg/utils/properties.py | 15 ++----- 2 files changed, 39 insertions(+), 49 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 6bb0214857..7cb11402fa 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -190,13 +190,14 @@ TimeType, UnknownType, UUIDType, + strtobool, ) from pyiceberg.utils.concurrent import ExecutorFactory from pyiceberg.utils.config import Config from pyiceberg.utils.datetime import millis_to_datetime from pyiceberg.utils.decimal import unscaled_to_decimal from pyiceberg.utils.deprecated import deprecation_message -from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int +from pyiceberg.utils.properties import get_first_property_value, properties_with_prefix, property_as_bool, property_as_int from pyiceberg.utils.singleton import Singleton from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string @@ -463,43 +464,41 @@ def _process_basic_properties( def _initialize_oss_fs(self) -> FileSystem: from pyarrow.fs import S3FileSystem - # Mapping from PyIceberg properties to S3FileSystem parameter names - property_mapping = { - S3_ENDPOINT: "endpoint_override", - S3_PROXY_URI: "proxy_options", - S3_CONNECT_TIMEOUT: "connect_timeout", - S3_REQUEST_TIMEOUT: "request_timeout", - } - - # Properties that need special handling - special_properties = { - S3_ACCESS_KEY_ID, - S3_SECRET_ACCESS_KEY, - S3_SESSION_TOKEN, - S3_CONNECT_TIMEOUT, - S3_REQUEST_TIMEOUT, - S3_FORCE_VIRTUAL_ADDRESSING, - S3_ROLE_SESSION_NAME, - S3_RESOLVE_REGION, - S3_REGION, - } - - client_kwargs = self._process_basic_properties(property_mapping, special_properties, "s3") - - if S3_ACCESS_KEY_ID in self.properties or AWS_ACCESS_KEY_ID in self.properties: - client_kwargs["access_key"] = get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID) - - if S3_SECRET_ACCESS_KEY in self.properties or AWS_SECRET_ACCESS_KEY in self.properties: - client_kwargs["secret_key"] = get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY) - - if S3_SESSION_TOKEN in self.properties or AWS_SESSION_TOKEN in self.properties: - client_kwargs["session_token"] = get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN) - - if S3_REGION in self.properties or AWS_REGION in self.properties: - client_kwargs["region"] = get_first_property_value(self.properties, S3_REGION, AWS_REGION) - - client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, S3_FORCE_VIRTUAL_ADDRESSING, True) + client_kwargs = {} + if endpoint := get_first_property_value(self.properties, 
S3_ENDPOINT, "oss.endpoint_override"): + client_kwargs["endpoint_override"] = endpoint + if access_key := get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID, "oss.access_key"): + client_kwargs["access_key"] = access_key + if secret_key := get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY, "oss.secret_key"): + client_kwargs["secret_key"] = secret_key + if session_token := get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN, "oss.session_token"): + client_kwargs["session_token"] = session_token + if region := get_first_property_value(self.properties, S3_REGION, AWS_REGION, "oss.region"): + client_kwargs["region"] = region + # Check for force_virtual_addressing in order of preference, defaulting to True if not found + if force_virtual_addressing := get_first_property_value( + self.properties, S3_FORCE_VIRTUAL_ADDRESSING, "oss.force_virtual_addressing" + ): + if isinstance(force_virtual_addressing, str): # S3_FORCE_VIRTUAL_ADDRESSING's value can be a string + force_virtual_addressing = strtobool(force_virtual_addressing) + client_kwargs["force_virtual_addressing"] = force_virtual_addressing + else: + client_kwargs["force_virtual_addressing"] = True + if proxy_uri := get_first_property_value(self.properties, S3_PROXY_URI, "oss.proxy_options"): + client_kwargs["proxy_options"] = proxy_uri + if connect_timeout := get_first_property_value(self.properties, S3_CONNECT_TIMEOUT, "oss.connect_timeout"): + client_kwargs["connect_timeout"] = float(connect_timeout) + if request_timeout := get_first_property_value(self.properties, S3_REQUEST_TIMEOUT, "oss.request_timeout"): + client_kwargs["request_timeout"] = float(request_timeout) + if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN, "oss.role_arn"): + client_kwargs["role_arn"] = role_arn + if session_name := get_first_property_value( + self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME, "oss.session_name" + ): + client_kwargs["session_name"] = session_name + oss_properties = properties_with_prefix(self.properties, prefix="oss.") + client_kwargs = {**oss_properties, **client_kwargs} return S3FileSystem(**client_kwargs) def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem: diff --git a/pyiceberg/utils/properties.py b/pyiceberg/utils/properties.py index a4ec0a70b7..8d8d196dc5 100644 --- a/pyiceberg/utils/properties.py +++ b/pyiceberg/utils/properties.py @@ -91,8 +91,7 @@ def properties_with_prefix( prefix: str, ) -> Properties: """ - Returns subset of provided map with keys matching the provided prefix. Matching is - case-sensitive and the matching prefix is removed from the keys in returned map. + Return subset of provided map with keys matching the provided prefix. Matching is case-sensitive and the matching prefix is removed from the keys in returned map. 
Args: properties: input map @@ -104,11 +103,7 @@ def properties_with_prefix( if not properties: return {} - return { - key[len(prefix):]: value - for key, value in properties.items() - if key.startswith(prefix) - } + return {key[len(prefix) :]: value for key, value in properties.items() if key.startswith(prefix)} def filter_properties( @@ -131,8 +126,4 @@ def filter_properties( if key_predicate is None: raise ValueError("Invalid key predicate: None") - return { - key: value - for key, value in properties.items() - if key_predicate(key) - } + return {key: value for key, value in properties.items() if key_predicate(key)} From e39e2c8f9090d831504f172fcf1e4dba4e792d64 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 3 Aug 2025 10:17:28 -0700 Subject: [PATCH 06/21] refactor --- pyiceberg/io/pyarrow.py | 143 +++++++++++++++++++--------------------- 1 file changed, 69 insertions(+), 74 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 7cb11402fa..1eec57606c 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -130,8 +130,6 @@ S3_ROLE_SESSION_NAME, S3_SECRET_ACCESS_KEY, S3_SESSION_TOKEN, - S3_SIGNER_ENDPOINT, - S3_SIGNER_URI, FileIO, InputFile, InputStream, @@ -197,7 +195,11 @@ from pyiceberg.utils.datetime import millis_to_datetime from pyiceberg.utils.decimal import unscaled_to_decimal from pyiceberg.utils.deprecated import deprecation_message -from pyiceberg.utils.properties import get_first_property_value, properties_with_prefix, property_as_bool, property_as_int +from pyiceberg.utils.properties import ( + filter_properties, + property_as_bool, + property_as_int, +) from pyiceberg.utils.singleton import Singleton from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string @@ -461,51 +463,76 @@ def _process_basic_properties( return client_kwargs + def _get_first_property_value_with_tracking(self, props: Properties, used_keys: set[str], *keys: str) -> Optional[Any]: + """Tracks all candidate keys and returns the first value found.""" + used_keys.update(keys) + for key in keys: + if key in props: + return props[key] + return None + def _initialize_oss_fs(self) -> FileSystem: from pyarrow.fs import S3FileSystem + properties = filter_properties(self.properties, key_predicate=lambda k: k.startswith(("oss.", "client."))) + used_keys: set[str] = set() client_kwargs = {} - if endpoint := get_first_property_value(self.properties, S3_ENDPOINT, "oss.endpoint_override"): + + get = lambda *keys: self._get_first_property_value_with_tracking(properties, used_keys, *keys) # noqa: E731 + + if endpoint := get(S3_ENDPOINT, "oss.endpoint_override"): client_kwargs["endpoint_override"] = endpoint - if access_key := get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID, "oss.access_key"): + if access_key := get(S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID, "oss.access_key"): client_kwargs["access_key"] = access_key - if secret_key := get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY, "oss.secret_key"): + if secret_key := get(S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY, "oss.secret_key"): client_kwargs["secret_key"] = secret_key - if session_token := get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN, "oss.session_token"): + if session_token := get(S3_SESSION_TOKEN, AWS_SESSION_TOKEN, "oss.session_token"): client_kwargs["session_token"] = session_token - if region := get_first_property_value(self.properties, S3_REGION, AWS_REGION, "oss.region"): 
+ if region := get(S3_REGION, AWS_REGION, "oss.region"): client_kwargs["region"] = region - # Check for force_virtual_addressing in order of preference, defaulting to True if not found - if force_virtual_addressing := get_first_property_value( - self.properties, S3_FORCE_VIRTUAL_ADDRESSING, "oss.force_virtual_addressing" - ): + # Check for force_virtual_addressing in order of preference. For oss FS, defaulting to True if not found + if force_virtual_addressing := get(S3_FORCE_VIRTUAL_ADDRESSING, "oss.force_virtual_addressing"): if isinstance(force_virtual_addressing, str): # S3_FORCE_VIRTUAL_ADDRESSING's value can be a string force_virtual_addressing = strtobool(force_virtual_addressing) client_kwargs["force_virtual_addressing"] = force_virtual_addressing else: client_kwargs["force_virtual_addressing"] = True - if proxy_uri := get_first_property_value(self.properties, S3_PROXY_URI, "oss.proxy_options"): + if proxy_uri := get(S3_PROXY_URI, "oss.proxy_options"): client_kwargs["proxy_options"] = proxy_uri - if connect_timeout := get_first_property_value(self.properties, S3_CONNECT_TIMEOUT, "oss.connect_timeout"): + if connect_timeout := get(S3_CONNECT_TIMEOUT, "oss.connect_timeout"): client_kwargs["connect_timeout"] = float(connect_timeout) - if request_timeout := get_first_property_value(self.properties, S3_REQUEST_TIMEOUT, "oss.request_timeout"): + if request_timeout := get(S3_REQUEST_TIMEOUT, "oss.request_timeout"): client_kwargs["request_timeout"] = float(request_timeout) - if role_arn := get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN, "oss.role_arn"): + if role_arn := get(S3_ROLE_ARN, AWS_ROLE_ARN, "oss.role_arn"): client_kwargs["role_arn"] = role_arn - if session_name := get_first_property_value( - self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME, "oss.session_name" - ): + if session_name := get(S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME, "oss.session_name"): client_kwargs["session_name"] = session_name - oss_properties = properties_with_prefix(self.properties, prefix="oss.") - client_kwargs = {**oss_properties, **client_kwargs} + remaining_oss_props = { + k.removeprefix("oss."): v for k, v in self.properties.items() if k.startswith("oss.") and k not in used_keys + } + client_kwargs = {**remaining_oss_props, **client_kwargs} return S3FileSystem(**client_kwargs) def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem: from pyarrow.fs import S3FileSystem - provided_region = get_first_property_value(self.properties, S3_REGION, AWS_REGION) + properties = filter_properties(self.properties, key_predicate=lambda k: k.startswith(("s3.", "client."))) + used_keys: set[str] = set() + client_kwargs = {} + + get = lambda *keys: self._get_first_property_value_with_tracking(properties, used_keys, *keys) # noqa: E731 + if endpoint := get(S3_ENDPOINT, "s3.endpoint_override"): + client_kwargs["endpoint_override"] = endpoint + if access_key := get(S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID, "s3.access_key"): + client_kwargs["access_key"] = access_key + if secret_key := get(S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY, "s3.secret_key"): + client_kwargs["secret_key"] = secret_key + if session_token := get(S3_SESSION_TOKEN, AWS_SESSION_TOKEN, "s3.session_token"): + client_kwargs["session_token"] = session_token + + provided_region = get(S3_REGION, AWS_REGION) # Do this when we don't provide the region at all, or when we explicitly enable it if provided_region is None or property_as_bool(self.properties, S3_RESOLVE_REGION, False) is True: # Resolve region from 
netloc(bucket), fallback to user-provided region @@ -518,66 +545,34 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem: ) else: bucket_region = provided_region - - # Mapping from PyIceberg properties to S3FileSystem parameter names - property_mapping = { - S3_ENDPOINT: "endpoint_override", - S3_PROXY_URI: "proxy_options", - S3_CONNECT_TIMEOUT: "connect_timeout", - S3_REQUEST_TIMEOUT: "request_timeout", - S3_RETRY_STRATEGY_IMPL: "retry_strategy", - } - - # Properties that need special handling - special_properties = { - S3_ACCESS_KEY_ID, - S3_SECRET_ACCESS_KEY, - S3_SESSION_TOKEN, - S3_ROLE_ARN, - S3_ROLE_SESSION_NAME, - S3_RESOLVE_REGION, - S3_REGION, - S3_RETRY_STRATEGY_IMPL, - S3_CONNECT_TIMEOUT, - S3_REQUEST_TIMEOUT, - S3_SIGNER_ENDPOINT, - S3_SIGNER_URI, - S3_FORCE_VIRTUAL_ADDRESSING, - } - - client_kwargs = self._process_basic_properties(property_mapping, special_properties, "s3") client_kwargs["region"] = bucket_region + used_keys.add(S3_RESOLVE_REGION) - if S3_ACCESS_KEY_ID in self.properties or AWS_ACCESS_KEY_ID in self.properties: - client_kwargs["access_key"] = get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID) - - if S3_SECRET_ACCESS_KEY in self.properties or AWS_SECRET_ACCESS_KEY in self.properties: - client_kwargs["secret_key"] = get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY) - - if S3_SESSION_TOKEN in self.properties or AWS_SESSION_TOKEN in self.properties: - client_kwargs["session_token"] = get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN) - - if S3_ROLE_ARN in self.properties or AWS_ROLE_ARN in self.properties: - client_kwargs["role_arn"] = get_first_property_value(self.properties, S3_ROLE_ARN, AWS_ROLE_ARN) - - if S3_ROLE_SESSION_NAME in self.properties or AWS_ROLE_SESSION_NAME in self.properties: - client_kwargs["session_name"] = get_first_property_value(self.properties, S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME) + if force_virtual_addressing := get(S3_FORCE_VIRTUAL_ADDRESSING, "s3.force_virtual_addressing"): + if isinstance(force_virtual_addressing, str): # S3_FORCE_VIRTUAL_ADDRESSING's value can be a string + force_virtual_addressing = strtobool(force_virtual_addressing) + client_kwargs["force_virtual_addressing"] = force_virtual_addressing - if connect_timeout := self.properties.get(S3_CONNECT_TIMEOUT): + if proxy_uri := get(S3_PROXY_URI, "s3.proxy_options"): + client_kwargs["proxy_options"] = proxy_uri + if connect_timeout := get(S3_CONNECT_TIMEOUT, "s3.connect_timeout"): client_kwargs["connect_timeout"] = float(connect_timeout) - - if request_timeout := self.properties.get(S3_REQUEST_TIMEOUT): + if request_timeout := get(S3_REQUEST_TIMEOUT, "s3.request_timeout"): client_kwargs["request_timeout"] = float(request_timeout) - - if self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING) is not None: - client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, S3_FORCE_VIRTUAL_ADDRESSING, False) + if role_arn := get(S3_ROLE_ARN, AWS_ROLE_ARN, "s3.role_arn"): + client_kwargs["role_arn"] = role_arn + if session_name := get(S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME, "s3.session_name"): + client_kwargs["session_name"] = session_name # Handle retry strategy special case - if (retry_strategy_impl := self.properties.get(S3_RETRY_STRATEGY_IMPL)) and ( - retry_instance := _import_retry_strategy(retry_strategy_impl) - ): - client_kwargs["retry_strategy"] = retry_instance + if retry_strategy_impl := get(S3_RETRY_STRATEGY_IMPL, "s3.retry_strategy"): + if 
retry_instance := _import_retry_strategy(retry_strategy_impl): + client_kwargs["retry_strategy"] = retry_instance + remaining_s3_props = { + k.removeprefix("s3."): v for k, v in self.properties.items() if k.startswith("s3.") and k not in used_keys + } + client_kwargs = {**remaining_s3_props, **client_kwargs} return S3FileSystem(**client_kwargs) def _initialize_azure_fs(self) -> FileSystem: From 1bad384dcac2aedcf895930ae17585b365ca31b3 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 3 Aug 2025 12:37:30 -0700 Subject: [PATCH 07/21] refactor resolve region --- pyiceberg/io/pyarrow.py | 71 ++++++++++++++++++++++++++++------------- 1 file changed, 48 insertions(+), 23 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 1eec57606c..1b9d043233 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -471,10 +471,50 @@ def _get_first_property_value_with_tracking(self, props: Properties, used_keys: return props[key] return None + def _convert_str_to_bool(self, value: Any) -> bool: + """Convert string or other value to boolean, handling string representations properly.""" + if isinstance(value, str): + return strtobool(value) + return bool(value) + + def _resolve_s3_region( + self, provided_region: Optional[str], resolve_region_override: Any, bucket: Optional[str] + ) -> Optional[str]: + """ + Resolve S3 region based on configuration and optional bucket-based resolution. + + Args: + provided_region: Region explicitly provided in configuration + resolve_region_override: Whether to resolve region from bucket (can be string or bool) + bucket: Bucket name for region resolution + + Returns: + The resolved region string, or None if no region could be determined + """ + # Handle resolve_region_override conversion + should_resolve_region = False + if resolve_region_override is not None: + should_resolve_region = self._convert_str_to_bool(resolve_region_override) + + # If no region provided or explicit resolve requested, try to resolve from bucket + if provided_region is None or should_resolve_region: + resolved_region = _cached_resolve_s3_region(bucket=bucket) + + # Warn if resolved region differs from provided region + if provided_region is not None and resolved_region and resolved_region != provided_region: + logger.warning( + f"PyArrow FileIO overriding S3 bucket region for bucket {bucket}: " + f"provided region {provided_region}, actual region {resolved_region}" + ) + + return resolved_region or provided_region + + return provided_region + def _initialize_oss_fs(self) -> FileSystem: from pyarrow.fs import S3FileSystem - properties = filter_properties(self.properties, key_predicate=lambda k: k.startswith(("oss.", "client."))) + properties = filter_properties(self.properties, key_predicate=lambda k: k.startswith(("s3.", "client.", "oss."))) used_keys: set[str] = set() client_kwargs = {} @@ -490,12 +530,10 @@ def _initialize_oss_fs(self) -> FileSystem: client_kwargs["session_token"] = session_token if region := get(S3_REGION, AWS_REGION, "oss.region"): client_kwargs["region"] = region - # Check for force_virtual_addressing in order of preference. 
For oss FS, defaulting to True if not found if force_virtual_addressing := get(S3_FORCE_VIRTUAL_ADDRESSING, "oss.force_virtual_addressing"): - if isinstance(force_virtual_addressing, str): # S3_FORCE_VIRTUAL_ADDRESSING's value can be a string - force_virtual_addressing = strtobool(force_virtual_addressing) - client_kwargs["force_virtual_addressing"] = force_virtual_addressing + client_kwargs["force_virtual_addressing"] = self._convert_str_to_bool(force_virtual_addressing) else: + # For OSS FS, default to True client_kwargs["force_virtual_addressing"] = True if proxy_uri := get(S3_PROXY_URI, "oss.proxy_options"): client_kwargs["proxy_options"] = proxy_uri @@ -532,26 +570,13 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem: if session_token := get(S3_SESSION_TOKEN, AWS_SESSION_TOKEN, "s3.session_token"): client_kwargs["session_token"] = session_token - provided_region = get(S3_REGION, AWS_REGION) - # Do this when we don't provide the region at all, or when we explicitly enable it - if provided_region is None or property_as_bool(self.properties, S3_RESOLVE_REGION, False) is True: - # Resolve region from netloc(bucket), fallback to user-provided region - # Only supported by buckets hosted by S3 - bucket_region = _cached_resolve_s3_region(bucket=netloc) or provided_region - if provided_region is not None and bucket_region != provided_region: - logger.warning( - f"PyArrow FileIO overriding S3 bucket region for bucket {netloc}: " - f"provided region {provided_region}, actual region {bucket_region}" - ) - else: - bucket_region = provided_region - client_kwargs["region"] = bucket_region - used_keys.add(S3_RESOLVE_REGION) + # Handle S3 region configuration with optional auto-resolution + client_kwargs["region"] = self._resolve_s3_region( + provided_region=get(S3_REGION, AWS_REGION), resolve_region_override=get(S3_RESOLVE_REGION), bucket=netloc + ) if force_virtual_addressing := get(S3_FORCE_VIRTUAL_ADDRESSING, "s3.force_virtual_addressing"): - if isinstance(force_virtual_addressing, str): # S3_FORCE_VIRTUAL_ADDRESSING's value can be a string - force_virtual_addressing = strtobool(force_virtual_addressing) - client_kwargs["force_virtual_addressing"] = force_virtual_addressing + client_kwargs["force_virtual_addressing"] = self._convert_str_to_bool(force_virtual_addressing) if proxy_uri := get(S3_PROXY_URI, "s3.proxy_options"): client_kwargs["proxy_options"] = proxy_uri From 955a2ee4f79d54737afe39f4742a23a5ea356ae3 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 3 Aug 2025 12:53:27 -0700 Subject: [PATCH 08/21] refactor all the fs --- pyiceberg/io/pyarrow.py | 154 ++++++++++++++++++++-------------------- 1 file changed, 77 insertions(+), 77 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 1b9d043233..dcf09f20ea 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -83,35 +83,24 @@ ) from pyiceberg.expressions.visitors import visit as boolean_expression_visit from pyiceberg.io import ( - ADLS_ACCOUNT_HOST, ADLS_ACCOUNT_KEY, ADLS_ACCOUNT_NAME, ADLS_BLOB_STORAGE_AUTHORITY, ADLS_BLOB_STORAGE_SCHEME, - ADLS_CLIENT_ID, - ADLS_CLIENT_SECRET, - ADLS_CONNECTION_STRING, ADLS_DFS_STORAGE_AUTHORITY, ADLS_DFS_STORAGE_SCHEME, ADLS_SAS_TOKEN, - ADLS_TENANT_ID, AWS_ACCESS_KEY_ID, AWS_REGION, AWS_ROLE_ARN, AWS_ROLE_SESSION_NAME, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, - GCS_ACCESS, - GCS_CACHE_TIMEOUT, - GCS_CONSISTENCY, GCS_DEFAULT_LOCATION, GCS_PROJECT_ID, - GCS_REQUESTER_PAYS, GCS_SERVICE_HOST, - GCS_SESSION_KWARGS, GCS_TOKEN, 
GCS_TOKEN_EXPIRES_AT_MS, - GCS_VERSION_AWARE, HDFS_HOST, HDFS_KERB_TICKET, HDFS_PORT, @@ -516,9 +505,8 @@ def _initialize_oss_fs(self) -> FileSystem: properties = filter_properties(self.properties, key_predicate=lambda k: k.startswith(("s3.", "client.", "oss."))) used_keys: set[str] = set() - client_kwargs = {} - get = lambda *keys: self._get_first_property_value_with_tracking(properties, used_keys, *keys) # noqa: E731 + client_kwargs = {} if endpoint := get(S3_ENDPOINT, "oss.endpoint_override"): client_kwargs["endpoint_override"] = endpoint @@ -557,9 +545,8 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem: properties = filter_properties(self.properties, key_predicate=lambda k: k.startswith(("s3.", "client."))) used_keys: set[str] = set() - client_kwargs = {} - get = lambda *keys: self._get_first_property_value_with_tracking(properties, used_keys, *keys) # noqa: E731 + client_kwargs = {} if endpoint := get(S3_ENDPOINT, "s3.endpoint_override"): client_kwargs["endpoint_override"] = endpoint @@ -612,26 +599,36 @@ def _initialize_azure_fs(self) -> FileSystem: from pyarrow.fs import AzureFileSystem - # Mapping from PyIceberg properties to AzureFileSystem parameter names - property_mapping = { - ADLS_ACCOUNT_NAME: "account_name", - ADLS_ACCOUNT_KEY: "account_key", - ADLS_BLOB_STORAGE_AUTHORITY: "blob_storage_authority", - ADLS_DFS_STORAGE_AUTHORITY: "dfs_storage_authority", - ADLS_BLOB_STORAGE_SCHEME: "blob_storage_scheme", - ADLS_DFS_STORAGE_SCHEME: "dfs_storage_scheme", - ADLS_SAS_TOKEN: "sas_token", - ADLS_CLIENT_ID: "client_id", - ADLS_CLIENT_SECRET: "client_secret", - ADLS_TENANT_ID: "tenant_id", - } + properties = filter_properties(self.properties, key_predicate=lambda k: k.startswith("adls.")) + used_keys: set[str] = set() + get = lambda *keys: self._get_first_property_value_with_tracking(properties, used_keys, *keys) # noqa: E731 + client_kwargs = {} - special_properties = { - ADLS_CONNECTION_STRING, - ADLS_ACCOUNT_HOST, - } + if account_name := get(ADLS_ACCOUNT_NAME, "adls.account_name"): + client_kwargs["account_name"] = account_name + + if account_key := get(ADLS_ACCOUNT_KEY, "adls.account_key"): + client_kwargs["account_key"] = account_key + + if blob_storage_authority := get(ADLS_BLOB_STORAGE_AUTHORITY, "adls.blob_storage_authority"): + client_kwargs["blob_storage_authority"] = blob_storage_authority + + if dfs_storage_authority := get(ADLS_DFS_STORAGE_AUTHORITY, "adls.dfs_storage_authority"): + client_kwargs["dfs_storage_authority"] = dfs_storage_authority - client_kwargs = self._process_basic_properties(property_mapping, special_properties, "adls") + if blob_storage_scheme := get(ADLS_BLOB_STORAGE_SCHEME, "adls.blob_storage_scheme"): + client_kwargs["blob_storage_scheme"] = blob_storage_scheme + + if dfs_storage_scheme := get(ADLS_DFS_STORAGE_SCHEME, "adls.dfs_storage_scheme"): + client_kwargs["dfs_storage_scheme"] = dfs_storage_scheme + + if sas_token := get(ADLS_SAS_TOKEN, "adls.sas_token"): + client_kwargs["sas_token"] = sas_token + + remaining_adls_props = { + k.removeprefix("adls."): v for k, v in self.properties.items() if k.startswith("adls.") and k not in used_keys + } + client_kwargs = {**remaining_adls_props, **client_kwargs} return AzureFileSystem(**client_kwargs) def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem: @@ -640,59 +637,62 @@ def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem: if netloc: return HadoopFileSystem.from_uri(f"{scheme}://{netloc}") - # Mapping from PyIceberg properties to 
HadoopFileSystem parameter names - property_mapping = { - HDFS_HOST: "host", - HDFS_PORT: "port", - HDFS_USER: "user", - HDFS_KERB_TICKET: "kerb_ticket", - } - - hdfs_kwargs = self._process_basic_properties(property_mapping, set(), "hdfs") - - # Handle port conversion to int - if "port" in hdfs_kwargs: - hdfs_kwargs["port"] = int(hdfs_kwargs["port"]) + properties = filter_properties(self.properties, key_predicate=lambda k: k.startswith("hdfs.")) + used_keys: set[str] = set() + get = lambda *keys: self._get_first_property_value_with_tracking(properties, used_keys, *keys) # noqa: E731 + client_kwargs = {} - return HadoopFileSystem(**hdfs_kwargs) + if host := get(HDFS_HOST): + client_kwargs["host"] = host + if port := get(HDFS_PORT): + # port should be an integer type + client_kwargs["port"] = int(port) + if user := get(HDFS_USER): + client_kwargs["user"] = user + if kerb_ticket := get(HDFS_KERB_TICKET, "hdfs.kerb_ticket"): + client_kwargs["kerb_ticket"] = kerb_ticket + + remaining_hdfs_props = { + k.removeprefix("hdfs."): v for k, v in self.properties.items() if k.startswith("hdfs.") and k not in used_keys + } + client_kwargs = {**remaining_hdfs_props, **client_kwargs} + return HadoopFileSystem(**client_kwargs) def _initialize_gcs_fs(self) -> FileSystem: from pyarrow.fs import GcsFileSystem - # Mapping from PyIceberg properties to GcsFileSystem parameter names - property_mapping = { - GCS_TOKEN: "access_token", - GCS_DEFAULT_LOCATION: "default_bucket_location", - GCS_PROJECT_ID: "project_id", - } - - # Properties that need special handling - special_properties = { - GCS_TOKEN_EXPIRES_AT_MS, - GCS_SERVICE_HOST, - GCS_ACCESS, - GCS_CONSISTENCY, - GCS_CACHE_TIMEOUT, - GCS_REQUESTER_PAYS, - GCS_SESSION_KWARGS, - GCS_VERSION_AWARE, - } - - gcs_kwargs = self._process_basic_properties(property_mapping, special_properties, "gcs") - - if expiration := self.properties.get(GCS_TOKEN_EXPIRES_AT_MS): - gcs_kwargs["credential_token_expiration"] = millis_to_datetime(int(expiration)) + properties = filter_properties(self.properties, key_predicate=lambda k: k.startswith("gcs.")) + used_keys: set[str] = set() + get = lambda *keys: self._get_first_property_value_with_tracking(properties, used_keys, *keys) # noqa: E731 + client_kwargs = {} - if endpoint := self.properties.get(GCS_SERVICE_HOST): + if access_token := get(GCS_TOKEN, "gcs.access_token"): + client_kwargs["access_token"] = access_token + if expiration := get(GCS_TOKEN_EXPIRES_AT_MS, "gcs.credential_token_expiration"): + client_kwargs["credential_token_expiration"] = millis_to_datetime(int(expiration)) + if bucket_location := get(GCS_DEFAULT_LOCATION, "gcs.default_bucket_location"): + client_kwargs["default_bucket_location"] = bucket_location + if endpoint := get(GCS_SERVICE_HOST): url_parts = urlparse(endpoint) - gcs_kwargs["scheme"] = url_parts.scheme - gcs_kwargs["endpoint_override"] = url_parts.netloc - - return GcsFileSystem(**gcs_kwargs) + client_kwargs["scheme"] = url_parts.scheme + client_kwargs["endpoint_override"] = url_parts.netloc + if scheme := get("gcs.scheme") and "scheme" not in client_kwargs: + client_kwargs["scheme"] = scheme + if endpoint_override := get("gcs.endpoint_override") and "endpoint_override" not in client_kwargs: + client_kwargs["endpoint_override"] = endpoint_override + + if project_id := get(GCS_PROJECT_ID, "gcs.project_id"): + client_kwargs["project_id"] = project_id + + remaining_gcs_props = { + k.removeprefix("gcs."): v for k, v in self.properties.items() if k.startswith("gcs.") and k not in used_keys + } + 
client_kwargs = {**remaining_gcs_props, **client_kwargs} + return GcsFileSystem(**client_kwargs) def _initialize_local_fs(self) -> FileSystem: - local_kwargs = self._process_basic_properties({}, set(), "file") - return PyArrowLocalFileSystem(**local_kwargs) + client_kwargs = {k.removeprefix("file."): v for k, v in self.properties.items() if k.startswith("file.")} + return PyArrowLocalFileSystem(**client_kwargs) def new_input(self, location: str) -> PyArrowFile: """Get a PyArrowFile instance to read bytes from the file at the given location. From 902e20e3f2a892daf3ec80931f6b5aaa9f71538e Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 3 Aug 2025 13:07:58 -0700 Subject: [PATCH 09/21] remove unused --- pyiceberg/io/pyarrow.py | 32 ++++---------------------------- pyiceberg/utils/properties.py | 20 -------------------- 2 files changed, 4 insertions(+), 48 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index dcf09f20ea..bc4e5292a9 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -426,32 +426,6 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste else: raise ValueError(f"Unrecognized filesystem type in URI: {scheme}") - def _process_basic_properties( - self, property_mapping: Dict[str, str], special_properties: Set[str], prefix: str - ) -> Dict[str, Any]: - """Process basic property mappings and prefix passthrough logic.""" - client_kwargs: Dict[str, Any] = {} - - for prop_name, prop_value in self.properties.items(): - if prop_value is None: - continue - - # Skip properties that need special handling - if prop_name in special_properties: - continue - - # Map known property names to filesystem parameter names - if prop_name in property_mapping: - param_name = property_mapping[prop_name] - client_kwargs[param_name] = prop_value - - # Pass through any other {prefix}.* properties - elif prop_name.startswith(f"{prefix}."): - param_name = prop_name.split(".", 1)[1] - client_kwargs[param_name] = prop_value - - return client_kwargs - def _get_first_property_value_with_tracking(self, props: Properties, used_keys: set[str], *keys: str) -> Optional[Any]: """Tracks all candidate keys and returns the first value found.""" used_keys.update(keys) @@ -676,9 +650,11 @@ def _initialize_gcs_fs(self) -> FileSystem: url_parts = urlparse(endpoint) client_kwargs["scheme"] = url_parts.scheme client_kwargs["endpoint_override"] = url_parts.netloc - if scheme := get("gcs.scheme") and "scheme" not in client_kwargs: + if scheme := get("gcs.scheme") and "scheme" not in client_kwargs: # GCS_SERVICE_HOST takes precedence client_kwargs["scheme"] = scheme - if endpoint_override := get("gcs.endpoint_override") and "endpoint_override" not in client_kwargs: + if ( + endpoint_override := get("gcs.endpoint_override") and "endpoint_override" not in client_kwargs + ): # GCS_SERVICE_HOST takes precedence client_kwargs["endpoint_override"] = endpoint_override if project_id := get(GCS_PROJECT_ID, "gcs.project_id"): diff --git a/pyiceberg/utils/properties.py b/pyiceberg/utils/properties.py index 8d8d196dc5..223ddb3186 100644 --- a/pyiceberg/utils/properties.py +++ b/pyiceberg/utils/properties.py @@ -86,26 +86,6 @@ def get_header_properties( return {key[header_prefix_len:]: value for key, value in properties.items() if key.startswith(HEADER_PREFIX)} -def properties_with_prefix( - properties: Properties, - prefix: str, -) -> Properties: - """ - Return subset of provided map with keys matching the provided prefix. 
Matching is case-sensitive and the matching prefix is removed from the keys in returned map. - - Args: - properties: input map - prefix: prefix to choose keys from input map - - Returns: - subset of input map with keys starting with provided prefix and prefix trimmed out - """ - if not properties: - return {} - - return {key[len(prefix) :]: value for key, value in properties.items() if key.startswith(prefix)} - - def filter_properties( properties: Properties, key_predicate: Callable[[str], bool], From 197d0e6888dff7eeedecf8d03116c5dcb908f2e3 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 3 Aug 2025 13:18:14 -0700 Subject: [PATCH 10/21] simplify get remaining properties --- pyiceberg/io/pyarrow.py | 28 ++++++++++++---------------- pyiceberg/utils/properties.py | 20 ++++++++++++++++++++ 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index bc4e5292a9..8032ef56b9 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -186,6 +186,7 @@ from pyiceberg.utils.deprecated import deprecation_message from pyiceberg.utils.properties import ( filter_properties, + properties_with_prefix, property_as_bool, property_as_int, ) @@ -508,9 +509,8 @@ def _initialize_oss_fs(self) -> FileSystem: if session_name := get(S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME, "oss.session_name"): client_kwargs["session_name"] = session_name - remaining_oss_props = { - k.removeprefix("oss."): v for k, v in self.properties.items() if k.startswith("oss.") and k not in used_keys - } + # get the rest of the properties with the `oss.` prefix that are not already evaluated + remaining_oss_props = properties_with_prefix({k: v for k, v in self.properties.items() if k not in used_keys}, "oss.") client_kwargs = {**remaining_oss_props, **client_kwargs} return S3FileSystem(**client_kwargs) @@ -555,9 +555,8 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem: if retry_instance := _import_retry_strategy(retry_strategy_impl): client_kwargs["retry_strategy"] = retry_instance - remaining_s3_props = { - k.removeprefix("s3."): v for k, v in self.properties.items() if k.startswith("s3.") and k not in used_keys - } + # get the rest of the properties with the `s3.` prefix that are not already evaluated + remaining_s3_props = properties_with_prefix({k: v for k, v in self.properties.items() if k not in used_keys}, "s3.") client_kwargs = {**remaining_s3_props, **client_kwargs} return S3FileSystem(**client_kwargs) @@ -599,9 +598,8 @@ def _initialize_azure_fs(self) -> FileSystem: if sas_token := get(ADLS_SAS_TOKEN, "adls.sas_token"): client_kwargs["sas_token"] = sas_token - remaining_adls_props = { - k.removeprefix("adls."): v for k, v in self.properties.items() if k.startswith("adls.") and k not in used_keys - } + # get the rest of the properties with the `adls.` prefix that are not already evaluated + remaining_adls_props = properties_with_prefix({k: v for k, v in self.properties.items() if k not in used_keys}, "adls.") client_kwargs = {**remaining_adls_props, **client_kwargs} return AzureFileSystem(**client_kwargs) @@ -626,9 +624,8 @@ def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem: if kerb_ticket := get(HDFS_KERB_TICKET, "hdfs.kerb_ticket"): client_kwargs["kerb_ticket"] = kerb_ticket - remaining_hdfs_props = { - k.removeprefix("hdfs."): v for k, v in self.properties.items() if k.startswith("hdfs.") and k not in used_keys - } + # get the rest of the properties with the `hdfs.` prefix that are not already evaluated + 
remaining_hdfs_props = properties_with_prefix({k: v for k, v in self.properties.items() if k not in used_keys}, "hdfs.") client_kwargs = {**remaining_hdfs_props, **client_kwargs} return HadoopFileSystem(**client_kwargs) @@ -660,14 +657,13 @@ def _initialize_gcs_fs(self) -> FileSystem: if project_id := get(GCS_PROJECT_ID, "gcs.project_id"): client_kwargs["project_id"] = project_id - remaining_gcs_props = { - k.removeprefix("gcs."): v for k, v in self.properties.items() if k.startswith("gcs.") and k not in used_keys - } + # get the rest of the properties with the `gcs.` prefix that are not already evaluated + remaining_gcs_props = properties_with_prefix({k: v for k, v in self.properties.items() if k not in used_keys}, "gcs.") client_kwargs = {**remaining_gcs_props, **client_kwargs} return GcsFileSystem(**client_kwargs) def _initialize_local_fs(self) -> FileSystem: - client_kwargs = {k.removeprefix("file."): v for k, v in self.properties.items() if k.startswith("file.")} + client_kwargs = properties_with_prefix(self.properties, "file.") return PyArrowLocalFileSystem(**client_kwargs) def new_input(self, location: str) -> PyArrowFile: diff --git a/pyiceberg/utils/properties.py b/pyiceberg/utils/properties.py index 223ddb3186..8d8d196dc5 100644 --- a/pyiceberg/utils/properties.py +++ b/pyiceberg/utils/properties.py @@ -86,6 +86,26 @@ def get_header_properties( return {key[header_prefix_len:]: value for key, value in properties.items() if key.startswith(HEADER_PREFIX)} +def properties_with_prefix( + properties: Properties, + prefix: str, +) -> Properties: + """ + Return subset of provided map with keys matching the provided prefix. Matching is case-sensitive and the matching prefix is removed from the keys in returned map. + + Args: + properties: input map + prefix: prefix to choose keys from input map + + Returns: + subset of input map with keys starting with provided prefix and prefix trimmed out + """ + if not properties: + return {} + + return {key[len(prefix) :]: value for key, value in properties.items() if key.startswith(prefix)} + + def filter_properties( properties: Properties, key_predicate: Callable[[str], bool], From 51f9f735de7868b335e08732a55a214e4293433a Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 3 Aug 2025 13:31:13 -0700 Subject: [PATCH 11/21] reorder --- pyiceberg/io/pyarrow.py | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 8032ef56b9..351e9560c3 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -481,7 +481,7 @@ def _initialize_oss_fs(self) -> FileSystem: properties = filter_properties(self.properties, key_predicate=lambda k: k.startswith(("s3.", "client.", "oss."))) used_keys: set[str] = set() get = lambda *keys: self._get_first_property_value_with_tracking(properties, used_keys, *keys) # noqa: E731 - client_kwargs = {} + client_kwargs: Properties = {} if endpoint := get(S3_ENDPOINT, "oss.endpoint_override"): client_kwargs["endpoint_override"] = endpoint @@ -520,7 +520,12 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem: properties = filter_properties(self.properties, key_predicate=lambda k: k.startswith(("s3.", "client."))) used_keys: set[str] = set() get = lambda *keys: self._get_first_property_value_with_tracking(properties, used_keys, *keys) # noqa: E731 - client_kwargs = {} + client_kwargs: Properties = {} + + # Handle S3 region configuration with optional auto-resolution + client_kwargs["region"] = self._resolve_s3_region( 
+ provided_region=get(S3_REGION, AWS_REGION), resolve_region_override=get(S3_RESOLVE_REGION), bucket=netloc + ) if endpoint := get(S3_ENDPOINT, "s3.endpoint_override"): client_kwargs["endpoint_override"] = endpoint @@ -530,15 +535,6 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem: client_kwargs["secret_key"] = secret_key if session_token := get(S3_SESSION_TOKEN, AWS_SESSION_TOKEN, "s3.session_token"): client_kwargs["session_token"] = session_token - - # Handle S3 region configuration with optional auto-resolution - client_kwargs["region"] = self._resolve_s3_region( - provided_region=get(S3_REGION, AWS_REGION), resolve_region_override=get(S3_RESOLVE_REGION), bucket=netloc - ) - - if force_virtual_addressing := get(S3_FORCE_VIRTUAL_ADDRESSING, "s3.force_virtual_addressing"): - client_kwargs["force_virtual_addressing"] = self._convert_str_to_bool(force_virtual_addressing) - if proxy_uri := get(S3_PROXY_URI, "s3.proxy_options"): client_kwargs["proxy_options"] = proxy_uri if connect_timeout := get(S3_CONNECT_TIMEOUT, "s3.connect_timeout"): @@ -550,6 +546,8 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem: if session_name := get(S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME, "s3.session_name"): client_kwargs["session_name"] = session_name + if force_virtual_addressing := get(S3_FORCE_VIRTUAL_ADDRESSING, "s3.force_virtual_addressing"): + client_kwargs["force_virtual_addressing"] = self._convert_str_to_bool(force_virtual_addressing) # Handle retry strategy special case if retry_strategy_impl := get(S3_RETRY_STRATEGY_IMPL, "s3.retry_strategy"): if retry_instance := _import_retry_strategy(retry_strategy_impl): @@ -575,7 +573,7 @@ def _initialize_azure_fs(self) -> FileSystem: properties = filter_properties(self.properties, key_predicate=lambda k: k.startswith("adls.")) used_keys: set[str] = set() get = lambda *keys: self._get_first_property_value_with_tracking(properties, used_keys, *keys) # noqa: E731 - client_kwargs = {} + client_kwargs: Properties = {} if account_name := get(ADLS_ACCOUNT_NAME, "adls.account_name"): client_kwargs["account_name"] = account_name @@ -612,7 +610,7 @@ def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem: properties = filter_properties(self.properties, key_predicate=lambda k: k.startswith("hdfs.")) used_keys: set[str] = set() get = lambda *keys: self._get_first_property_value_with_tracking(properties, used_keys, *keys) # noqa: E731 - client_kwargs = {} + client_kwargs: Properties = {} if host := get(HDFS_HOST): client_kwargs["host"] = host @@ -635,7 +633,7 @@ def _initialize_gcs_fs(self) -> FileSystem: properties = filter_properties(self.properties, key_predicate=lambda k: k.startswith("gcs.")) used_keys: set[str] = set() get = lambda *keys: self._get_first_property_value_with_tracking(properties, used_keys, *keys) # noqa: E731 - client_kwargs = {} + client_kwargs: Properties = {} if access_token := get(GCS_TOKEN, "gcs.access_token"): client_kwargs["access_token"] = access_token From 2e5926bc1a6a5e3e2c70a2bec03cdf701fdea728 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 4 Aug 2025 13:15:20 -0700 Subject: [PATCH 12/21] Update pyiceberg/io/pyarrow.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- pyiceberg/io/pyarrow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 351e9560c3..1585a96f37 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -645,10 
+645,10 @@ def _initialize_gcs_fs(self) -> FileSystem: url_parts = urlparse(endpoint) client_kwargs["scheme"] = url_parts.scheme client_kwargs["endpoint_override"] = url_parts.netloc - if scheme := get("gcs.scheme") and "scheme" not in client_kwargs: # GCS_SERVICE_HOST takes precedence + if (scheme := get("gcs.scheme")) and "scheme" not in client_kwargs: # GCS_SERVICE_HOST takes precedence client_kwargs["scheme"] = scheme if ( - endpoint_override := get("gcs.endpoint_override") and "endpoint_override" not in client_kwargs + (endpoint_override := get("gcs.endpoint_override")) and "endpoint_override" not in client_kwargs ): # GCS_SERVICE_HOST takes precedence client_kwargs["endpoint_override"] = endpoint_override From 7e4d5902257686fa3b982ce5e4a674cc12569ddc Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 4 Aug 2025 20:16:07 +0000 Subject: [PATCH 13/21] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pyiceberg/io/pyarrow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 1585a96f37..896f84a7ae 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -648,8 +648,8 @@ def _initialize_gcs_fs(self) -> FileSystem: if (scheme := get("gcs.scheme")) and "scheme" not in client_kwargs: # GCS_SERVICE_HOST takes precedence client_kwargs["scheme"] = scheme if ( - (endpoint_override := get("gcs.endpoint_override")) and "endpoint_override" not in client_kwargs - ): # GCS_SERVICE_HOST takes precedence + endpoint_override := get("gcs.endpoint_override") + ) and "endpoint_override" not in client_kwargs: # GCS_SERVICE_HOST takes precedence client_kwargs["endpoint_override"] = endpoint_override if project_id := get(GCS_PROJECT_ID, "gcs.project_id"): From 179f62c060e3ea8f5e663b4ba27e65fac882cb71 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 4 Aug 2025 13:21:45 -0700 Subject: [PATCH 14/21] fix docs --- mkdocs/docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 2f71ddbd4f..a1521b5592 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -128,7 +128,7 @@ Below are examples of supported prefixes and how the properties are passed throu | `s3.` | [S3FileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html) | `s3.load_frequency=900` | Passed as `load_frequency=900` to S3FileSystem | | `hdfs.` | [HadoopFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.HadoopFileSystem.html) | `hdfs.replication=3` | Passed as `replication=3` to HadoopFileSystem | | `gcs.` | [GcsFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.GcsFileSystem.html) | `gcs.project_id=test` | Passed as `project_id='test'` to GcsFileSystem | -| `adls.` | [AzureFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.AzureFileSystem.html) | `adls.blob_cache-size=1024` | Passed as `blob_cache_size=1024` to AzureFileSystem | +| `adls.` | [AzureFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.AzureFileSystem.html) | `adls.account_name=foo` | Passed as `account_name=foo` to AzureFileSystem | | `oss.` | [S3FileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html) | `oss.connect_timeout=30.0` | Passed as `connect_timeout=30.0` to S3FileSystem | | `file.` | 
[LocalFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.LocalFileSystem.html) | `file.use_mmap=true` | Passed as `use_mmap=True` to LocalFileSystem | From 669d0245758e2f8a644578a572af28a669599ac9 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 4 Aug 2025 14:41:58 -0700 Subject: [PATCH 15/21] docs --- mkdocs/docs/configuration.md | 69 ++++++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 27 deletions(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index a1521b5592..5abeea102c 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -105,33 +105,6 @@ You can also set the FileIO explicitly: For the FileIO there are several configuration options available: -### PyArrow FileSystem Extra Properties - -When using `PyArrowFileIO`, any properties with filesystem specific prefixes that are not explicitly handled by PyIceberg will be passed to the underlying PyArrow filesystem implementations. - -To use these properties, follow the format: - -```txt -{fs_scheme}.{parameter_name} -``` - -- {fs_scheme} is the filesystem scheme (e.g., s3, hdfs, gcs). -- {parameter_name} must match the name expected by the PyArrow filesystem. -- Property values must use the correct type expected by the underlying filesystem (e.g., string, integer, boolean). - -Below are examples of supported prefixes and how the properties are passed through: - - - -| Property Prefix | FileSystem | Example | Description | -|-----------------|------------------------------------------------------------------------------------------------------|-----------------------------|-----------------------------------------------------| -| `s3.` | [S3FileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html) | `s3.load_frequency=900` | Passed as `load_frequency=900` to S3FileSystem | -| `hdfs.` | [HadoopFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.HadoopFileSystem.html) | `hdfs.replication=3` | Passed as `replication=3` to HadoopFileSystem | -| `gcs.` | [GcsFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.GcsFileSystem.html) | `gcs.project_id=test` | Passed as `project_id='test'` to GcsFileSystem | -| `adls.` | [AzureFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.AzureFileSystem.html) | `adls.account_name=foo` | Passed as `account_name=foo` to AzureFileSystem | -| `oss.` | [S3FileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html) | `oss.connect_timeout=30.0` | Passed as `connect_timeout=30.0` to S3FileSystem | -| `file.` | [LocalFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.LocalFileSystem.html) | `file.use_mmap=true` | Passed as `use_mmap=True` to LocalFileSystem | - ### S3 @@ -240,6 +213,8 @@ PyIceberg uses [S3FileSystem](https://arrow.apache.org/docs/python/generated/pya ### PyArrow +#### PyArrow Specific Properties + | Key | Example | Description | @@ -247,6 +222,46 @@ PyIceberg uses [S3FileSystem](https://arrow.apache.org/docs/python/generated/pya | pyarrow.use-large-types-on-read | True | Use large PyArrow types i.e. [large_string](https://arrow.apache.org/docs/python/generated/pyarrow.large_string.html), [large_binary](https://arrow.apache.org/docs/python/generated/pyarrow.large_binary.html) and [large_list](https://arrow.apache.org/docs/python/generated/pyarrow.large_list.html) field types on table scans. The default value is True. 
| +#### Advanced FileSystem Configuration + +When using `PyArrowFileIO`, you can **pass additional configuration properties directly to the underlying PyArrow filesystem implementations**. This feature enables you to use any PyArrow filesystem option without requiring explicit PyIceberg support. + +PyIceberg first processes its own supported properties for each filesystem, then passes any remaining properties with the appropriate prefix directly to the PyArrow filesystem constructor. This approach ensures: + +1. PyIceberg's built-in properties take precedence +2. Advanced PyArrow options are automatically supported +3. New PyArrow features become available immediately + +##### Configuration Format + +Use this format for additional properties: + +```txt +{fs_scheme}.{parameter_name}={value} +``` + +Where: + +- `{fs_scheme}` is the filesystem scheme (e.g., `s3`, `hdfs`, `gcs`, `adls`, `oss`, `file`) +- `{parameter_name}` must match the exact parameter name expected by the PyArrow filesystem constructor +- `{value}` must be the correct type expected by the underlying filesystem (string, integer, boolean, etc.) + +##### Supported Prefixes and FileSystems + + + +| Property Prefix | FileSystem | Example | Description | +|-----------------|------------------------------------------------------------------------------------------------------|-----------------------------|-----------------------------------------------------| +| `s3.` | [S3FileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html) | `s3.load_frequency=900` | Passed as `load_frequency=900` to S3FileSystem | +| `hdfs.` | [HadoopFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.HadoopFileSystem.html) | `hdfs.replication=3` | Passed as `replication=3` to HadoopFileSystem | +| `gcs.` | [GcsFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.GcsFileSystem.html) | `gcs.project_id=test` | Passed as `project_id='test'` to GcsFileSystem | +| `adls.` | [AzureFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.AzureFileSystem.html) | `adls.account_name=foo` | Passed as `account_name=foo` to AzureFileSystem | +| `oss.` | [S3FileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html) | `oss.connect_timeout=30.0` | Passed as `connect_timeout=30.0` to S3FileSystem | +| `file.` | [LocalFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.LocalFileSystem.html) | `file.use_mmap=true` | Passed as `use_mmap=True` to LocalFileSystem | + + + +**Note:** Refer to the PyArrow documentation for each filesystem to understand the available parameters and their expected types. Property values are passed directly to PyArrow, so they must match the exact parameter names and types expected by the filesystem constructors. 
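Editor's note: to make the pass-through format above concrete, here is a minimal sketch of how such properties might be supplied together with the regular PyIceberg ones via `load_catalog`. The catalog name, endpoint, and values are illustrative assumptions, not taken from this patch; `s3.endpoint` is a property PyIceberg already understands, while the other keys are unknown to PyIceberg and are forwarded verbatim, so they must already have the type the PyArrow constructor expects.

```python
from pyiceberg.catalog import load_catalog

# Illustrative values only. "s3.endpoint" is handled by PyIceberg itself; the
# remaining prefixed keys are passed straight to the matching PyArrow
# filesystem constructor.
catalog = load_catalog(
    "default",
    **{
        "s3.endpoint": "http://localhost:9000",
        "s3.load_frequency": 900,   # -> S3FileSystem(load_frequency=900)
        "hdfs.replication": 3,      # -> HadoopFileSystem(replication=3)
        "gcs.project_id": "test",   # -> GcsFileSystem(project_id="test")
    },
)
```

Each prefix group is only consulted when a table location actually uses that scheme, so mixing prefixes in a single catalog configuration is harmless.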
## Location Providers From 4b1ff31adedf47c19aa4c8aa377aa9b53590daec Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 4 Aug 2025 14:50:42 -0700 Subject: [PATCH 16/21] lint --- mkdocs/docs/configuration.md | 7 ------- 1 file changed, 7 deletions(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 5abeea102c..f19208b465 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -215,13 +215,10 @@ PyIceberg uses [S3FileSystem](https://arrow.apache.org/docs/python/generated/pya #### PyArrow Specific Properties - - | Key | Example | Description | | ------------------------------- | ------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | pyarrow.use-large-types-on-read | True | Use large PyArrow types i.e. [large_string](https://arrow.apache.org/docs/python/generated/pyarrow.large_string.html), [large_binary](https://arrow.apache.org/docs/python/generated/pyarrow.large_binary.html) and [large_list](https://arrow.apache.org/docs/python/generated/pyarrow.large_list.html) field types on table scans. The default value is True. | - #### Advanced FileSystem Configuration When using `PyArrowFileIO`, you can **pass additional configuration properties directly to the underlying PyArrow filesystem implementations**. This feature enables you to use any PyArrow filesystem option without requiring explicit PyIceberg support. @@ -248,8 +245,6 @@ Where: ##### Supported Prefixes and FileSystems - - | Property Prefix | FileSystem | Example | Description | |-----------------|------------------------------------------------------------------------------------------------------|-----------------------------|-----------------------------------------------------| | `s3.` | [S3FileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html) | `s3.load_frequency=900` | Passed as `load_frequency=900` to S3FileSystem | @@ -259,8 +254,6 @@ Where: | `oss.` | [S3FileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html) | `oss.connect_timeout=30.0` | Passed as `connect_timeout=30.0` to S3FileSystem | | `file.` | [LocalFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.LocalFileSystem.html) | `file.use_mmap=true` | Passed as `use_mmap=True` to LocalFileSystem | - - **Note:** Refer to the PyArrow documentation for each filesystem to understand the available parameters and their expected types. Property values are passed directly to PyArrow, so they must match the exact parameter names and types expected by the filesystem constructors. 
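Editor's note: the precedence rule implemented by the refactors above (tracked lookups plus a final `client_kwargs = {**remaining_s3_props, **client_kwargs}` merge) can be hard to see in diff form. The following is a small self-contained sketch with plain dicts, assuming simplified inputs rather than the real property constants: keys consumed by the tracked lookups are excluded from the pass-through set, and the explicitly handled kwargs are merged last so they win on any remaining overlap.

```python
properties = {
    "s3.connect_timeout": "30.0",  # recognized alias, converted to float by the explicit handler
    "s3.load_frequency": 900,      # unknown to PyIceberg, forwarded as-is
}
used_keys = {"s3.connect_timeout"}          # recorded by the tracked lookup
client_kwargs = {"connect_timeout": 30.0}   # value set by the explicit handler

# Same shape as `remaining_s3_props` in the patch: strip the prefix, skip used keys.
remaining_s3_props = {
    k.removeprefix("s3."): v
    for k, v in properties.items()
    if k.startswith("s3.") and k not in used_keys
}
client_kwargs = {**remaining_s3_props, **client_kwargs}
assert client_kwargs == {"connect_timeout": 30.0, "load_frequency": 900}
```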
## Location Providers From b3a6c6836db63d8e460acca93551c053bfc83cf4 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 4 Aug 2025 16:13:47 -0700 Subject: [PATCH 17/21] get rid of oss prefix --- mkdocs/docs/configuration.md | 3 +-- pyiceberg/io/pyarrow.py | 32 ++++++++++++++++---------------- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index f19208b465..d3a22bbbff 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -239,7 +239,7 @@ Use this format for additional properties: Where: -- `{fs_scheme}` is the filesystem scheme (e.g., `s3`, `hdfs`, `gcs`, `adls`, `oss`, `file`) +- `{fs_scheme}` is the filesystem scheme (e.g., `s3`, `hdfs`, `gcs`, `adls`, `file`) - `{parameter_name}` must match the exact parameter name expected by the PyArrow filesystem constructor - `{value}` must be the correct type expected by the underlying filesystem (string, integer, boolean, etc.) @@ -251,7 +251,6 @@ Where: | `hdfs.` | [HadoopFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.HadoopFileSystem.html) | `hdfs.replication=3` | Passed as `replication=3` to HadoopFileSystem | | `gcs.` | [GcsFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.GcsFileSystem.html) | `gcs.project_id=test` | Passed as `project_id='test'` to GcsFileSystem | | `adls.` | [AzureFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.AzureFileSystem.html) | `adls.account_name=foo` | Passed as `account_name=foo` to AzureFileSystem | -| `oss.` | [S3FileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.S3FileSystem.html) | `oss.connect_timeout=30.0` | Passed as `connect_timeout=30.0` to S3FileSystem | | `file.` | [LocalFileSystem](https://arrow.apache.org/docs/python/generated/pyarrow.fs.LocalFileSystem.html) | `file.use_mmap=true` | Passed as `use_mmap=True` to LocalFileSystem | **Note:** Refer to the PyArrow documentation for each filesystem to understand the available parameters and their expected types. Property values are passed directly to PyArrow, so they must match the exact parameter names and types expected by the filesystem constructors. 
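Editor's note: with the `oss.` prefix dropped by this commit, an `oss://` warehouse is configured with the same `s3.` keys as any S3 location. A hedged sketch follows; the endpoint and credentials are placeholders, not values from the change.

```python
from pyiceberg.catalog import load_catalog

# Placeholder endpoint/credentials for an Alibaba OSS bucket. Known keys such as
# "s3.endpoint" are handled by PyIceberg; any other "s3.*" key is passed through
# to pyarrow.fs.S3FileSystem exactly as it is for s3:// locations.
catalog = load_catalog(
    "default",
    **{
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.endpoint": "https://oss-cn-hangzhou.aliyuncs.com",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
        "s3.load_frequency": 900,  # pass-through, not a PyIceberg property
    },
)
```

Note that for `oss://` the initializer still defaults `force_virtual_addressing` to `True` unless `S3_FORCE_VIRTUAL_ADDRESSING` (or the `s3.force_virtual_addressing` pass-through spelling) is set explicitly.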
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 896f84a7ae..aa35d706c1 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -478,40 +478,40 @@ def _resolve_s3_region( def _initialize_oss_fs(self) -> FileSystem: from pyarrow.fs import S3FileSystem - properties = filter_properties(self.properties, key_predicate=lambda k: k.startswith(("s3.", "client.", "oss."))) + properties = filter_properties(self.properties, key_predicate=lambda k: k.startswith(("s3.", "client."))) used_keys: set[str] = set() get = lambda *keys: self._get_first_property_value_with_tracking(properties, used_keys, *keys) # noqa: E731 client_kwargs: Properties = {} - if endpoint := get(S3_ENDPOINT, "oss.endpoint_override"): + if endpoint := get(S3_ENDPOINT, "s3.endpoint_override"): client_kwargs["endpoint_override"] = endpoint - if access_key := get(S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID, "oss.access_key"): + if access_key := get(S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID, "s3.access_key"): client_kwargs["access_key"] = access_key - if secret_key := get(S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY, "oss.secret_key"): + if secret_key := get(S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY, "s3.secret_key"): client_kwargs["secret_key"] = secret_key - if session_token := get(S3_SESSION_TOKEN, AWS_SESSION_TOKEN, "oss.session_token"): + if session_token := get(S3_SESSION_TOKEN, AWS_SESSION_TOKEN, "s3.session_token"): client_kwargs["session_token"] = session_token - if region := get(S3_REGION, AWS_REGION, "oss.region"): + if region := get(S3_REGION, AWS_REGION): client_kwargs["region"] = region - if force_virtual_addressing := get(S3_FORCE_VIRTUAL_ADDRESSING, "oss.force_virtual_addressing"): + if force_virtual_addressing := get(S3_FORCE_VIRTUAL_ADDRESSING, "s3.force_virtual_addressing"): client_kwargs["force_virtual_addressing"] = self._convert_str_to_bool(force_virtual_addressing) else: - # For OSS FS, default to True + # For Alibaba OSS protocol, default to True client_kwargs["force_virtual_addressing"] = True - if proxy_uri := get(S3_PROXY_URI, "oss.proxy_options"): + if proxy_uri := get(S3_PROXY_URI, "s3.proxy_options"): client_kwargs["proxy_options"] = proxy_uri - if connect_timeout := get(S3_CONNECT_TIMEOUT, "oss.connect_timeout"): + if connect_timeout := get(S3_CONNECT_TIMEOUT, "s3.connect_timeout"): client_kwargs["connect_timeout"] = float(connect_timeout) - if request_timeout := get(S3_REQUEST_TIMEOUT, "oss.request_timeout"): + if request_timeout := get(S3_REQUEST_TIMEOUT, "s3.request_timeout"): client_kwargs["request_timeout"] = float(request_timeout) - if role_arn := get(S3_ROLE_ARN, AWS_ROLE_ARN, "oss.role_arn"): + if role_arn := get(S3_ROLE_ARN, AWS_ROLE_ARN, "s3.role_arn"): client_kwargs["role_arn"] = role_arn - if session_name := get(S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME, "oss.session_name"): + if session_name := get(S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME, "s3.session_name"): client_kwargs["session_name"] = session_name - # get the rest of the properties with the `oss.` prefix that are not already evaluated - remaining_oss_props = properties_with_prefix({k: v for k, v in self.properties.items() if k not in used_keys}, "oss.") - client_kwargs = {**remaining_oss_props, **client_kwargs} + # get the rest of the properties with the `s3.` prefix that are not already evaluated + remaining_s3_props = properties_with_prefix({k: v for k, v in self.properties.items() if k not in used_keys}, "s3.") + client_kwargs = {**remaining_s3_props, **client_kwargs} return S3FileSystem(**client_kwargs) def 
_initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem: From 8234954b3a939fb0dfa227f41aa7cae74a313db8 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 4 Aug 2025 16:25:34 -0700 Subject: [PATCH 18/21] fix S3_RESOLVE_REGION for oss:// --- pyiceberg/io/pyarrow.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index aa35d706c1..16b129bb28 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -493,6 +493,9 @@ def _initialize_oss_fs(self) -> FileSystem: client_kwargs["session_token"] = session_token if region := get(S3_REGION, AWS_REGION): client_kwargs["region"] = region + _ = get( + S3_RESOLVE_REGION + ) # this feature is only available for S3. Use `get` here so it does not get passed down to the S3FileSystem constructor if force_virtual_addressing := get(S3_FORCE_VIRTUAL_ADDRESSING, "s3.force_virtual_addressing"): client_kwargs["force_virtual_addressing"] = self._convert_str_to_bool(force_virtual_addressing) else: From 1dba0558a7a420fe6ef6de26ded8453575d90bf1 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 4 Aug 2025 22:13:05 -0700 Subject: [PATCH 19/21] remove noqa: E731 --- pyiceberg/io/pyarrow.py | 34 +++++++++++++++++++++------------- pyiceberg/utils/properties.py | 9 +++++++++ 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 16b129bb28..e4db4f5bf1 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -186,6 +186,7 @@ from pyiceberg.utils.deprecated import deprecation_message from pyiceberg.utils.properties import ( filter_properties, + get_first_property_value_with_tracking, properties_with_prefix, property_as_bool, property_as_int, @@ -427,14 +428,6 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste else: raise ValueError(f"Unrecognized filesystem type in URI: {scheme}") - def _get_first_property_value_with_tracking(self, props: Properties, used_keys: set[str], *keys: str) -> Optional[Any]: - """Tracks all candidate keys and returns the first value found.""" - used_keys.update(keys) - for key in keys: - if key in props: - return props[key] - return None - def _convert_str_to_bool(self, value: Any) -> bool: """Convert string or other value to boolean, handling string representations properly.""" if isinstance(value, str): @@ -480,7 +473,10 @@ def _initialize_oss_fs(self) -> FileSystem: properties = filter_properties(self.properties, key_predicate=lambda k: k.startswith(("s3.", "client."))) used_keys: set[str] = set() - get = lambda *keys: self._get_first_property_value_with_tracking(properties, used_keys, *keys) # noqa: E731 + + def get(*keys: str) -> str | None: + return get_first_property_value_with_tracking(properties, used_keys, *keys) + client_kwargs: Properties = {} if endpoint := get(S3_ENDPOINT, "s3.endpoint_override"): @@ -522,7 +518,10 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem: properties = filter_properties(self.properties, key_predicate=lambda k: k.startswith(("s3.", "client."))) used_keys: set[str] = set() - get = lambda *keys: self._get_first_property_value_with_tracking(properties, used_keys, *keys) # noqa: E731 + + def get(*keys: str) -> str | None: + return get_first_property_value_with_tracking(properties, used_keys, *keys) + client_kwargs: Properties = {} # Handle S3 region configuration with optional auto-resolution @@ -575,7 +574,10 @@ def _initialize_azure_fs(self) -> FileSystem: properties = filter_properties(self.properties, 
key_predicate=lambda k: k.startswith("adls.")) used_keys: set[str] = set() - get = lambda *keys: self._get_first_property_value_with_tracking(properties, used_keys, *keys) # noqa: E731 + + def get(*keys: str) -> str | None: + return get_first_property_value_with_tracking(properties, used_keys, *keys) + client_kwargs: Properties = {} if account_name := get(ADLS_ACCOUNT_NAME, "adls.account_name"): @@ -612,7 +614,10 @@ def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem: properties = filter_properties(self.properties, key_predicate=lambda k: k.startswith("hdfs.")) used_keys: set[str] = set() - get = lambda *keys: self._get_first_property_value_with_tracking(properties, used_keys, *keys) # noqa: E731 + + def get(*keys: str) -> str | None: + return get_first_property_value_with_tracking(properties, used_keys, *keys) + client_kwargs: Properties = {} if host := get(HDFS_HOST): @@ -635,7 +640,10 @@ def _initialize_gcs_fs(self) -> FileSystem: properties = filter_properties(self.properties, key_predicate=lambda k: k.startswith("gcs.")) used_keys: set[str] = set() - get = lambda *keys: self._get_first_property_value_with_tracking(properties, used_keys, *keys) # noqa: E731 + + def get(*keys: str) -> str | None: + return get_first_property_value_with_tracking(properties, used_keys, *keys) + client_kwargs: Properties = {} if access_token := get(GCS_TOKEN, "gcs.access_token"): diff --git a/pyiceberg/utils/properties.py b/pyiceberg/utils/properties.py index 8d8d196dc5..3bc7f2c116 100644 --- a/pyiceberg/utils/properties.py +++ b/pyiceberg/utils/properties.py @@ -79,6 +79,15 @@ def get_first_property_value( return None +def get_first_property_value_with_tracking(props: Properties, used_keys: set[str], *keys: str) -> Optional[Any]: + """Tracks all candidate keys and returns the first value found.""" + used_keys.update(keys) + for key in keys: + if key in props: + return props[key] + return None + + def get_header_properties( properties: Properties, ) -> Properties: From 7c483509022a632111d87681a4818e41f382df8d Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 4 Aug 2025 22:40:22 -0700 Subject: [PATCH 20/21] move convert_str_to_bool to properties --- pyiceberg/io/pyarrow.py | 14 ++++---------- pyiceberg/utils/properties.py | 7 +++++++ 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index e4db4f5bf1..804fefb0e5 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -177,7 +177,6 @@ TimeType, UnknownType, UUIDType, - strtobool, ) from pyiceberg.utils.concurrent import ExecutorFactory from pyiceberg.utils.config import Config @@ -185,6 +184,7 @@ from pyiceberg.utils.decimal import unscaled_to_decimal from pyiceberg.utils.deprecated import deprecation_message from pyiceberg.utils.properties import ( + convert_str_to_bool, filter_properties, get_first_property_value_with_tracking, properties_with_prefix, @@ -428,12 +428,6 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste else: raise ValueError(f"Unrecognized filesystem type in URI: {scheme}") - def _convert_str_to_bool(self, value: Any) -> bool: - """Convert string or other value to boolean, handling string representations properly.""" - if isinstance(value, str): - return strtobool(value) - return bool(value) - def _resolve_s3_region( self, provided_region: Optional[str], resolve_region_override: Any, bucket: Optional[str] ) -> Optional[str]: @@ -451,7 +445,7 @@ def _resolve_s3_region( # Handle resolve_region_override 
conversion should_resolve_region = False if resolve_region_override is not None: - should_resolve_region = self._convert_str_to_bool(resolve_region_override) + should_resolve_region = convert_str_to_bool(resolve_region_override) # If no region provided or explicit resolve requested, try to resolve from bucket if provided_region is None or should_resolve_region: @@ -493,7 +487,7 @@ def get(*keys: str) -> str | None: S3_RESOLVE_REGION ) # this feature is only available for S3. Use `get` here so it does not get passed down to the S3FileSystem constructor if force_virtual_addressing := get(S3_FORCE_VIRTUAL_ADDRESSING, "s3.force_virtual_addressing"): - client_kwargs["force_virtual_addressing"] = self._convert_str_to_bool(force_virtual_addressing) + client_kwargs["force_virtual_addressing"] = convert_str_to_bool(force_virtual_addressing) else: # For Alibaba OSS protocol, default to True client_kwargs["force_virtual_addressing"] = True @@ -549,7 +543,7 @@ def get(*keys: str) -> str | None: client_kwargs["session_name"] = session_name if force_virtual_addressing := get(S3_FORCE_VIRTUAL_ADDRESSING, "s3.force_virtual_addressing"): - client_kwargs["force_virtual_addressing"] = self._convert_str_to_bool(force_virtual_addressing) + client_kwargs["force_virtual_addressing"] = convert_str_to_bool(force_virtual_addressing) # Handle retry strategy special case if retry_strategy_impl := get(S3_RETRY_STRATEGY_IMPL, "s3.retry_strategy"): if retry_instance := _import_retry_strategy(retry_strategy_impl): diff --git a/pyiceberg/utils/properties.py b/pyiceberg/utils/properties.py index 3bc7f2c116..8809d78d8e 100644 --- a/pyiceberg/utils/properties.py +++ b/pyiceberg/utils/properties.py @@ -69,6 +69,13 @@ def property_as_bool( return default +def convert_str_to_bool(value: Any) -> bool: + """Convert string or other value to boolean, handling string representations properly.""" + if isinstance(value, str): + return strtobool(value) + return bool(value) + + def get_first_property_value( properties: Properties, *property_names: str, From 3dd11f75dd7bf97f8b2939d7c39847984fb729bb Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 4 Aug 2025 22:45:03 -0700 Subject: [PATCH 21/21] rename get --- pyiceberg/io/pyarrow.py | 98 +++++++++++++++++++++-------------------- 1 file changed, 51 insertions(+), 47 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 804fefb0e5..b3cb02a0db 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -468,38 +468,38 @@ def _initialize_oss_fs(self) -> FileSystem: properties = filter_properties(self.properties, key_predicate=lambda k: k.startswith(("s3.", "client."))) used_keys: set[str] = set() - def get(*keys: str) -> str | None: + def get_property_with_tracking(*keys: str) -> str | None: return get_first_property_value_with_tracking(properties, used_keys, *keys) client_kwargs: Properties = {} - if endpoint := get(S3_ENDPOINT, "s3.endpoint_override"): + if endpoint := get_property_with_tracking(S3_ENDPOINT, "s3.endpoint_override"): client_kwargs["endpoint_override"] = endpoint - if access_key := get(S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID, "s3.access_key"): + if access_key := get_property_with_tracking(S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID, "s3.access_key"): client_kwargs["access_key"] = access_key - if secret_key := get(S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY, "s3.secret_key"): + if secret_key := get_property_with_tracking(S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY, "s3.secret_key"): client_kwargs["secret_key"] = secret_key - if session_token := 
+        if session_token := get_property_with_tracking(S3_SESSION_TOKEN, AWS_SESSION_TOKEN, "s3.session_token"):
             client_kwargs["session_token"] = session_token
-        if region := get(S3_REGION, AWS_REGION):
+        if region := get_property_with_tracking(S3_REGION, AWS_REGION):
             client_kwargs["region"] = region

-        _ = get(
+        _ = get_property_with_tracking(
             S3_RESOLVE_REGION
         )  # this feature is only available for S3. Use `get` here so it does not get passed down to the S3FileSystem constructor
-        if force_virtual_addressing := get(S3_FORCE_VIRTUAL_ADDRESSING, "s3.force_virtual_addressing"):
+        if force_virtual_addressing := get_property_with_tracking(S3_FORCE_VIRTUAL_ADDRESSING, "s3.force_virtual_addressing"):
             client_kwargs["force_virtual_addressing"] = convert_str_to_bool(force_virtual_addressing)
         else:
             # For Alibaba OSS protocol, default to True
             client_kwargs["force_virtual_addressing"] = True

-        if proxy_uri := get(S3_PROXY_URI, "s3.proxy_options"):
+        if proxy_uri := get_property_with_tracking(S3_PROXY_URI, "s3.proxy_options"):
             client_kwargs["proxy_options"] = proxy_uri
-        if connect_timeout := get(S3_CONNECT_TIMEOUT, "s3.connect_timeout"):
+        if connect_timeout := get_property_with_tracking(S3_CONNECT_TIMEOUT, "s3.connect_timeout"):
             client_kwargs["connect_timeout"] = float(connect_timeout)
-        if request_timeout := get(S3_REQUEST_TIMEOUT, "s3.request_timeout"):
+        if request_timeout := get_property_with_tracking(S3_REQUEST_TIMEOUT, "s3.request_timeout"):
             client_kwargs["request_timeout"] = float(request_timeout)
-        if role_arn := get(S3_ROLE_ARN, AWS_ROLE_ARN, "s3.role_arn"):
+        if role_arn := get_property_with_tracking(S3_ROLE_ARN, AWS_ROLE_ARN, "s3.role_arn"):
             client_kwargs["role_arn"] = role_arn
-        if session_name := get(S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME, "s3.session_name"):
+        if session_name := get_property_with_tracking(S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME, "s3.session_name"):
             client_kwargs["session_name"] = session_name

         # get the rest of the properties with the `s3.` prefix that are not already evaluated
@@ -513,39 +513,41 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:
         properties = filter_properties(self.properties, key_predicate=lambda k: k.startswith(("s3.", "client.")))

         used_keys: set[str] = set()

-        def get(*keys: str) -> str | None:
+        def get_property_with_tracking(*keys: str) -> str | None:
             return get_first_property_value_with_tracking(properties, used_keys, *keys)

         client_kwargs: Properties = {}

         # Handle S3 region configuration with optional auto-resolution
         client_kwargs["region"] = self._resolve_s3_region(
-            provided_region=get(S3_REGION, AWS_REGION), resolve_region_override=get(S3_RESOLVE_REGION), bucket=netloc
+            provided_region=get_property_with_tracking(S3_REGION, AWS_REGION),
+            resolve_region_override=get_property_with_tracking(S3_RESOLVE_REGION),
+            bucket=netloc,
         )

-        if endpoint := get(S3_ENDPOINT, "s3.endpoint_override"):
+        if endpoint := get_property_with_tracking(S3_ENDPOINT, "s3.endpoint_override"):
             client_kwargs["endpoint_override"] = endpoint
-        if access_key := get(S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID, "s3.access_key"):
+        if access_key := get_property_with_tracking(S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID, "s3.access_key"):
             client_kwargs["access_key"] = access_key
-        if secret_key := get(S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY, "s3.secret_key"):
+        if secret_key := get_property_with_tracking(S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY, "s3.secret_key"):
             client_kwargs["secret_key"] = secret_key
-        if session_token := get(S3_SESSION_TOKEN, AWS_SESSION_TOKEN, "s3.session_token"):
+        if session_token := get_property_with_tracking(S3_SESSION_TOKEN, AWS_SESSION_TOKEN, "s3.session_token"):
             client_kwargs["session_token"] = session_token
-        if proxy_uri := get(S3_PROXY_URI, "s3.proxy_options"):
+        if proxy_uri := get_property_with_tracking(S3_PROXY_URI, "s3.proxy_options"):
             client_kwargs["proxy_options"] = proxy_uri
-        if connect_timeout := get(S3_CONNECT_TIMEOUT, "s3.connect_timeout"):
+        if connect_timeout := get_property_with_tracking(S3_CONNECT_TIMEOUT, "s3.connect_timeout"):
             client_kwargs["connect_timeout"] = float(connect_timeout)
-        if request_timeout := get(S3_REQUEST_TIMEOUT, "s3.request_timeout"):
+        if request_timeout := get_property_with_tracking(S3_REQUEST_TIMEOUT, "s3.request_timeout"):
             client_kwargs["request_timeout"] = float(request_timeout)
-        if role_arn := get(S3_ROLE_ARN, AWS_ROLE_ARN, "s3.role_arn"):
+        if role_arn := get_property_with_tracking(S3_ROLE_ARN, AWS_ROLE_ARN, "s3.role_arn"):
             client_kwargs["role_arn"] = role_arn
-        if session_name := get(S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME, "s3.session_name"):
+        if session_name := get_property_with_tracking(S3_ROLE_SESSION_NAME, AWS_ROLE_SESSION_NAME, "s3.session_name"):
             client_kwargs["session_name"] = session_name

-        if force_virtual_addressing := get(S3_FORCE_VIRTUAL_ADDRESSING, "s3.force_virtual_addressing"):
+        if force_virtual_addressing := get_property_with_tracking(S3_FORCE_VIRTUAL_ADDRESSING, "s3.force_virtual_addressing"):
             client_kwargs["force_virtual_addressing"] = convert_str_to_bool(force_virtual_addressing)
         # Handle retry strategy special case
-        if retry_strategy_impl := get(S3_RETRY_STRATEGY_IMPL, "s3.retry_strategy"):
+        if retry_strategy_impl := get_property_with_tracking(S3_RETRY_STRATEGY_IMPL, "s3.retry_strategy"):
             if retry_instance := _import_retry_strategy(retry_strategy_impl):
                 client_kwargs["retry_strategy"] = retry_instance

@@ -569,30 +571,30 @@ def _initialize_azure_fs(self) -> FileSystem:
         properties = filter_properties(self.properties, key_predicate=lambda k: k.startswith("adls."))

         used_keys: set[str] = set()

-        def get(*keys: str) -> str | None:
+        def get_property_with_tracking(*keys: str) -> str | None:
             return get_first_property_value_with_tracking(properties, used_keys, *keys)

         client_kwargs: Properties = {}

-        if account_name := get(ADLS_ACCOUNT_NAME, "adls.account_name"):
+        if account_name := get_property_with_tracking(ADLS_ACCOUNT_NAME, "adls.account_name"):
             client_kwargs["account_name"] = account_name
-        if account_key := get(ADLS_ACCOUNT_KEY, "adls.account_key"):
+        if account_key := get_property_with_tracking(ADLS_ACCOUNT_KEY, "adls.account_key"):
             client_kwargs["account_key"] = account_key
-        if blob_storage_authority := get(ADLS_BLOB_STORAGE_AUTHORITY, "adls.blob_storage_authority"):
+        if blob_storage_authority := get_property_with_tracking(ADLS_BLOB_STORAGE_AUTHORITY, "adls.blob_storage_authority"):
             client_kwargs["blob_storage_authority"] = blob_storage_authority
-        if dfs_storage_authority := get(ADLS_DFS_STORAGE_AUTHORITY, "adls.dfs_storage_authority"):
+        if dfs_storage_authority := get_property_with_tracking(ADLS_DFS_STORAGE_AUTHORITY, "adls.dfs_storage_authority"):
             client_kwargs["dfs_storage_authority"] = dfs_storage_authority
-        if blob_storage_scheme := get(ADLS_BLOB_STORAGE_SCHEME, "adls.blob_storage_scheme"):
+        if blob_storage_scheme := get_property_with_tracking(ADLS_BLOB_STORAGE_SCHEME, "adls.blob_storage_scheme"):
             client_kwargs["blob_storage_scheme"] = blob_storage_scheme
-        if dfs_storage_scheme := get(ADLS_DFS_STORAGE_SCHEME, "adls.dfs_storage_scheme"):
+        if dfs_storage_scheme := get_property_with_tracking(ADLS_DFS_STORAGE_SCHEME, "adls.dfs_storage_scheme"):
             client_kwargs["dfs_storage_scheme"] = dfs_storage_scheme

-        if sas_token := get(ADLS_SAS_TOKEN, "adls.sas_token"):
+        if sas_token := get_property_with_tracking(ADLS_SAS_TOKEN, "adls.sas_token"):
             client_kwargs["sas_token"] = sas_token

         # get the rest of the properties with the `adls.` prefix that are not already evaluated
@@ -609,19 +611,19 @@ def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
         properties = filter_properties(self.properties, key_predicate=lambda k: k.startswith("hdfs."))

         used_keys: set[str] = set()

-        def get(*keys: str) -> str | None:
+        def get_property_with_tracking(*keys: str) -> str | None:
             return get_first_property_value_with_tracking(properties, used_keys, *keys)

         client_kwargs: Properties = {}

-        if host := get(HDFS_HOST):
+        if host := get_property_with_tracking(HDFS_HOST):
             client_kwargs["host"] = host
-        if port := get(HDFS_PORT):
+        if port := get_property_with_tracking(HDFS_PORT):
             # port should be an integer type
             client_kwargs["port"] = int(port)
-        if user := get(HDFS_USER):
+        if user := get_property_with_tracking(HDFS_USER):
             client_kwargs["user"] = user
-        if kerb_ticket := get(HDFS_KERB_TICKET, "hdfs.kerb_ticket"):
+        if kerb_ticket := get_property_with_tracking(HDFS_KERB_TICKET, "hdfs.kerb_ticket"):
             client_kwargs["kerb_ticket"] = kerb_ticket

         # get the rest of the properties with the `hdfs.` prefix that are not already evaluated
@@ -635,29 +637,31 @@ def _initialize_gcs_fs(self) -> FileSystem:
         properties = filter_properties(self.properties, key_predicate=lambda k: k.startswith("gcs."))

         used_keys: set[str] = set()

-        def get(*keys: str) -> str | None:
+        def get_property_with_tracking(*keys: str) -> str | None:
             return get_first_property_value_with_tracking(properties, used_keys, *keys)

         client_kwargs: Properties = {}

-        if access_token := get(GCS_TOKEN, "gcs.access_token"):
+        if access_token := get_property_with_tracking(GCS_TOKEN, "gcs.access_token"):
             client_kwargs["access_token"] = access_token
-        if expiration := get(GCS_TOKEN_EXPIRES_AT_MS, "gcs.credential_token_expiration"):
+        if expiration := get_property_with_tracking(GCS_TOKEN_EXPIRES_AT_MS, "gcs.credential_token_expiration"):
             client_kwargs["credential_token_expiration"] = millis_to_datetime(int(expiration))
-        if bucket_location := get(GCS_DEFAULT_LOCATION, "gcs.default_bucket_location"):
+        if bucket_location := get_property_with_tracking(GCS_DEFAULT_LOCATION, "gcs.default_bucket_location"):
             client_kwargs["default_bucket_location"] = bucket_location
-        if endpoint := get(GCS_SERVICE_HOST):
+        if endpoint := get_property_with_tracking(GCS_SERVICE_HOST):
             url_parts = urlparse(endpoint)
             client_kwargs["scheme"] = url_parts.scheme
             client_kwargs["endpoint_override"] = url_parts.netloc
-        if (scheme := get("gcs.scheme")) and "scheme" not in client_kwargs:  # GCS_SERVICE_HOST takes precedence
+        if (
+            scheme := get_property_with_tracking("gcs.scheme")
+        ) and "scheme" not in client_kwargs:  # GCS_SERVICE_HOST takes precedence
             client_kwargs["scheme"] = scheme
         if (
-            endpoint_override := get("gcs.endpoint_override")
+            endpoint_override := get_property_with_tracking("gcs.endpoint_override")
         ) and "endpoint_override" not in client_kwargs:  # GCS_SERVICE_HOST takes precedence
             client_kwargs["endpoint_override"] = endpoint_override
-        if project_id := get(GCS_PROJECT_ID, "gcs.project_id"):
+        if project_id := get_property_with_tracking(GCS_PROJECT_ID, "gcs.project_id"):
             client_kwargs["project_id"] = project_id

         # get the rest of the properties with the `gcs.` prefix that are not already evaluated