From b40849e3cd069a1fc46bbc15941360a302d294d4 Mon Sep 17 00:00:00 2001 From: Tom McCormick Date: Tue, 5 Aug 2025 21:54:50 -0400 Subject: [PATCH 1/5] fix file system with env variables to set scheme and net loc if not specified in file path --- pyiceberg/io/pyarrow.py | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 2797371028..62383a37a2 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -388,14 +388,28 @@ def __init__(self, properties: Properties = EMPTY_DICT): @staticmethod def parse_location(location: str) -> Tuple[str, str, str]: - """Return the path without the scheme.""" + """Return (scheme, netloc, path) for the given location. + Uses environment variables DEFAULT_SCHEME and DEFAULT_NETLOC + if scheme/netloc are missing. + """ uri = urlparse(location) - if not uri.scheme: - return "file", uri.netloc, os.path.abspath(location) - elif uri.scheme in ("hdfs", "viewfs"): - return uri.scheme, uri.netloc, uri.path + + # Load defaults from environment + default_scheme = os.getenv("DEFAULT_SCHEME", "file") + default_netloc = os.getenv("DEFAULT_NETLOC", "") + + # Apply logic + scheme = uri.scheme or default_scheme + netloc = uri.netloc or default_netloc + + if scheme in ("hdfs", "viewfs"): + return scheme, netloc, uri.path else: - return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}" + # For non-HDFS URIs, include netloc in the path if present + path = uri.path if uri.scheme else os.path.abspath(location) + if netloc and not path.startswith(netloc): + path = f"{netloc}{path}" + return scheme, netloc, path def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem: """Initialize FileSystem for different scheme.""" From 1b6f981a312d00f68cc7f67f160f57a57526add6 Mon Sep 17 00:00:00 2001 From: Tom McCormick Date: Wed, 6 Aug 2025 12:05:25 -0400 Subject: [PATCH 2/5] add test --- tests/io/test_pyarrow.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 4f121ba3bc..d3d6f07b2c 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -2638,3 +2638,39 @@ def test_retry_strategy_not_found() -> None: io = PyArrowFileIO(properties={S3_RETRY_STRATEGY_IMPL: "pyiceberg.DoesNotExist"}) with pytest.warns(UserWarning, match="Could not initialize S3 retry strategy: pyiceberg.DoesNotExist"): io.new_input("s3://bucket/path/to/file") + +def test_parse_location_environment_defaults(): + """Test that parse_location uses environment variables for defaults.""" + from pyiceberg.io.pyarrow import PyArrowFileIO + import os + + # Test with default environment (no env vars set) + scheme, netloc, path = PyArrowFileIO.parse_location("/foo/bar") + assert scheme == "file" + assert netloc == "" + assert path == "/foo/bar" + + try: + # Test with environment variables set + os.environ["DEFAULT_SCHEME"] = "scheme" + os.environ["DEFAULT_NETLOC"] = "netloc:8000" + + scheme, netloc, path = PyArrowFileIO.parse_location("/foo/bar") + assert scheme == "scheme" + assert netloc == "netloc:8000" + assert path == "netloc:8000/foo/bar" + + # Set environment variables + os.environ["DEFAULT_SCHEME"] = "hdfs" + os.environ["DEFAULT_NETLOC"] = "netloc:8000" + + scheme, netloc, path = PyArrowFileIO.parse_location("/foo/bar") + assert scheme == "hdfs" + assert netloc == "netloc:8000" + assert path == "/foo/bar" + finally: + # Clean up environment variables + if "DEFAULT_SCHEME" in os.environ: + del os.environ["DEFAULT_SCHEME"] + if "DEFAULT_NETLOC" in os.environ: + del os.environ["DEFAULT_NETLOC"] \ No newline at end of file From ae22e64aa0f7b50c70d37272f60b147eb55ba4a8 Mon Sep 17 00:00:00 2001 From: Tom McCormick Date: Wed, 6 Aug 2025 12:58:08 -0400 Subject: [PATCH 3/5] fix linting --- pyiceberg/io/pyarrow.py | 1 + tests/io/test_pyarrow.py | 14 ++++++++------ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 62383a37a2..8a23e00e9b 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -389,6 +389,7 @@ def __init__(self, properties: Properties = EMPTY_DICT): @staticmethod def parse_location(location: str) -> Tuple[str, str, str]: """Return (scheme, netloc, path) for the given location. + Uses environment variables DEFAULT_SCHEME and DEFAULT_NETLOC if scheme/netloc are missing. """ diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index d3d6f07b2c..140bac5316 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -2639,22 +2639,24 @@ def test_retry_strategy_not_found() -> None: with pytest.warns(UserWarning, match="Could not initialize S3 retry strategy: pyiceberg.DoesNotExist"): io.new_input("s3://bucket/path/to/file") -def test_parse_location_environment_defaults(): + +def test_parse_location_environment_defaults() -> None: """Test that parse_location uses environment variables for defaults.""" - from pyiceberg.io.pyarrow import PyArrowFileIO import os - + + from pyiceberg.io.pyarrow import PyArrowFileIO + # Test with default environment (no env vars set) scheme, netloc, path = PyArrowFileIO.parse_location("/foo/bar") assert scheme == "file" assert netloc == "" assert path == "/foo/bar" - + try: # Test with environment variables set os.environ["DEFAULT_SCHEME"] = "scheme" os.environ["DEFAULT_NETLOC"] = "netloc:8000" - + scheme, netloc, path = PyArrowFileIO.parse_location("/foo/bar") assert scheme == "scheme" assert netloc == "netloc:8000" @@ -2673,4 +2675,4 @@ def test_parse_location_environment_defaults(): if "DEFAULT_SCHEME" in os.environ: del os.environ["DEFAULT_SCHEME"] if "DEFAULT_NETLOC" in os.environ: - del os.environ["DEFAULT_NETLOC"] \ No newline at end of file + del os.environ["DEFAULT_NETLOC"] From f4d16f28a493bffa15b23c4e59985313e728b846 Mon Sep 17 00:00:00 2001 From: Tom McCormick Date: Thu, 7 Aug 2025 11:03:30 -0400 Subject: [PATCH 4/5] add get_str config support and get val from config --- pyiceberg/io/pyarrow.py | 14 ++++++++------ pyiceberg/utils/config.py | 5 +++++ tests/io/test_pyarrow.py | 25 +++++++++++++------------ 3 files changed, 26 insertions(+), 18 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 8a23e00e9b..2a0ea4e0ec 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -381,13 +381,15 @@ def to_input_file(self) -> PyArrowFile: class PyArrowFileIO(FileIO): fs_by_scheme: Callable[[str, Optional[str]], FileSystem] + config: Config def __init__(self, properties: Properties = EMPTY_DICT): self.fs_by_scheme: Callable[[str, Optional[str]], FileSystem] = lru_cache(self._initialize_fs) + self.config = Config() super().__init__(properties=properties) @staticmethod - def parse_location(location: str) -> Tuple[str, str, str]: + def parse_location(location: str, config: Config) -> Tuple[str, str, str]: """Return (scheme, netloc, path) for the given location. Uses environment variables DEFAULT_SCHEME and DEFAULT_NETLOC @@ -396,8 +398,8 @@ def parse_location(location: str) -> Tuple[str, str, str]: uri = urlparse(location) # Load defaults from environment - default_scheme = os.getenv("DEFAULT_SCHEME", "file") - default_netloc = os.getenv("DEFAULT_NETLOC", "") + default_scheme = config.get_str("default-scheme") or "file" + default_netloc = config.get_str("default-netloc") or "" # Apply logic scheme = uri.scheme or default_scheme @@ -599,7 +601,7 @@ def new_input(self, location: str) -> PyArrowFile: Returns: PyArrowFile: A PyArrowFile instance for the given location. """ - scheme, netloc, path = self.parse_location(location) + scheme, netloc, path = self.parse_location(location, self.config) return PyArrowFile( fs=self.fs_by_scheme(scheme, netloc), location=location, @@ -616,7 +618,7 @@ def new_output(self, location: str) -> PyArrowFile: Returns: PyArrowFile: A PyArrowFile instance for the given location. """ - scheme, netloc, path = self.parse_location(location) + scheme, netloc, path = self.parse_location(location, self.config) return PyArrowFile( fs=self.fs_by_scheme(scheme, netloc), location=location, @@ -637,7 +639,7 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None: an AWS error code 15. """ str_location = location.location if isinstance(location, (InputFile, OutputFile)) else location - scheme, netloc, path = self.parse_location(str_location) + scheme, netloc, path = self.parse_location(str_location, self.config) fs = self.fs_by_scheme(scheme, netloc) try: diff --git a/pyiceberg/utils/config.py b/pyiceberg/utils/config.py index 78f121a402..732325db0e 100644 --- a/pyiceberg/utils/config.py +++ b/pyiceberg/utils/config.py @@ -180,3 +180,8 @@ def get_bool(self, key: str) -> Optional[bool]: except ValueError as err: raise ValueError(f"{key} should be a boolean or left unset. Current value: {val}") from err return None + + def get_str(self, key: str) -> Optional[str]: + if (val := self.config.get(key)) is not None: + return val + return None diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 140bac5316..ebb2fd342a 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -106,6 +106,7 @@ TimestamptzType, TimeType, ) +from pyiceberg.utils.config import Config from tests.catalog.test_base import InMemoryCatalog from tests.conftest import UNIFIED_AWS_SESSION_PROPERTIES @@ -2024,7 +2025,7 @@ def test_writing_avro_file_adls(generated_manifest_entry_file: str, pyarrow_file def test_parse_location() -> None: def check_results(location: str, expected_schema: str, expected_netloc: str, expected_uri: str) -> None: - schema, netloc, uri = PyArrowFileIO.parse_location(location) + schema, netloc, uri = PyArrowFileIO.parse_location(location, Config()) assert schema == expected_schema assert netloc == expected_netloc assert uri == expected_uri @@ -2647,32 +2648,32 @@ def test_parse_location_environment_defaults() -> None: from pyiceberg.io.pyarrow import PyArrowFileIO # Test with default environment (no env vars set) - scheme, netloc, path = PyArrowFileIO.parse_location("/foo/bar") + scheme, netloc, path = PyArrowFileIO.parse_location("/foo/bar", Config()) assert scheme == "file" assert netloc == "" assert path == "/foo/bar" try: # Test with environment variables set - os.environ["DEFAULT_SCHEME"] = "scheme" - os.environ["DEFAULT_NETLOC"] = "netloc:8000" + os.environ["PYICEBERG_DEFAULT_SCHEME"] = "scheme" + os.environ["PYICEBERG_DEFAULT_NETLOC"] = "netloc:8000" - scheme, netloc, path = PyArrowFileIO.parse_location("/foo/bar") + scheme, netloc, path = PyArrowFileIO.parse_location("/foo/bar", Config()) assert scheme == "scheme" assert netloc == "netloc:8000" assert path == "netloc:8000/foo/bar" # Set environment variables - os.environ["DEFAULT_SCHEME"] = "hdfs" - os.environ["DEFAULT_NETLOC"] = "netloc:8000" + os.environ["PYICEBERG_DEFAULT_SCHEME"] = "hdfs" + os.environ["PYICEBERG_DEFAULT_NETLOC"] = "netloc:8000" - scheme, netloc, path = PyArrowFileIO.parse_location("/foo/bar") + scheme, netloc, path = PyArrowFileIO.parse_location("/foo/bar", Config()) assert scheme == "hdfs" assert netloc == "netloc:8000" assert path == "/foo/bar" finally: # Clean up environment variables - if "DEFAULT_SCHEME" in os.environ: - del os.environ["DEFAULT_SCHEME"] - if "DEFAULT_NETLOC" in os.environ: - del os.environ["DEFAULT_NETLOC"] + if "PYICEBERG_DEFAULT_SCHEME" in os.environ: + del os.environ["PYICEBERG_DEFAULT_SCHEME"] + if "PYICEBERG_DEFAULT_NETLOC" in os.environ: + del os.environ["PYICEBERG_DEFAULT_NETLOC"] From 48f201ac5e359f3c30cb828dc3ec3ddebb71c9c3 Mon Sep 17 00:00:00 2001 From: Tom McCormick Date: Tue, 12 Aug 2025 17:23:31 -0400 Subject: [PATCH 5/5] fix linter --- pyiceberg/utils/config.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pyiceberg/utils/config.py b/pyiceberg/utils/config.py index 732325db0e..f28e875dcc 100644 --- a/pyiceberg/utils/config.py +++ b/pyiceberg/utils/config.py @@ -183,5 +183,8 @@ def get_bool(self, key: str) -> Optional[bool]: def get_str(self, key: str) -> Optional[str]: if (val := self.config.get(key)) is not None: - return val + if isinstance(val, str): + return val + else: + raise ValueError(f"{key} should be a string or left unset. Current value: {val}") return None