Skip to content

Fix filesystem #2291

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 27 additions & 10 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,21 +381,38 @@ def to_input_file(self) -> PyArrowFile:

class PyArrowFileIO(FileIO):
    # Cached factory: (scheme, netloc) -> concrete PyArrow FileSystem instance.
    fs_by_scheme: Callable[[str, Optional[str]], FileSystem]
    # Runtime configuration, consulted by parse_location for default scheme/netloc.
    config: Config

    def __init__(self, properties: Properties = EMPTY_DICT):
        """Initialize a FileIO backed by PyArrow filesystems.

        Args:
            properties: FileIO configuration properties (defaults to an empty dict).
        """
        # lru_cache memoizes _initialize_fs so each (scheme, netloc) pair builds
        # its FileSystem at most once and reuses it across calls.
        self.fs_by_scheme: Callable[[str, Optional[str]], FileSystem] = lru_cache(self._initialize_fs)
        self.config = Config()
        # NOTE(review): attributes are set before super().__init__; presumably
        # FileIO.__init__ does not depend on them — confirm against FileIO.
        super().__init__(properties=properties)

@staticmethod
def parse_location(location: str) -> Tuple[str, str, str]:
"""Return the path without the scheme."""
def parse_location(location: str, config: Config) -> Tuple[str, str, str]:
"""Return (scheme, netloc, path) for the given location.

Uses environment variables DEFAULT_SCHEME and DEFAULT_NETLOC
if scheme/netloc are missing.
"""
uri = urlparse(location)
if not uri.scheme:
return "file", uri.netloc, os.path.abspath(location)
elif uri.scheme in ("hdfs", "viewfs"):
return uri.scheme, uri.netloc, uri.path

# Load defaults from environment
default_scheme = config.get_str("default-scheme") or "file"
default_netloc = config.get_str("default-netloc") or ""

Comment on lines +401 to +403
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: i think its better to pass these in through the properties field
https://py.iceberg.apache.org/configuration/#hdfs

we can get the env variable and then pass into the properties.

# Apply logic
scheme = uri.scheme or default_scheme
netloc = uri.netloc or default_netloc

if scheme in ("hdfs", "viewfs"):
return scheme, netloc, uri.path
else:
return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
# For non-HDFS URIs, include netloc in the path if present
path = uri.path if uri.scheme else os.path.abspath(location)
if netloc and not path.startswith(netloc):
path = f"{netloc}{path}"
return scheme, netloc, path
Comment on lines +404 to +415
Copy link
Contributor

@kevinjqliu kevinjqliu Aug 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would really like to get rid of this `if {scheme}` logic here.

Is there a way to refactor these changes down into `_initialize_hdfs_fs`, so we can keep the HDFS logic in one place?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a clean way to do this, since the path used in the PyArrowFile actually differs between the cases. I tried using the same netloc-prefixed path for HDFS as well, but it doesn't seem to work:
#2291 (comment)


