diff --git a/.python-version b/.python-version new file mode 100644 index 0000000000..4eba2a62eb --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.13.0 diff --git a/dev/provision.py b/dev/provision.py index 231f5123ce..a91d31849b 100644 --- a/dev/provision.py +++ b/dev/provision.py @@ -409,3 +409,28 @@ ) spark.sql(f"ALTER TABLE {catalog_name}.default.test_empty_scan_ordered_str WRITE ORDERED BY id") spark.sql(f"INSERT INTO {catalog_name}.default.test_empty_scan_ordered_str VALUES 'a', 'c'") + + spark.sql( + f""" + CREATE TABLE {catalog_name}.default.test_read_orc ( + dt date, + ts timestamp, + number integer, + letter string + ) + USING iceberg + TBLPROPERTIES ( + 'format-version'='2', + 'write.format.default'='orc' + ); + """ + ) + + spark.sql(f"""INSERT INTO {catalog_name}.default.test_read_orc + VALUES + (CAST('2022-03-01' AS date), CAST('2022-03-01 01:22:00' AS timestamp), 1, 'a'), + (CAST('2022-03-02' AS date), CAST('2022-03-02 02:22:00' AS timestamp), 2, 'b'), + (CAST('2022-03-03' AS date), CAST('2022-03-02 02:22:00' AS timestamp), 3, 'c') + """) + + spark.sql(f"DELETE FROM {catalog_name}.default.test_read_orc WHERE number = 3") \ No newline at end of file diff --git a/poetry.lock b/poetry.lock index 4a4f95f0c5..2ceb67b8ca 100644 --- a/poetry.lock +++ b/poetry.lock @@ -361,15 +361,15 @@ requests = ">=2.20.0" [[package]] name = "azure-identity" -version = "1.23.0" +version = "1.23.1" description = "Microsoft Azure Identity Library for Python" optional = true python-versions = ">=3.9" groups = ["main"] markers = "extra == \"adlfs\"" files = [ - {file = "azure_identity-1.23.0-py3-none-any.whl", hash = "sha256:dbbeb64b8e5eaa81c44c565f264b519ff2de7ff0e02271c49f3cb492762a50b0"}, - {file = "azure_identity-1.23.0.tar.gz", hash = "sha256:d9cdcad39adb49d4bb2953a217f62aec1f65bbb3c63c9076da2be2a47e53dde4"}, + {file = "azure_identity-1.23.1-py3-none-any.whl", hash = "sha256:7eed28baa0097a47e3fb53bd35a63b769e6b085bb3cb616dfce2b67f28a004a1"}, + {file = "azure_identity-1.23.1.tar.gz", hash = "sha256:226c1ef982a9f8d5dcf6e0f9ed35eaef2a4d971e7dd86317e9b9d52e70a035e4"}, ] [package.dependencies] @@ -381,15 +381,15 @@ typing-extensions = ">=4.0.0" [[package]] name = "azure-storage-blob" -version = "12.25.1" +version = "12.26.0" description = "Microsoft Azure Blob Storage Client Library for Python" optional = true python-versions = ">=3.8" groups = ["main"] markers = "extra == \"adlfs\"" files = [ - {file = "azure_storage_blob-12.25.1-py3-none-any.whl", hash = "sha256:1f337aab12e918ec3f1b638baada97550673911c4ceed892acc8e4e891b74167"}, - {file = "azure_storage_blob-12.25.1.tar.gz", hash = "sha256:4f294ddc9bc47909ac66b8934bd26b50d2000278b10ad82cc109764fdc6e0e3b"}, + {file = "azure_storage_blob-12.26.0-py3-none-any.whl", hash = "sha256:8c5631b8b22b4f53ec5fff2f3bededf34cfef111e2af613ad42c9e6de00a77fe"}, + {file = "azure_storage_blob-12.26.0.tar.gz", hash = "sha256:5dd7d7824224f7de00bfeb032753601c982655173061e242f13be6e26d78d71f"}, ] [package.dependencies] @@ -1339,14 +1339,14 @@ tomli = {version = ">=2.0.1", markers = "python_full_version < \"3.11.0\""} [[package]] name = "distlib" -version = "0.3.9" +version = "0.4.0" description = "Distribution utilities" optional = false python-versions = "*" groups = ["dev"] files = [ - {file = "distlib-0.3.9-py2.py3-none-any.whl", hash = "sha256:47f8c22fd27c27e25a65601af709b38e4f0a45ea4fc2e710f65755fa8caaaf87"}, - {file = "distlib-0.3.9.tar.gz", hash = "sha256:a60f20dea646b8a33f3e7772f74dc0b2d0772d2837ee1342a00645c81edf9403"}, + {file = 
"distlib-0.4.0-py2.py3-none-any.whl", hash = "sha256:9659f7d87e46584a30b5780e43ac7a2143098441670ff0a49d5f9034c54a6c16"}, + {file = "distlib-0.4.0.tar.gz", hash = "sha256:feec40075be03a04501a973d81f633735b4b69f98b05450592310c0f401a4e0d"}, ] [[package]] @@ -1698,14 +1698,14 @@ files = [ [[package]] name = "fsspec" -version = "2025.5.1" +version = "2025.7.0" description = "File-system specification" optional = false python-versions = ">=3.9" groups = ["main"] files = [ - {file = "fsspec-2025.5.1-py3-none-any.whl", hash = "sha256:24d3a2e663d5fc735ab256263c4075f374a174c3410c0b25e5bd1970bceaa462"}, - {file = "fsspec-2025.5.1.tar.gz", hash = "sha256:2e55e47a540b91843b755e83ded97c6e897fa0942b11490113f09e9c443c2475"}, + {file = "fsspec-2025.7.0-py3-none-any.whl", hash = "sha256:8b012e39f63c7d5f10474de957f3ab793b47b45ae7d39f2fb735f8bbe25c0e21"}, + {file = "fsspec-2025.7.0.tar.gz", hash = "sha256:786120687ffa54b8283d942929540d8bc5ccfa820deb555a2b5d0ed2b737bf58"}, ] [package.extras] @@ -1713,7 +1713,7 @@ abfs = ["adlfs"] adl = ["adlfs"] arrow = ["pyarrow (>=1)"] dask = ["dask", "distributed"] -dev = ["pre-commit", "ruff"] +dev = ["pre-commit", "ruff (>=0.5)"] doc = ["numpydoc", "sphinx", "sphinx-design", "sphinx-rtd-theme", "yarl"] dropbox = ["dropbox", "dropboxdrivefs", "requests"] full = ["adlfs", "aiohttp (!=4.0.0a0,!=4.0.0a1)", "dask", "distributed", "dropbox", "dropboxdrivefs", "fusepy", "gcsfs", "libarchive-c", "ocifs", "panel", "paramiko", "pyarrow (>=1)", "pygit2", "requests", "s3fs", "smbprotocol", "tqdm"] @@ -1733,26 +1733,26 @@ smb = ["smbprotocol"] ssh = ["paramiko"] test = ["aiohttp (!=4.0.0a0,!=4.0.0a1)", "numpy", "pytest", "pytest-asyncio (!=0.22.0)", "pytest-benchmark", "pytest-cov", "pytest-mock", "pytest-recording", "pytest-rerunfailures", "requests"] test-downstream = ["aiobotocore (>=2.5.4,<3.0.0)", "dask[dataframe,test]", "moto[server] (>4,<5)", "pytest-timeout", "xarray"] -test-full = ["adlfs", "aiohttp (!=4.0.0a0,!=4.0.0a1)", "cloudpickle", "dask", "distributed", "dropbox", "dropboxdrivefs", "fastparquet", "fusepy", "gcsfs", "jinja2", "kerchunk", "libarchive-c", "lz4", "notebook", "numpy", "ocifs", "pandas", "panel", "paramiko", "pyarrow", "pyarrow (>=1)", "pyftpdlib", "pygit2", "pytest", "pytest-asyncio (!=0.22.0)", "pytest-benchmark", "pytest-cov", "pytest-mock", "pytest-recording", "pytest-rerunfailures", "python-snappy", "requests", "smbprotocol", "tqdm", "urllib3", "zarr", "zstandard"] +test-full = ["adlfs", "aiohttp (!=4.0.0a0,!=4.0.0a1)", "cloudpickle", "dask", "distributed", "dropbox", "dropboxdrivefs", "fastparquet", "fusepy", "gcsfs", "jinja2", "kerchunk", "libarchive-c", "lz4", "notebook", "numpy", "ocifs", "pandas", "panel", "paramiko", "pyarrow", "pyarrow (>=1)", "pyftpdlib", "pygit2", "pytest", "pytest-asyncio (!=0.22.0)", "pytest-benchmark", "pytest-cov", "pytest-mock", "pytest-recording", "pytest-rerunfailures", "python-snappy", "requests", "smbprotocol", "tqdm", "urllib3", "zarr", "zstandard ; python_version < \"3.14\""] tqdm = ["tqdm"] [[package]] name = "gcsfs" -version = "2025.5.1" +version = "2025.7.0" description = "Convenient Filesystem interface over GCS" optional = true python-versions = ">=3.9" groups = ["main"] markers = "extra == \"gcsfs\"" files = [ - {file = "gcsfs-2025.5.1-py2.py3-none-any.whl", hash = "sha256:48712471ff71ac83d3e2152ba4dc232874698466e344d5e700feba06b0a0de7b"}, - {file = "gcsfs-2025.5.1.tar.gz", hash = "sha256:ba945530cf4857cd9d599ccb3ae729c65c39088880b11c4df1fecac30df5f3e3"}, + {file = "gcsfs-2025.7.0-py2.py3-none-any.whl", hash = 
"sha256:653503331d58cb02bb34e725d4595d166e93f7f2f3ff88e4c66ef535ae66eae5"}, + {file = "gcsfs-2025.7.0.tar.gz", hash = "sha256:ad3ff66cf189ae8fc375ac8a2af409003dbca02357621cb94a66e457e02ba420"}, ] [package.dependencies] aiohttp = "<4.0.0a0 || >4.0.0a0,<4.0.0a1 || >4.0.0a1" decorator = ">4.1.2" -fsspec = "2025.5.1" +fsspec = "2025.7.0" google-auth = ">=1.2" google-auth-oauthlib = "*" google-cloud-storage = "*" @@ -2402,14 +2402,14 @@ files = [ [[package]] name = "jsonschema" -version = "4.24.0" +version = "4.25.0" description = "An implementation of JSON Schema validation for Python" optional = false python-versions = ">=3.9" groups = ["main", "dev"] files = [ - {file = "jsonschema-4.24.0-py3-none-any.whl", hash = "sha256:a462455f19f5faf404a7902952b6f0e3ce868f3ee09a359b05eca6673bd8412d"}, - {file = "jsonschema-4.24.0.tar.gz", hash = "sha256:0b4e8069eb12aedfa881333004bccaec24ecef5a8a6a4b6df142b2cc9599d196"}, + {file = "jsonschema-4.25.0-py3-none-any.whl", hash = "sha256:24c2e8da302de79c8b9382fee3e76b355e44d2a4364bb207159ce10b517bd716"}, + {file = "jsonschema-4.25.0.tar.gz", hash = "sha256:e63acf5c11762c0e6672ffb61482bdf57f0876684d8d249c0fe2d730d48bc55f"}, ] markers = {main = "extra == \"ray\""} @@ -2421,7 +2421,7 @@ rpds-py = ">=0.7.1" [package.extras] format = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3987", "uri-template", "webcolors (>=1.11)"] -format-nongpl = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3986-validator (>0.1.0)", "uri-template", "webcolors (>=24.6.0)"] +format-nongpl = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3986-validator (>0.1.0)", "rfc3987-syntax (>=1.1.0)", "uri-template", "webcolors (>=24.6.0)"] [[package]] name = "jsonschema-path" @@ -5384,21 +5384,21 @@ pyasn1 = ">=0.1.3" [[package]] name = "s3fs" -version = "2025.5.1" +version = "2025.7.0" description = "Convenient Filesystem interface over S3" optional = true python-versions = ">=3.9" groups = ["main"] markers = "extra == \"s3fs\"" files = [ - {file = "s3fs-2025.5.1-py3-none-any.whl", hash = "sha256:7475e7c40a3a112f17144907ffae50782ab6c03487fe0b45a9c3942bb7a5c606"}, - {file = "s3fs-2025.5.1.tar.gz", hash = "sha256:84beffa231b8ed94f8d667e93387b38351e1c4447aedea5c2c19dd88b7fcb658"}, + {file = "s3fs-2025.7.0-py3-none-any.whl", hash = "sha256:b6b2d3f84b6aa1c2ba5e62e39dd9410cf54f10a2cce1ea6db1ba0d1a6bcce685"}, + {file = "s3fs-2025.7.0.tar.gz", hash = "sha256:5e7f9ec0cad7745155e3eb86fae15b1481fa29946bf5b3a4ce3a60701ce6022d"}, ] [package.dependencies] aiobotocore = ">=2.5.4,<3.0.0" aiohttp = "<4.0.0a0 || >4.0.0a0,<4.0.0a1 || >4.0.0a1" -fsspec = "2025.5.1" +fsspec = "2025.7.0" [package.extras] awscli = ["aiobotocore[awscli] (>=2.5.4,<3.0.0)"] diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index cee2ccac72..04b0d24b42 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -63,6 +63,7 @@ import pyarrow.dataset as ds import pyarrow.lib import pyarrow.parquet as pq +import pyarrow.orc as orc from pyarrow import ChunkedArray from pyarrow._s3fs import S3RetryStrategy from pyarrow.fs import ( @@ -195,6 +196,8 @@ ICEBERG_SCHEMA = b"iceberg.schema" # The PARQUET: in front means that it is Parquet specific, in this case the field_id PYARROW_PARQUET_FIELD_ID_KEY = b"PARQUET:field_id" +# ORC stores IDs as string metadata +ORC_FIELD_ID_KEY = b"iceberg.id" PYARROW_FIELD_DOC_KEY = b"doc" LIST_ELEMENT_NAME = "element" MAP_KEY_NAME = "key" @@ -387,14 +390,28 @@ def __init__(self, properties: 
Properties = EMPTY_DICT): @staticmethod def parse_location(location: str) -> Tuple[str, str, str]: - """Return the path without the scheme.""" + """Return (scheme, netloc, path) for the given location. + Uses environment variables DEFAULT_SCHEME and DEFAULT_NETLOC + if scheme/netloc are missing. + """ uri = urlparse(location) - if not uri.scheme: - return "file", uri.netloc, os.path.abspath(location) - elif uri.scheme in ("hdfs", "viewfs"): - return uri.scheme, uri.netloc, uri.path + + # Load defaults from environment + default_scheme = os.getenv("DEFAULT_SCHEME", "file") + default_netloc = os.getenv("DEFAULT_NETLOC", "") + + # Apply logic + scheme = uri.scheme or default_scheme + netloc = uri.netloc or default_netloc + + if scheme in ("hdfs", "viewfs"): + return scheme, netloc, uri.path else: - return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}" + # For non-HDFS URIs, include netloc in the path if present + path = uri.path if uri.scheme else os.path.abspath(location) + if netloc and not path.startswith(netloc): + path = f"{netloc}{path}" + return scheme, netloc, path def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem: """Initialize FileSystem for different scheme.""" @@ -574,7 +591,7 @@ def _initialize_gcs_fs(self) -> FileSystem: def _initialize_local_fs(self) -> FileSystem: return PyArrowLocalFileSystem() - def new_input(self, location: str) -> PyArrowFile: + def new_input(self, location: str, fs: Optional[FileIO] = None) -> PyArrowFile: """Get a PyArrowFile instance to read bytes from the file at the given location. Args: @@ -584,8 +601,11 @@ def new_input(self, location: str) -> PyArrowFile: PyArrowFile: A PyArrowFile instance for the given location. """ scheme, netloc, path = self.parse_location(location) + logger.warning(f"Scheme: {scheme}, Netloc: {netloc}, Path: {path}") + if not fs: + fs = self.fs_by_scheme(scheme, netloc) return PyArrowFile( - fs=self.fs_by_scheme(scheme, netloc), + fs=fs, location=location, path=path, buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)), @@ -973,6 +993,8 @@ def _expression_to_complementary_pyarrow(expr: BooleanExpression) -> pc.Expressi def _get_file_format(file_format: FileFormat, **kwargs: Dict[str, Any]) -> ds.FileFormat: if file_format == FileFormat.PARQUET: return ds.ParquetFileFormat(**kwargs) + elif file_format == FileFormat.ORC: + return ds.OrcFileFormat(**kwargs) else: raise ValueError(f"Unsupported file format: {file_format}") @@ -1019,7 +1041,11 @@ def _combine_positional_deletes(positional_deletes: List[pa.ChunkedArray], start def pyarrow_to_schema( schema: pa.Schema, name_mapping: Optional[NameMapping] = None, downcast_ns_timestamp_to_us: bool = False ) -> Schema: - has_ids = visit_pyarrow(schema, _HasIds()) + logger.warning(f"schema {schema}") + hids = _HasIds() + logger.warning("hasIds") + has_ids = visit_pyarrow(schema, hids) + logger.warning(f"has_ids is {has_ids}, name_mapping is {name_mapping}") if has_ids: return visit_pyarrow(schema, _ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)) elif name_mapping is not None: @@ -1176,11 +1202,22 @@ def primitive(self, primitive: pa.DataType) -> T: def _get_field_id(field: pa.Field) -> Optional[int]: - return ( - int(field_id_str.decode()) - if (field.metadata and (field_id_str := field.metadata.get(PYARROW_PARQUET_FIELD_ID_KEY))) - else None - ) + """Return the Iceberg field ID from Parquet or ORC metadata if available.""" + if not field.metadata: + return None + + # Try Parquet field ID first + field_id_bytes = 
field.metadata.get(PYARROW_PARQUET_FIELD_ID_KEY) + if field_id_bytes: + return int(field_id_bytes.decode()) + + # Fallback: try ORC field ID + field_id_bytes = field.metadata.get(ORC_FIELD_ID_KEY) + if field_id_bytes: + return int(field_id_bytes.decode()) + + return None + class _HasIds(PyArrowSchemaVisitor[bool]): @@ -1431,10 +1468,17 @@ def _task_to_record_batches( name_mapping: Optional[NameMapping] = None, partition_spec: Optional[PartitionSpec] = None, ) -> Iterator[pa.RecordBatch]: - arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8)) + logger.warning(f"file format is {task.file.file_format}") + if task.file.file_format == FileFormat.PARQUET: + arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8)) + elif task.file.file_format == FileFormat.ORC: + arrow_format = ds.OrcFileFormat() # currently ORC doesn't support any fragment scan options + else: + raise ValueError("Unsupported file format") with io.new_input(task.file.file_path).open() as fin: fragment = arrow_format.make_fragment(fin) physical_schema = fragment.physical_schema + logger.warning(f"formats: filepath {task.file.file_path}, fragment {fragment}, physical_schema {physical_schema}") # In V1 and V2 table formats, we only support Timestamp 'us' in Iceberg Schema # Hence it is reasonable to always cast 'ns' timestamp to 'us' on read. # When V3 support is introduced, we will update `downcast_ns_timestamp_to_us` flag based on @@ -2498,9 +2542,69 @@ def write_parquet(task: WriteTask) -> DataFile: return data_file + def write_orc(task: WriteTask) -> DataFile: + table_schema = table_metadata.schema() + if (sanitized_schema := sanitize_column_names(table_schema)) != table_schema: + file_schema = sanitized_schema + else: + file_schema = table_schema + + downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False + batches = [ + _to_requested_schema( + requested_schema=file_schema, + file_schema=task.schema, + batch=batch, + downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, + include_field_ids=True, + ) + for batch in task.record_batches + ] + arrow_table = pa.Table.from_batches(batches) + file_path = location_provider.new_data_location( + data_file_name=task.generate_data_file_filename("orc"), + partition_key=task.partition_key, + ) + fo = io.new_output(file_path) + with fo.create(overwrite=True) as fos: + orc.write_table(arrow_table, fos) + + # Extract statistics from the written ORC file + orc_file = orc.ORCFile(fo.to_input_file().open()) + statistics = data_file_statistics_from_orc_metadata( + orc_metadata=orc_file, + stats_columns=compute_statistics_plan(file_schema, table_metadata.properties), + orc_column_mapping=orc_column_to_id_mapping(file_schema), + arrow_table=arrow_table, + ) + + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path=file_path, + file_format=FileFormat.ORC, + partition=task.partition_key.partition if task.partition_key else Record(), + file_size_in_bytes=len(fo), + sort_order_id=None, + spec_id=table_metadata.default_spec_id, + equality_ids=None, + key_metadata=None, + **statistics.to_serialized_dict(), + ) + return data_file + executor = ExecutorFactory.get_or_create() - data_files = executor.map(write_parquet, tasks) + def dispatch(task: WriteTask) -> DataFile: + file_format = FileFormat(table_metadata.properties.get( + TableProperties.WRITE_FILE_FORMAT, + TableProperties.WRITE_FILE_FORMAT_DEFAULT)) + if file_format == FileFormat.PARQUET: + return write_parquet(task) + elif 
file_format == FileFormat.ORC: + return write_orc(task) + else: + raise ValueError(f"Unsupported file format: {file_format}") + data_files = executor.map(dispatch, tasks) return iter(data_files) @@ -2782,3 +2886,180 @@ def _get_field_from_arrow_table(arrow_table: pa.Table, field_path: str) -> pa.Ar field_array = arrow_table[path_parts[0]] # Navigate into the struct using the remaining path parts return pc.struct_field(field_array, path_parts[1:]) + + +def data_file_statistics_from_orc_metadata( + orc_metadata: "orc.ORCFile", + stats_columns: Dict[int, StatisticsCollector], + orc_column_mapping: Dict[str, int], + arrow_table: Optional[pa.Table] = None, +) -> DataFileStatistics: + """ + Compute and return DataFileStatistics that includes the following. + + - record_count + - column_sizes + - value_counts + - null_value_counts + - nan_value_counts + - column_aggregates + - split_offsets + + Args: + orc_metadata (pyarrow.orc.ORCFile): A pyarrow ORC file object. + stats_columns (Dict[int, StatisticsCollector]): The statistics gathering plan. It is required to + set the mode for column metrics collection + orc_column_mapping (Dict[str, int]): The mapping of the ORC column name to the field ID + arrow_table (pa.Table, optional): The original arrow table that was written, used for row count + """ + column_sizes: Dict[int, int] = {} + value_counts: Dict[int, int] = {} + split_offsets: List[int] = [] + + null_value_counts: Dict[int, int] = {} + nan_value_counts: Dict[int, int] = {} + + col_aggs = {} + + invalidate_col: Set[int] = set() + + # Get row count from the arrow table if available, otherwise use a default + if arrow_table is not None: + record_count = arrow_table.num_rows + else: + # Fallback: ORC doesn't provide num_rows like Parquet, so we'll use a default + record_count = 0 + + # ORC files have a single stripe structure, unlike Parquet's row groups + # We'll process the file-level statistics + for col_name, field_id in orc_column_mapping.items(): + stats_col = stats_columns[field_id] + + # Initialize column sizes (ORC doesn't provide per-column size like Parquet) + column_sizes[field_id] = 0 # ORC doesn't provide detailed column size info + + if stats_col.mode == MetricsMode(MetricModeTypes.NONE): + continue + + # Get column statistics from ORC metadata + try: + # ORC provides file-level statistics + # Note: ORC statistics are more limited than Parquet + # We'll use the available statistics and set defaults for missing ones + + # For ORC, we'll use the total number of values as value count + # This is a simplification since ORC doesn't provide per-column value counts like Parquet + value_counts[field_id] = record_count + + # ORC doesn't provide null counts in the same way as Parquet + # We'll set this to 0 for now, as ORC doesn't expose null counts easily + null_value_counts[field_id] = 0 + + if stats_col.mode == MetricsMode(MetricModeTypes.COUNTS): + continue + + if field_id not in col_aggs: + col_aggs[field_id] = StatsAggregator( + stats_col.iceberg_type, _primitive_to_physical(stats_col.iceberg_type), stats_col.mode.length + ) + + # ORC doesn't provide min/max statistics in the same way as Parquet + # We'll skip the min/max aggregation for ORC files + # This is a limitation of ORC's metadata structure compared to Parquet + + except Exception as e: + invalidate_col.add(field_id) + logger.warning(f"Failed to extract ORC statistics for column {col_name}: {e}") + + # ORC doesn't have split offsets like Parquet + # We'll use an empty list or a single offset at 0 + split_offsets = [0] if 
record_count > 0 else [] + + # Clean up invalid columns + for field_id in invalidate_col: + col_aggs.pop(field_id, None) + null_value_counts.pop(field_id, None) + + return DataFileStatistics( + record_count=record_count, + column_sizes=column_sizes, + value_counts=value_counts, + null_value_counts=null_value_counts, + nan_value_counts=nan_value_counts, + column_aggregates=col_aggs, + split_offsets=split_offsets, + ) + + +class ID2OrcColumn: + field_id: int + orc_column: str + + def __init__(self, field_id: int, orc_column: str): + self.field_id = field_id + self.orc_column = orc_column + + +class ID2OrcColumnVisitor(PreOrderSchemaVisitor[List[ID2OrcColumn]]): + _field_id: int = 0 + _path: List[str] + + def __init__(self) -> None: + self._path = [] + + def schema(self, schema: Schema, struct_result: Callable[[], List[ID2OrcColumn]]) -> List[ID2OrcColumn]: + return struct_result() + + def struct(self, struct: StructType, field_results: List[Callable[[], List[ID2OrcColumn]]]) -> List[ID2OrcColumn]: + return list(itertools.chain(*[result() for result in field_results])) + + def field(self, field: NestedField, field_result: Callable[[], List[ID2OrcColumn]]) -> List[ID2OrcColumn]: + self._field_id = field.field_id + self._path.append(field.name) + result = field_result() + self._path.pop() + return result + + def list(self, list_type: ListType, element_result: Callable[[], List[ID2OrcColumn]]) -> List[ID2OrcColumn]: + self._field_id = list_type.element_id + self._path.append("list.element") + result = element_result() + self._path.pop() + return result + + def map( + self, + map_type: MapType, + key_result: Callable[[], List[ID2OrcColumn]], + value_result: Callable[[], List[ID2OrcColumn]], + ) -> List[ID2OrcColumn]: + self._field_id = map_type.key_id + self._path.append("key_value.key") + k = key_result() + self._path.pop() + self._field_id = map_type.value_id + self._path.append("key_value.value") + v = value_result() + self._path.pop() + return k + v + + def primitive(self, primitive: PrimitiveType) -> List[ID2OrcColumn]: + return [ID2OrcColumn(field_id=self._field_id, orc_column=".".join(self._path))] + + +def orc_column_to_id_mapping( + schema: Schema, +) -> Dict[str, int]: + """ + Create a mapping from ORC column names to Iceberg field IDs. 
+ + Args: + schema: The Iceberg schema + + Returns: + A dictionary mapping ORC column names to field IDs + """ + result: Dict[str, int] = {} + for pair in pre_order_visit(schema, ID2OrcColumnVisitor()): + result[pair.orc_column] = pair.field_id + return result diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index a92d944811..83608d840f 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -1062,7 +1062,15 @@ def to_manifest_file(self) -> ManifestFile: def add_entry(self, entry: ManifestEntry) -> ManifestWriter: if self.closed: raise RuntimeError("Cannot add entry to closed manifest writer") - if entry.status == ManifestEntryStatus.ADDED: + # Ensure record_count is not None + if entry.data_file.record_count is None: + entry.data_file.record_count = 0 # or a real row count if available + if entry.data_file.file_format == FileFormat.ORC: + # ORC file stats not yet supported + self._added_files += 1 if entry.status == ManifestEntryStatus.ADDED else 0 + self._existing_files += 1 if entry.status == ManifestEntryStatus.EXISTING else 0 + self._deleted_files += 1 if entry.status == ManifestEntryStatus.DELETED else 0 + elif entry.status == ManifestEntryStatus.ADDED: self._added_files += 1 self._added_rows += entry.data_file.record_count elif entry.status == ManifestEntryStatus.EXISTING: diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 30b06fcb0b..4fd7f07f2a 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -161,6 +161,9 @@ class UpsertResult: class TableProperties: + WRITE_FILE_FORMAT = "write.format.default" + WRITE_FILE_FORMAT_DEFAULT = "parquet" + PARQUET_ROW_GROUP_SIZE_BYTES = "write.parquet.row-group-size-bytes" PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024 # 128 MB @@ -1050,6 +1053,7 @@ def __init__( catalog: Catalog, config: Dict[str, str] = EMPTY_DICT, ) -> None: + self._validate_table_metadata(metadata) self._identifier = identifier self.metadata = metadata self.metadata_location = metadata_location @@ -1057,6 +1061,14 @@ def __init__( self.catalog = catalog self.config = config + def _validate_table_metadata(self, table_metadata: TableMetadata) -> None: + from pyiceberg.manifest import FileFormat + file_format = FileFormat(table_metadata.properties.get( + TableProperties.WRITE_FILE_FORMAT, + TableProperties.WRITE_FILE_FORMAT_DEFAULT)) + if file_format not in (FileFormat.PARQUET, FileFormat.ORC): + raise ValueError(f"Unsupported file format: {file_format}") + def transaction(self) -> Transaction: """Create a new transaction object to first stage the changes, and then commit them to the catalog. 
diff --git a/tests/conftest.py b/tests/conftest.py index 16c9e06dac..cb8da7d500 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2382,6 +2382,30 @@ def example_task(data_file: str) -> FileScanTask: ) +@pytest.fixture +def data_file_orc(table_schema_simple: Schema, tmp_path: str) -> str: + import pyarrow as pa + import pyarrow.orc as orc + + from pyiceberg.io.pyarrow import schema_to_pyarrow + + table = pa.table( + {"foo": ["a", "b", "c"], "bar": [1, 2, 3], "baz": [True, False, None]}, + schema=schema_to_pyarrow(table_schema_simple), + ) + + file_path = f"{tmp_path}/0000-data.orc" + orc.write_table(table=table, where=file_path) + return file_path + + +@pytest.fixture +def example_task_orc(data_file_orc: str) -> FileScanTask: + return FileScanTask( + data_file=DataFile.from_args(file_path=data_file_orc, file_format=FileFormat.ORC, file_size_in_bytes=1925), + ) + + @pytest.fixture(scope="session") def warehouse(tmp_path_factory: pytest.TempPathFactory) -> Path: return tmp_path_factory.mktemp("test_sql") @@ -2410,6 +2434,20 @@ def table_v2(example_table_metadata_v2: Dict[str, Any]) -> Table: catalog=NoopCatalog("NoopCatalog"), ) +@pytest.fixture +def table_v2_orc(example_table_metadata_v2: Dict[str, Any]) -> Table: + if not example_table_metadata_v2["properties"]: + example_table_metadata_v2["properties"] = {} + example_table_metadata_v2["properties"]["write.format.default"] = "ORC" + table_metadata = TableMetadataV2(**example_table_metadata_v2) + return Table( + identifier=("database", "table_orc"), + metadata=table_metadata, + metadata_location=f"{table_metadata.location}/uuid.metadata.json", + io=load_file_io(), + catalog=NoopCatalog("NoopCatalog"), + ) + @pytest.fixture def table_v2_with_fixed_and_decimal_types( diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index a33b1a36bc..d08acf2066 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -1074,3 +1074,186 @@ def test_filter_after_arrow_scan(catalog: Catalog) -> None: scan = scan.filter("ts >= '2023-03-05T00:00:00+00:00'") assert len(scan.to_arrow()) > 0 + +import pytest +import pyarrow as pa +from pyarrow import orc +from pyiceberg.io.pyarrow import PyArrowFileIO +from pyiceberg.schema import Schema +from pyiceberg.types import ( + BooleanType, + IntegerType, + LongType, + FloatType, + DoubleType, + StringType, + TimestampType, + DateType, + BinaryType, + DecimalType, + NestedField, +) + + +# Basic ORC Read Tests + +@pytest.mark.skipif(not hasattr(pa, 'orc'), reason="ORC not available in PyArrow") +def test_orc_read_simple_table(tmp_path): + """Test basic ORC file reading with simple data types""" + file_io = PyArrowFileIO() + + # Create simple test data + data = pa.table([ + pa.array([1, 2, 3, 4, 5], type=pa.int64()), + pa.array(["Alice", "Bob", "Charlie", "David", "Eve"], type=pa.string()), + pa.array([1.1, 2.2, 3.3, 4.4, 5.5], type=pa.float64()), + ], names=["id", "name", "value"]) + + # Write ORC file + file_path = tmp_path / "test_simple.orc" + with file_io.new_output(str(file_path)) as output_file: + orc.write_table(data, output_file) + + # Read ORC file back + with file_io.new_input(str(file_path)) as input_file: + result = orc.read_table(input_file) + + assert result.num_rows == 5 + assert result.num_columns == 3 + assert result.column_names == ["id", "name", "value"] + assert result.equals(data) + + +@pytest.mark.skipif(not hasattr(pa, 'orc'), reason="ORC not available in PyArrow") +def test_orc_read_with_nulls(tmp_path): + """Test ORC file reading with 
null values""" + file_io = PyArrowFileIO() + + # Create data with nulls + data = pa.table([ + pa.array([1, None, 3, None, 5], type=pa.int64()), + pa.array([None, "hello", None, "world", None], type=pa.string()), + pa.array([1.1, 2.2, None, 4.4, None], type=pa.float64()), + ], names=["nullable_int", "nullable_string", "nullable_double"]) + + file_path = tmp_path / "test_nulls.orc" + with file_io.new_output(str(file_path)) as output_file: + orc.write_table(data, output_file) + + # Read back + with file_io.new_input(str(file_path)) as input_file: + result = orc.read_table(input_file) + + assert result.equals(data) + + # Verify null counts + assert result.column("nullable_int").null_count == 2 + assert result.column("nullable_string").null_count == 3 + assert result.column("nullable_double").null_count == 2 + + +@pytest.mark.skipif(not hasattr(pa, 'orc'), reason="ORC not available in PyArrow") +def test_orc_read_basic_types(tmp_path): + """Test ORC reading with various basic data types""" + file_io = PyArrowFileIO() + + # Create data with different basic types + data = pa.table([ + pa.array([True, False, True, False], type=pa.bool_()), + pa.array([10, 20, 30, 40], type=pa.int32()), + pa.array([100, 200, 300, 400], type=pa.int64()), + pa.array([1.5, 2.5, 3.5, 4.5], type=pa.float32()), + pa.array([10.15, 20.25, 30.35, 40.45], type=pa.float64()), + pa.array(["test1", "test2", "test3", "test4"], type=pa.string()), + pa.array([b"binary1", b"binary2", b"binary3", b"binary4"], type=pa.binary()), + ], names=["bool_col", "int32_col", "int64_col", "float32_col", "float64_col", "string_col", "binary_col"]) + + file_path = tmp_path / "test_types.orc" + with file_io.new_output(str(file_path)) as output_file: + orc.write_table(data, output_file) + + # Read back + with file_io.new_input(str(file_path)) as input_file: + result = orc.read_table(input_file) + + assert result.num_rows == 4 + assert result.num_columns == 7 + assert result.equals(data) + + +@pytest.mark.skipif(not hasattr(pa, 'orc'), reason="ORC not available in PyArrow") +def test_orc_read_empty_table(tmp_path): + """Test reading empty ORC file""" + file_io = PyArrowFileIO() + + # Create empty table with schema + schema = pa.schema([ + pa.field("id", pa.int64()), + pa.field("name", pa.string()), + ]) + data = pa.table([], schema=schema) + + file_path = tmp_path / "test_empty.orc" + with file_io.new_output(str(file_path)) as output_file: + orc.write_table(data, output_file) + + # Read back + with file_io.new_input(str(file_path)) as input_file: + result = orc.read_table(input_file) + + assert result.num_rows == 0 + assert result.num_columns == 2 + assert result.schema.equals(schema) + + +@pytest.mark.skipif(not hasattr(pa, 'orc'), reason="ORC not available in PyArrow") +def test_orc_read_single_row(tmp_path): + """Test reading ORC file with single row""" + file_io = PyArrowFileIO() + + data = pa.table([ + pa.array([42], type=pa.int64()), + pa.array(["single"], type=pa.string()), + pa.array([3.14], type=pa.float64()), + ], names=["id", "name", "value"]) + + file_path = tmp_path / "test_single.orc" + with file_io.new_output(str(file_path)) as output_file: + orc.write_table(data, output_file) + + # Read back + with file_io.new_input(str(file_path)) as input_file: + result = orc.read_table(input_file) + + assert result.num_rows == 1 + assert result.equals(data) + assert result["id"].to_pylist() == [42] + assert result["name"].to_pylist() == ["single"] + assert result["value"].to_pylist() == [3.14] + + +@pytest.mark.skipif(not hasattr(pa, 'orc'), 
reason="ORC not available in PyArrow") +def test_orc_read_column_subset(tmp_path): + """Test reading only specific columns from ORC file""" + file_io = PyArrowFileIO() + + # Create table with multiple columns + data = pa.table([ + pa.array([1, 2, 3], type=pa.int64()), + pa.array(["a", "b", "c"], type=pa.string()), + pa.array([1.0, 2.0, 3.0], type=pa.float64()), + pa.array([True, False, True], type=pa.bool_()), + ], names=["id", "name", "value", "flag"]) + + file_path = tmp_path / "test_columns.orc" + with file_io.new_output(str(file_path)) as output_file: + orc.write_table(data, output_file) + + # Read only specific columns + with file_io.new_input(str(file_path)) as input_file: + result = orc.read_table(input_file, columns=["id", "name"]) + + assert result.num_columns == 2 + assert result.column_names == ["id", "name"] + expected = data.select(["id", "name"]) + assert result.equals(expected) diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index f5c3082edc..399b151363 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -28,6 +28,7 @@ import pyarrow import pyarrow as pa import pyarrow.parquet as pq +import pyarrow.orc as orc import pytest from packaging import version from pyarrow.fs import AwsDefaultS3RetryStrategy, FileType, LocalFileSystem, S3FileSystem @@ -1551,13 +1552,24 @@ def test_projection_filter_on_unknown_field(schema_int_str: Schema, file_int_str assert "Could not find field with name unknown_field, case_sensitive=True" in str(exc_info.value) -@pytest.fixture -def deletes_file(tmp_path: str, example_task: FileScanTask) -> str: +@pytest.fixture(params=["parquet", "orc"]) +def deletes_file(tmp_path: str, request: pytest.FixtureRequest) -> str: + if request.param == "parquet": + example_task = request.getfixturevalue("example_task") + import pyarrow.parquet as pq + write_func = pq.write_table + file_ext = "parquet" + else: # orc + example_task = request.getfixturevalue("example_task_orc") + import pyarrow.orc as orc + write_func = orc.write_table + file_ext = "orc" + path = example_task.file.file_path table = pa.table({"file_path": [path, path, path], "pos": [1, 3, 5]}) - deletes_file_path = f"{tmp_path}/deletes.parquet" - pq.write_table(table, deletes_file_path) + deletes_file_path = f"{tmp_path}/deletes.{file_ext}" + write_func(table, deletes_file_path) return deletes_file_path @@ -2654,3 +2666,102 @@ def test_retry_strategy_not_found() -> None: io = PyArrowFileIO(properties={S3_RETRY_STRATEGY_IMPL: "pyiceberg.DoesNotExist"}) with pytest.warns(UserWarning, match="Could not initialize S3 retry strategy: pyiceberg.DoesNotExist"): io.new_input("s3://bucket/path/to/file") + + +def test_write_and_read_orc(tmp_path): + # Create a simple Arrow table + data = pa.table({'a': [1, 2, 3], 'b': ['x', 'y', 'z']}) + orc_path = tmp_path / 'test.orc' + orc.write_table(data, str(orc_path)) + # Read it back + orc_file = orc.ORCFile(str(orc_path)) + table_read = orc_file.read() + assert table_read.equals(data) + + +def test_orc_file_format_integration(tmp_path): + # This test mimics a minimal integration with PyIceberg's FileFormat enum and pyarrow.orc + from pyiceberg.manifest import FileFormat + import pyarrow.dataset as ds + data = pa.table({'a': [10, 20], 'b': ['foo', 'bar']}) + orc_path = tmp_path / 'iceberg.orc' + orc.write_table(data, str(orc_path)) + # Use PyArrow dataset API to read as ORC + dataset = ds.dataset(str(orc_path), format=ds.OrcFileFormat()) + table_read = dataset.to_table() + assert table_read.equals(data) + + +def 
test_iceberg_write_and_read_orc(tmp_path): + """ + Integration test: Write and read ORC via Iceberg API. + To run just this test: + pytest tests/io/test_pyarrow.py -k test_iceberg_write_and_read_orc + """ + import pyarrow as pa + from pyiceberg.schema import Schema, NestedField + from pyiceberg.types import IntegerType, StringType + from pyiceberg.manifest import FileFormat, DataFileContent + from pyiceberg.table.metadata import TableMetadataV2 + from pyiceberg.partitioning import PartitionSpec + from pyiceberg.io.pyarrow import write_file, PyArrowFileIO, ArrowScan + from pyiceberg.table import WriteTask, FileScanTask + import uuid + + # Define schema and data + schema = Schema( + NestedField(1, "id", IntegerType(), required=True), + NestedField(2, "name", StringType(), required=False), + ) + data = pa.table({"id": pa.array([1, 2, 3], type=pa.int32()), "name": ["a", "b", "c"]}) + + # Create table metadata + table_metadata = TableMetadataV2( + location=str(tmp_path), + last_column_id=2, + format_version=2, + schemas=[schema], + partition_specs=[PartitionSpec()], + properties={ + "write.format.default": "orc", + } + ) + io = PyArrowFileIO() + + # Write ORC file using Iceberg API + write_uuid = uuid.uuid4() + tasks = [ + WriteTask( + write_uuid=write_uuid, + task_id=0, + record_batches=data.to_batches(), + schema=schema, + ) + ] + data_files = list(write_file(io, table_metadata, iter(tasks))) + assert len(data_files) == 1 + data_file = data_files[0] + assert data_file.file_format == FileFormat.ORC + assert data_file.content == DataFileContent.DATA + + # Read back using ArrowScan + scan = ArrowScan( + table_metadata=table_metadata, + io=io, + projected_schema=schema, + row_filter=AlwaysTrue(), + case_sensitive=True, + ) + scan_task = FileScanTask(data_file=data_file) + table_read = scan.to_table([scan_task]) + + # Compare data ignoring schema metadata (like not null constraints) + assert table_read.num_rows == data.num_rows + assert table_read.num_columns == data.num_columns + assert table_read.column_names == data.column_names + + # Compare actual column data values + for col_name in data.column_names: + original_values = data.column(col_name).to_pylist() + read_values = table_read.column(col_name).to_pylist() + assert original_values == read_values, f"Column {col_name} values don't match" \ No newline at end of file diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 748a77eee0..741897391d 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -265,8 +265,34 @@ def test_history(table_v2: Table) -> None: ] -def test_table_scan_select(table_v2: Table) -> None: - scan = table_v2.scan() +def test_table_bad_file_format() -> None: + from conftest import EXAMPLE_TABLE_METADATA_V2 + table_metadata = EXAMPLE_TABLE_METADATA_V2 + table_metadata["properties"] = {} + table_metadata["properties"]["write.format.default"] = "bad" + table_metadata = TableMetadataV2(**table_metadata) + with pytest.raises(ValueError): # replace with the actual exception + table = Table( + identifier=("database", "table_orc"), + metadata=table_metadata, + metadata_location=f"{table_metadata.location}/uuid.metadata.json", + io=load_file_io(), + catalog=NoopCatalog("NoopCatalog"), + ) + + +@pytest.mark.parametrize( + "table_fixture", + [ + pytest.param(pytest.lazy_fixture("table_v2"), id="parquet"), + pytest.param(pytest.lazy_fixture("table_v2_orc"), id="orc"), + ], +) +def test_table_scan_select(table_fixture: Table) -> None: + import logging + logger = logging.getLogger(__name__) + 
logger.debug(table_fixture.metadata) + scan = table_fixture.scan() assert scan.selected_fields == ("*",) assert scan.select("a", "b").selected_fields == ("a", "b") assert scan.select("a", "c").select("a").selected_fields == ("a",)
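
Note (example, not part of the patch above): the updated _get_field_id in pyiceberg/io/pyarrow.py resolves Iceberg field IDs from Arrow field metadata by checking the Parquet key b"PARQUET:field_id" first and falling back to the ORC key b"iceberg.id" (ORC_FIELD_ID_KEY). The sketch below reimplements just that lookup against plain pyarrow fields; the helper name resolve_field_id is illustrative only.

    from typing import Optional

    import pyarrow as pa

    PARQUET_FIELD_ID_KEY = b"PARQUET:field_id"
    ORC_FIELD_ID_KEY = b"iceberg.id"


    def resolve_field_id(field: pa.Field) -> Optional[int]:
        """Return the Iceberg field ID stored in Parquet- or ORC-style metadata, if any."""
        if not field.metadata:
            return None
        for key in (PARQUET_FIELD_ID_KEY, ORC_FIELD_ID_KEY):
            if (raw := field.metadata.get(key)) is not None:
                return int(raw.decode())
        return None


    # A field carrying only the ORC-style key still resolves to an ID.
    orc_field = pa.field("number", pa.int32(), metadata={b"iceberg.id": b"3"})
    parquet_field = pa.field("letter", pa.string(), metadata={b"PARQUET:field_id": b"4"})
    plain_field = pa.field("untagged", pa.string())

    assert resolve_field_id(orc_field) == 3
    assert resolve_field_id(parquet_field) == 4
    assert resolve_field_id(plain_field) is None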
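
Note (example, not part of the patch above): the test_read_orc table provisioned in dev/provision.py can be scanned through the ORC read path added to _task_to_record_batches. A rough sketch, assuming the local integration stack from dev/docker-compose is running; the REST endpoint, MinIO endpoint, and credentials below are placeholders for whatever that environment actually uses.

    import pyarrow as pa

    from pyiceberg.catalog import load_catalog

    catalog = load_catalog(
        "integration",
        **{
            "type": "rest",
            "uri": "http://localhost:8181",
            "s3.endpoint": "http://localhost:9000",
            "s3.access-key-id": "admin",
            "s3.secret-access-key": "password",
        },
    )

    tbl = catalog.load_table("default.test_read_orc")

    # A full scan goes through ds.OrcFileFormat(); the row removed by the provision
    # script (DELETE ... WHERE number = 3) should leave two rows.
    result: pa.Table = tbl.scan().to_arrow()
    print(result.num_rows, result.column_names)

    # Filters and column projection work the same way as for Parquet-backed tables.
    filtered = tbl.scan(row_filter="number > 1", selected_fields=("number", "letter")).to_arrow()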
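
Note (example, not part of the patch above): on the write side, write_file() now dispatches on the write.format.default table property (surfaced as TableProperties.WRITE_FILE_FORMAT), so an ORC-backed table only needs that property set at creation time. A minimal sketch against a throwaway SQLite-backed catalog; the catalog URI, warehouse path, and table name are placeholders.

    import os

    import pyarrow as pa

    from pyiceberg.catalog import load_catalog
    from pyiceberg.schema import Schema
    from pyiceberg.types import LongType, NestedField, StringType

    os.makedirs("/tmp/pyiceberg_orc", exist_ok=True)
    catalog = load_catalog(
        "local",
        **{
            "type": "sql",
            "uri": "sqlite:////tmp/pyiceberg_orc/catalog.db",
            "warehouse": "file:///tmp/pyiceberg_orc/warehouse",
        },
    )
    catalog.create_namespace("default")

    schema = Schema(
        NestedField(1, "number", LongType(), required=False),
        NestedField(2, "letter", StringType(), required=False),
    )

    tbl = catalog.create_table(
        "default.demo_orc",
        schema=schema,
        properties={"write.format.default": "orc"},  # routes write_file() to write_orc()
    )

    tbl.append(pa.table({"number": [1, 2], "letter": ["a", "b"]}))

    # Every planned data file should now report FileFormat.ORC.
    for task in tbl.scan().plan_files():
        print(task.file.file_path, task.file.file_format)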
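
Note (example, not part of the patch above): data_file_statistics_from_orc_metadata() leaves null counts at 0 and skips lower/upper bounds because pyarrow's ORC reader does not expose per-column statistics the way ParquetFile metadata does. Since write_orc() still holds the written Arrow table, those metrics could instead be computed in memory with pyarrow.compute; a sketch of that alternative, not what the patch currently does.

    import pyarrow as pa
    import pyarrow.compute as pc

    arrow_table = pa.table(
        {
            "number": pa.array([1, 2, None], type=pa.int64()),
            "value": pa.array([0.5, None, 2.25], type=pa.float64()),
        }
    )

    for name in arrow_table.column_names:
        column = arrow_table.column(name)
        null_count = column.null_count        # exact per-column null count
        bounds = pc.min_max(column)           # StructScalar with "min" and "max" fields
        lower, upper = bounds["min"].as_py(), bounds["max"].as_py()
        print(f"{name}: nulls={null_count}, lower={lower!r}, upper={upper!r}")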