
Commit 97b73b9

Author: Tom McCormick
Message: basic iceberg table write for orc
1 parent: 1f47ce2

File tree: 2 files changed (+197, -3 lines)

pyiceberg/io/pyarrow.py

Lines changed: 188 additions & 2 deletions
@@ -2568,7 +2568,16 @@ def write_orc(task: WriteTask) -> DataFile:
     fo = io.new_output(file_path)
     with fo.create(overwrite=True) as fos:
         orc.write_table(arrow_table, fos)
-    # You may want to add statistics extraction here if needed
+
+    # Extract statistics from the written ORC file
+    orc_file = orc.ORCFile(fo.to_input_file().open())
+    statistics = data_file_statistics_from_orc_metadata(
+        orc_metadata=orc_file,
+        stats_columns=compute_statistics_plan(file_schema, table_metadata.properties),
+        orc_column_mapping=orc_column_to_id_mapping(file_schema),
+        arrow_table=arrow_table,
+    )
+
     data_file = DataFile.from_args(
         content=DataFileContent.DATA,
         file_path=file_path,
@@ -2579,7 +2588,7 @@ def write_orc(task: WriteTask) -> DataFile:
         spec_id=table_metadata.default_spec_id,
         equality_ids=None,
         key_metadata=None,
-        # statistics=... (if you implement ORC stats)
+        **statistics.to_serialized_dict(),
     )
     return data_file
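A note on the read-back step: write_orc reopens the file it just wrote through pyarrow's ORC reader in order to collect statistics. The following standalone sketch (the path and table contents are illustrative, not from this commit) shows what pyarrow's ORCFile exposes; its nrows attribute suggests the record-count fallback in the helper below could eventually come from the file footer itself.

# Standalone sketch of the read-back step, using only pyarrow.
# The path "/tmp/example.orc" and the table contents are illustrative.
import pyarrow as pa
from pyarrow import orc

table = pa.table({"id": [1, 2, 3], "name": ["a", "b", None]})
orc.write_table(table, "/tmp/example.orc")

orc_file = orc.ORCFile("/tmp/example.orc")
print(orc_file.nrows)     # 3 -- the ORC footer does carry a row count
print(orc_file.nstripes)  # typically 1 for a file this small
print(orc_file.schema)    # Arrow schema reconstructed from ORC metadata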

@@ -2877,3 +2886,180 @@ def _get_field_from_arrow_table(arrow_table: pa.Table, field_path: str) -> pa.Ar
     field_array = arrow_table[path_parts[0]]
     # Navigate into the struct using the remaining path parts
     return pc.struct_field(field_array, path_parts[1:])
+
+
+def data_file_statistics_from_orc_metadata(
+    orc_metadata: "orc.ORCFile",
+    stats_columns: Dict[int, StatisticsCollector],
+    orc_column_mapping: Dict[str, int],
+    arrow_table: Optional[pa.Table] = None,
+) -> DataFileStatistics:
+    """
+    Compute and return DataFileStatistics that includes the following.
+
+    - record_count
+    - column_sizes
+    - value_counts
+    - null_value_counts
+    - nan_value_counts
+    - column_aggregates
+    - split_offsets
+
+    Args:
+        orc_metadata (pyarrow.orc.ORCFile): A pyarrow ORC file object.
+        stats_columns (Dict[int, StatisticsCollector]): The statistics-gathering plan. It is required to
+            set the mode for column metrics collection.
+        orc_column_mapping (Dict[str, int]): The mapping of ORC column names to field IDs.
+        arrow_table (pa.Table, optional): The original Arrow table that was written, used for the row count.
+    """
+    column_sizes: Dict[int, int] = {}
+    value_counts: Dict[int, int] = {}
+    split_offsets: List[int] = []
+
+    null_value_counts: Dict[int, int] = {}
+    nan_value_counts: Dict[int, int] = {}
+
+    col_aggs = {}
+
+    invalidate_col: Set[int] = set()
+
+    # Get the row count from the arrow table if available, otherwise fall back to a default
+    if arrow_table is not None:
+        record_count = arrow_table.num_rows
+    else:
+        # Fallback: the ORC metadata object is not queried for a row count here, so default to 0
+        record_count = 0
+
+    # ORC files are organized in stripes rather than Parquet's row groups;
+    # only file-level statistics are processed here.
+    for col_name, field_id in orc_column_mapping.items():
+        stats_col = stats_columns[field_id]
+
+        # ORC does not expose per-column on-disk sizes the way Parquet does
+        column_sizes[field_id] = 0
+
+        if stats_col.mode == MetricsMode(MetricModeTypes.NONE):
+            continue
+
+        # Get column statistics from the ORC metadata
+        try:
+            # ORC file-level statistics are more limited than Parquet's, so use
+            # what is available and fall back to defaults for the rest.
+
+            # Use the total row count as the value count; this is a simplification,
+            # since ORC does not provide per-column value counts like Parquet.
+            value_counts[field_id] = record_count
+
+            # ORC does not expose null counts the way Parquet does; default to 0 for now.
+            null_value_counts[field_id] = 0
+
+            if stats_col.mode == MetricsMode(MetricModeTypes.COUNTS):
+                continue
+
+            if field_id not in col_aggs:
+                col_aggs[field_id] = StatsAggregator(
+                    stats_col.iceberg_type, _primitive_to_physical(stats_col.iceberg_type), stats_col.mode.length
+                )
+
+            # Min/max aggregation is skipped: ORC's metadata structure does not
+            # expose min/max statistics in the same way as Parquet.
+
+        except Exception as e:
+            invalidate_col.add(field_id)
+            logger.warning(f"Failed to extract ORC statistics for column {col_name}: {e}")
+
+    # ORC has no Parquet-style split offsets; use a single offset at 0 when the file has rows
+    split_offsets = [0] if record_count > 0 else []
+
+    # Clean up invalid columns
+    for field_id in invalidate_col:
+        col_aggs.pop(field_id, None)
+        null_value_counts.pop(field_id, None)
+
+    return DataFileStatistics(
+        record_count=record_count,
+        column_sizes=column_sizes,
+        value_counts=value_counts,
+        null_value_counts=null_value_counts,
+        nan_value_counts=nan_value_counts,
+        column_aggregates=col_aggs,
+        split_offsets=split_offsets,
+    )
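For orientation, a hedged sketch of how this new helper is driven, mirroring the wiring in write_orc above; arrow_table, file_schema, and table_metadata are assumed to be in scope exactly as in that function, and an in-memory buffer stands in for the FileIO output stream:

# Illustrative wiring only, mirroring write_orc above; `arrow_table`,
# `file_schema`, and `table_metadata` are assumed to be in scope.
import pyarrow as pa
from pyarrow import orc

buf = pa.BufferOutputStream()
orc.write_table(arrow_table, buf)
orc_file = orc.ORCFile(pa.BufferReader(buf.getvalue()))

statistics = data_file_statistics_from_orc_metadata(
    orc_metadata=orc_file,
    stats_columns=compute_statistics_plan(file_schema, table_metadata.properties),
    orc_column_mapping=orc_column_to_id_mapping(file_schema),
    arrow_table=arrow_table,
)
assert statistics.record_count == arrow_table.num_rows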
+class ID2OrcColumn:
+    field_id: int
+    orc_column: str
+
+    def __init__(self, field_id: int, orc_column: str):
+        self.field_id = field_id
+        self.orc_column = orc_column
+
+
+class ID2OrcColumnVisitor(PreOrderSchemaVisitor[List[ID2OrcColumn]]):
+    _field_id: int = 0
+    _path: List[str]
+
+    def __init__(self) -> None:
+        self._path = []
+
+    def schema(self, schema: Schema, struct_result: Callable[[], List[ID2OrcColumn]]) -> List[ID2OrcColumn]:
+        return struct_result()
+
+    def struct(self, struct: StructType, field_results: List[Callable[[], List[ID2OrcColumn]]]) -> List[ID2OrcColumn]:
+        return list(itertools.chain(*[result() for result in field_results]))
+
+    def field(self, field: NestedField, field_result: Callable[[], List[ID2OrcColumn]]) -> List[ID2OrcColumn]:
+        self._field_id = field.field_id
+        self._path.append(field.name)
+        result = field_result()
+        self._path.pop()
+        return result
+
+    def list(self, list_type: ListType, element_result: Callable[[], List[ID2OrcColumn]]) -> List[ID2OrcColumn]:
+        self._field_id = list_type.element_id
+        self._path.append("list.element")
+        result = element_result()
+        self._path.pop()
+        return result
+
+    def map(
+        self,
+        map_type: MapType,
+        key_result: Callable[[], List[ID2OrcColumn]],
+        value_result: Callable[[], List[ID2OrcColumn]],
+    ) -> List[ID2OrcColumn]:
+        self._field_id = map_type.key_id
+        self._path.append("key_value.key")
+        k = key_result()
+        self._path.pop()
+        self._field_id = map_type.value_id
+        self._path.append("key_value.value")
+        v = value_result()
+        self._path.pop()
+        return k + v
+
+    def primitive(self, primitive: PrimitiveType) -> List[ID2OrcColumn]:
+        return [ID2OrcColumn(field_id=self._field_id, orc_column=".".join(self._path))]
+
+
+def orc_column_to_id_mapping(
+    schema: Schema,
+) -> Dict[str, int]:
+    """
+    Create a mapping from ORC column names to Iceberg field IDs.
+
+    Args:
+        schema: The Iceberg schema
+
+    Returns:
+        A dictionary mapping ORC column names to field IDs
+    """
+    result: Dict[str, int] = {}
+    for pair in pre_order_visit(schema, ID2OrcColumnVisitor()):
+        result[pair.orc_column] = pair.field_id
+    return result
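To illustrate the mapping this visitor produces, here is a small hedged example using pyiceberg's public schema classes (the schema itself is made up for illustration). Note that only primitive leaves are emitted, so the list field's own ID (2) never appears in the mapping:

# Example schema with a nested list column; imports are pyiceberg's
# public schema/type classes.
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType, ListType, NestedField, StringType

schema = Schema(
    NestedField(field_id=1, name="id", field_type=IntegerType(), required=True),
    NestedField(
        field_id=2,
        name="tags",
        field_type=ListType(element_id=3, element_type=StringType(), element_required=False),
        required=False,
    ),
)

# Per the visitor's path rules: top-level primitives map by name, and a
# list element is addressed as "<field>.list.element".
print(orc_column_to_id_mapping(schema))
# {'id': 1, 'tags.list.element': 3}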

pyiceberg/manifest.py

Lines changed: 9 additions & 1 deletion
@@ -1062,7 +1062,15 @@ def to_manifest_file(self) -> ManifestFile:
     def add_entry(self, entry: ManifestEntry) -> ManifestWriter:
         if self.closed:
             raise RuntimeError("Cannot add entry to closed manifest writer")
-        if entry.status == ManifestEntryStatus.ADDED:
+        # Ensure record_count is not None
+        if entry.data_file.record_count is None:
+            entry.data_file.record_count = 0  # or a real row count if available
+        if entry.data_file.file_format == FileFormat.ORC:
+            # ORC file stats not yet supported
+            self._added_files += 1 if entry.status == ManifestEntryStatus.ADDED else 0
+            self._existing_files += 1 if entry.status == ManifestEntryStatus.EXISTING else 0
+            self._deleted_files += 1 if entry.status == ManifestEntryStatus.DELETED else 0
+        elif entry.status == ManifestEntryStatus.ADDED:
             self._added_files += 1
             self._added_rows += entry.data_file.record_count
         elif entry.status == ManifestEntryStatus.EXISTING:
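The None guard above exists because a DataFile written without statistics can carry record_count = None, and the row accumulation in the ADDED branch would then fail. A toy illustration of the failure mode:

# Why the guard matters: accumulating rows from an entry whose
# record_count was never populated raises a TypeError.
added_rows = 0
record_count = None  # e.g. a DataFile written without statistics
try:
    added_rows += record_count
except TypeError as exc:
    print(exc)  # unsupported operand type(s) for +=: 'int' and 'NoneType'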