def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
"""Initialize FileSystem for different scheme."""
Expand Down Expand Up @@ -584,7 +601,7 @@ def new_input(self, location: str) -> PyArrowFile:
Returns:
PyArrowFile: A PyArrowFile instance for the given location.
"""
scheme, netloc, path = self.parse_location(location)
scheme, netloc, path = self.parse_location(location, self.config)
return PyArrowFile(
fs=self.fs_by_scheme(scheme, netloc),
location=location,
Expand All @@ -601,7 +618,7 @@ def new_output(self, location: str) -> PyArrowFile:
Returns:
PyArrowFile: A PyArrowFile instance for the given location.
"""
scheme, netloc, path = self.parse_location(location)
scheme, netloc, path = self.parse_location(location, self.config)
return PyArrowFile(
fs=self.fs_by_scheme(scheme, netloc),
location=location,
Expand All @@ -622,7 +639,7 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
an AWS error code 15.
"""
str_location = location.location if isinstance(location, (InputFile, OutputFile)) else location
scheme, netloc, path = self.parse_location(str_location)
scheme, netloc, path = self.parse_location(str_location, self.config)
fs = self.fs_by_scheme(scheme, netloc)

try:
Expand Down
8 changes: 8 additions & 0 deletions pyiceberg/utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,11 @@ def get_bool(self, key: str) -> Optional[bool]:
except ValueError as err:
raise ValueError(f"{key} should be a boolean or left unset. Current value: {val}") from err
return None

def get_str(self, key: str) -> Optional[str]:
if (val := self.config.get(key)) is not None:
if isinstance(val, str):
return val
else:
raise ValueError(f"{key} should be a string or left unset. Current value: {val}")
return None
41 changes: 40 additions & 1 deletion tests/io/test_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
TimestamptzType,
TimeType,
)
from pyiceberg.utils.config import Config
from tests.catalog.test_base import InMemoryCatalog
from tests.conftest import UNIFIED_AWS_SESSION_PROPERTIES

Expand Down Expand Up @@ -2024,7 +2025,7 @@ def test_writing_avro_file_adls(generated_manifest_entry_file: str, pyarrow_file

def test_parse_location() -> None:
def check_results(location: str, expected_schema: str, expected_netloc: str, expected_uri: str) -> None:
schema, netloc, uri = PyArrowFileIO.parse_location(location)
schema, netloc, uri = PyArrowFileIO.parse_location(location, Config())
assert schema == expected_schema
assert netloc == expected_netloc
assert uri == expected_uri
Expand Down Expand Up @@ -2638,3 +2639,41 @@ def test_retry_strategy_not_found() -> None:
io = PyArrowFileIO(properties={S3_RETRY_STRATEGY_IMPL: "pyiceberg.DoesNotExist"})
with pytest.warns(UserWarning, match="Could not initialize S3 retry strategy: pyiceberg.DoesNotExist"):
io.new_input("s3://bucket/path/to/file")


def test_parse_location_environment_defaults() -> None:
    """Test that parse_location honors PYICEBERG_DEFAULT_SCHEME / PYICEBERG_DEFAULT_NETLOC.

    The test mutates process environment variables, so the pre-test values are
    captured up front and restored in ``finally`` regardless of the outcome.
    """
    import os

    from pyiceberg.io.pyarrow import PyArrowFileIO

    env_keys = ("PYICEBERG_DEFAULT_SCHEME", "PYICEBERG_DEFAULT_NETLOC")
    # Remember pre-existing values so the test leaves the environment untouched.
    saved = {key: os.environ.get(key) for key in env_keys}
    try:
        # Baseline: with neither variable set, a bare path resolves to a local file.
        # (Explicitly clear them so an ambient environment cannot break the test.)
        for key in env_keys:
            os.environ.pop(key, None)
        scheme, netloc, path = PyArrowFileIO.parse_location("/foo/bar", Config())
        assert scheme == "file"
        assert netloc == ""
        assert path == "/foo/bar"

        # A non-HDFS default scheme folds the netloc into the returned path.
        os.environ["PYICEBERG_DEFAULT_SCHEME"] = "scheme"
        os.environ["PYICEBERG_DEFAULT_NETLOC"] = "netloc:8000"
        scheme, netloc, path = PyArrowFileIO.parse_location("/foo/bar", Config())
        assert scheme == "scheme"
        assert netloc == "netloc:8000"
        assert path == "netloc:8000/foo/bar"

        # HDFS schemes keep the path separate from the netloc.
        os.environ["PYICEBERG_DEFAULT_SCHEME"] = "hdfs"
        scheme, netloc, path = PyArrowFileIO.parse_location("/foo/bar", Config())
        assert scheme == "hdfs"
        assert netloc == "netloc:8000"
        assert path == "/foo/bar"
    finally:
        # Restore (or remove) each variable to its pre-test state.
        for key, value in saved.items():
            if value is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = value