diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 2797371028..2a0ea4e0ec 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -381,21 +381,38 @@ def to_input_file(self) -> PyArrowFile:
 class PyArrowFileIO(FileIO):
     fs_by_scheme: Callable[[str, Optional[str]], FileSystem]
+    config: Config
 
     def __init__(self, properties: Properties = EMPTY_DICT):
         self.fs_by_scheme: Callable[[str, Optional[str]], FileSystem] = lru_cache(self._initialize_fs)
+        self.config = Config()
         super().__init__(properties=properties)
 
     @staticmethod
-    def parse_location(location: str) -> Tuple[str, str, str]:
-        """Return the path without the scheme."""
+    def parse_location(location: str, config: Config) -> Tuple[str, str, str]:
+        """Return (scheme, netloc, path) for the given location.
+
+        Falls back to the `default-scheme` / `default-netloc` config properties
+        (env: PYICEBERG_DEFAULT_SCHEME / PYICEBERG_DEFAULT_NETLOC) when missing.
+        """
         uri = urlparse(location)
-        if not uri.scheme:
-            return "file", uri.netloc, os.path.abspath(location)
-        elif uri.scheme in ("hdfs", "viewfs"):
-            return uri.scheme, uri.netloc, uri.path
+
+        # Load defaults from the PyIceberg config (env vars / config file)
+        default_scheme = config.get_str("default-scheme") or "file"
+        default_netloc = config.get_str("default-netloc") or ""
+
+        # Apply defaults only where the URI left a component empty
+        scheme = uri.scheme or default_scheme
+        netloc = uri.netloc or default_netloc
+
+        if scheme in ("hdfs", "viewfs"):
+            return scheme, netloc, uri.path
         else:
-            return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
+            # For non-HDFS URIs, include netloc in the path if present
+            path = uri.path if uri.scheme else os.path.abspath(location)
+            if netloc and not path.startswith(netloc):
+                path = f"{netloc}{path}"
+            return scheme, netloc, path
 
     def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
         """Initialize FileSystem for different scheme."""
@@ -584,7 +601,7 @@ def new_input(self, location: str) -> PyArrowFile:
         Returns:
             PyArrowFile: A PyArrowFile instance for the given location.
         """
-        scheme, netloc, path = self.parse_location(location)
+        scheme, netloc, path = self.parse_location(location, self.config)
         return PyArrowFile(
             fs=self.fs_by_scheme(scheme, netloc),
             location=location,
@@ -601,7 +618,7 @@ def new_output(self, location: str) -> PyArrowFile:
         Returns:
             PyArrowFile: A PyArrowFile instance for the given location.
         """
-        scheme, netloc, path = self.parse_location(location)
+        scheme, netloc, path = self.parse_location(location, self.config)
         return PyArrowFile(
             fs=self.fs_by_scheme(scheme, netloc),
             location=location,
@@ -622,7 +639,7 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
             an AWS error code 15.
         """
         str_location = location.location if isinstance(location, (InputFile, OutputFile)) else location
-        scheme, netloc, path = self.parse_location(str_location)
+        scheme, netloc, path = self.parse_location(str_location, self.config)
         fs = self.fs_by_scheme(scheme, netloc)
 
         try:
diff --git a/pyiceberg/utils/config.py b/pyiceberg/utils/config.py
index 78f121a402..f28e875dcc 100644
--- a/pyiceberg/utils/config.py
+++ b/pyiceberg/utils/config.py
@@ -180,3 +180,11 @@ def get_bool(self, key: str) -> Optional[bool]:
             except ValueError as err:
                 raise ValueError(f"{key} should be a boolean or left unset. Current value: {val}") from err
         return None
+
+    def get_str(self, key: str) -> Optional[str]:
+        if (val := self.config.get(key)) is not None:
+            if isinstance(val, str):
+                return val
+            else:
+                raise ValueError(f"{key} should be a string or left unset. Current value: {val}")
+        return None
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index 4f121ba3bc..ebb2fd342a 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -106,6 +106,7 @@
     TimestamptzType,
     TimeType,
 )
+from pyiceberg.utils.config import Config
 from tests.catalog.test_base import InMemoryCatalog
 from tests.conftest import UNIFIED_AWS_SESSION_PROPERTIES
@@ -2024,7 +2025,7 @@ def test_writing_avro_file_adls(generated_manifest_entry_file: str, pyarrow_file
 def test_parse_location() -> None:
     def check_results(location: str, expected_schema: str, expected_netloc: str, expected_uri: str) -> None:
-        schema, netloc, uri = PyArrowFileIO.parse_location(location)
+        schema, netloc, uri = PyArrowFileIO.parse_location(location, Config())
         assert schema == expected_schema
         assert netloc == expected_netloc
         assert uri == expected_uri
@@ -2638,3 +2639,41 @@ def test_retry_strategy_not_found() -> None:
     io = PyArrowFileIO(properties={S3_RETRY_STRATEGY_IMPL: "pyiceberg.DoesNotExist"})
     with pytest.warns(UserWarning, match="Could not initialize S3 retry strategy: pyiceberg.DoesNotExist"):
         io.new_input("s3://bucket/path/to/file")
+
+
+def test_parse_location_environment_defaults() -> None:
+    """Test that parse_location uses environment variables for defaults."""
+    import os
+
+    from pyiceberg.io.pyarrow import PyArrowFileIO
+
+    # Test with default environment (no env vars set)
+    scheme, netloc, path = PyArrowFileIO.parse_location("/foo/bar", Config())
+    assert scheme == "file"
+    assert netloc == ""
+    assert path == "/foo/bar"
+
+    try:
+        # Test with environment variables set
+        os.environ["PYICEBERG_DEFAULT_SCHEME"] = "scheme"
+        os.environ["PYICEBERG_DEFAULT_NETLOC"] = "netloc:8000"
+
+        scheme, netloc, path = PyArrowFileIO.parse_location("/foo/bar", Config())
+        assert scheme == "scheme"
+        assert netloc == "netloc:8000"
+        assert path == "netloc:8000/foo/bar"
+
+        # Switch the default scheme to HDFS (netloc is not folded into the path)
+        os.environ["PYICEBERG_DEFAULT_SCHEME"] = "hdfs"
+        os.environ["PYICEBERG_DEFAULT_NETLOC"] = "netloc:8000"
+
+        scheme, netloc, path = PyArrowFileIO.parse_location("/foo/bar", Config())
+        assert scheme == "hdfs"
+        assert netloc == "netloc:8000"
+        assert path == "/foo/bar"
+    finally:
+        # Clean up environment variables
+        if "PYICEBERG_DEFAULT_SCHEME" in os.environ:
+            del os.environ["PYICEBERG_DEFAULT_SCHEME"]
+        if "PYICEBERG_DEFAULT_NETLOC" in os.environ:
+            del os.environ["PYICEBERG_DEFAULT_NETLOC"]