From 5a3255dc314bd12d10e19b77bb1b69a00c75ea1a Mon Sep 17 00:00:00 2001 From: Matthew Kim <38759997+friendlymatthew@users.noreply.github.com> Date: Sat, 12 Jul 2025 21:47:41 -0400 Subject: [PATCH 1/4] Validate schema --- parquet-variant-compute/src/from_json.rs | 12 +- parquet-variant-compute/src/lib.rs | 1 + parquet-variant-compute/src/shredding.rs | 350 ++++++++++++++++++ parquet-variant-compute/src/variant_array.rs | 25 +- .../src/variant_array_builder.rs | 31 +- parquet-variant-compute/src/variant_get.rs | 26 +- 6 files changed, 411 insertions(+), 34 deletions(-) create mode 100644 parquet-variant-compute/src/shredding.rs diff --git a/parquet-variant-compute/src/from_json.rs b/parquet-variant-compute/src/from_json.rs index 05207d094a25..efca4f0c9b3c 100644 --- a/parquet-variant-compute/src/from_json.rs +++ b/parquet-variant-compute/src/from_json.rs @@ -20,7 +20,8 @@ use crate::{VariantArray, VariantArrayBuilder}; use arrow::array::{Array, ArrayRef, StringArray}; -use arrow_schema::ArrowError; +use arrow_schema::{ArrowError, DataType, Field, Fields}; +use parquet_variant::VariantBuilder; use parquet_variant_json::json_to_variant; /// Parse a batch of JSON strings into a batch of Variants represented as @@ -34,7 +35,14 @@ pub fn batch_json_string_to_variant(input: &ArrayRef) -> Result, + value_ref: Option, +} + +impl VariantSchema { + pub fn try_new(fields: Fields) -> Result { + todo!() + } +} + +pub fn validate_value_and_typed_value( + fields: &Fields, + allow_both_null: bool, +) -> Result<(), ArrowError> { + let value_field_res = fields.iter().find(|f| f.name() == VALUE); + let typed_value_field_res = fields.iter().find(|f| f.name() == TYPED_VALUE); + + if !allow_both_null { + if let (None, None) = (value_field_res, typed_value_field_res) { + return Err(ArrowError::InvalidArgumentError( + "Invalid VariantArray: StructArray must contain either `value` or `typed_value` fields or both.".to_string() + )); + } + } + + if let Some(value_field) = value_field_res { + if value_field.data_type() != &DataType::BinaryView { + return Err(ArrowError::NotYetImplemented(format!( + "VariantArray 'value' field must be BinaryView, got {}", + value_field.data_type() + ))); + } + } + + if let Some(typed_value_field) = fields.iter().find(|f| f.name() == TYPED_VALUE) { + // this is directly mapped from the spec's parquet physical types + // note, there are more data types we can support + // but for the sake of simplicity, I chose the smallest subset + match typed_value_field.data_type() { + DataType::Boolean + | DataType::Int32 + | DataType::Int64 + | DataType::Float32 + | DataType::Float64 + | DataType::BinaryView => {} + DataType::Union(union_fields, _) => { + union_fields + .iter() + .map(|(_, f)| f.clone()) + .try_for_each(|f| { + let DataType::Struct(fields) = f.data_type().clone() else { + return Err(ArrowError::InvalidArgumentError( + "Expected struct".to_string(), + )); + }; + + validate_value_and_typed_value(&fields, false) + })?; + } + + foreign => { + return Err(ArrowError::NotYetImplemented(format!( + "Unsupported VariantArray 'typed_value' field, got {foreign}" + ))) + } + } + } + + Ok(()) +} + +/// Validates that the provided [`Fields`] conform to the Variant shredding specification. +/// +/// # Requirements +/// - Must contain a "metadata" field of type BinaryView +/// - Must contain at least one of "value" (optional BinaryView) or "typed_value" (optional with valid Parquet type) +/// - Both "value" and "typed_value" can only be null simultaneously for shredded object fields +pub fn validate_shredded_schema(fields: &Fields) -> Result<(), ArrowError> { + let metadata_field = fields + .iter() + .find(|f| f.name() == METADATA) + .ok_or_else(|| { + ArrowError::InvalidArgumentError( + "Invalid VariantArray: StructArray must contain a 'metadata' field".to_string(), + ) + })?; + + if metadata_field.is_nullable() { + return Err(ArrowError::InvalidArgumentError( + "Invalid VariantArray: metadata field can not be nullable".to_string(), + )); + } + + if metadata_field.data_type() != &DataType::BinaryView { + return Err(ArrowError::NotYetImplemented(format!( + "VariantArray 'metadata' field must be BinaryView, got {}", + metadata_field.data_type() + ))); + } + + validate_value_and_typed_value(fields, false)?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + use arrow_schema::{Field, UnionFields, UnionMode}; + + #[test] + fn test_regular_variant_schema() { + // a regular variant schema + let metadata_field = Field::new("metadata", DataType::BinaryView, false); + let value_field = Field::new("value", DataType::BinaryView, false); + + let schema = Fields::from(vec![metadata_field, value_field]); + + validate_shredded_schema(&schema).unwrap(); + } + + #[test] + fn test_regular_variant_schema_order_agnostic() { + // a regular variant schema + let metadata_field = Field::new("metadata", DataType::BinaryView, false); + let value_field = Field::new("value", DataType::BinaryView, false); + + let schema = Fields::from(vec![value_field, metadata_field]); // note the order switch + + validate_shredded_schema(&schema).unwrap(); + } + + #[test] + fn test_regular_variant_schema_allow_other_columns() { + // a regular variant schema + let metadata_field = Field::new("metadata", DataType::BinaryView, false); + let value_field = Field::new("value", DataType::BinaryView, false); + + let trace_field = Field::new("trace_id", DataType::Utf8View, false); + let created_at_field = Field::new("created_at", DataType::Date64, false); + + let schema = Fields::from(vec![ + metadata_field, + trace_field, + created_at_field, + value_field, + ]); + + validate_shredded_schema(&schema).unwrap(); + } + + #[test] + fn test_regular_variant_schema_missing_metadata() { + let value_field = Field::new("value", DataType::BinaryView, false); + let schema = Fields::from(vec![value_field]); + + let err = validate_shredded_schema(&schema).unwrap_err(); + + assert_eq!( + err.to_string(), + "Invalid argument error: Invalid VariantArray: StructArray must contain a 'metadata' field" + ); + } + + #[test] + fn test_regular_variant_schema_nullable_metadata() { + let metadata_field = Field::new("metadata", DataType::BinaryView, true); + let value_field = Field::new("value", DataType::BinaryView, false); + + let schema = Fields::from(vec![metadata_field, value_field]); + + let err = validate_shredded_schema(&schema).unwrap_err(); + + assert_eq!( + err.to_string(), + "Invalid argument error: Invalid VariantArray: metadata field can not be nullable" + ); + } + + #[test] + fn test_regular_variant_schema_allow_nullable_value() { + // a regular variant schema + let metadata_field = Field::new("metadata", DataType::BinaryView, false); + let value_field = Field::new("value", DataType::BinaryView, true); + + let schema = Fields::from(vec![metadata_field, value_field]); + + validate_shredded_schema(&schema).unwrap(); + } + + #[test] + fn test_shredded_variant_schema() { + let metadata_field = Field::new("metadata", DataType::BinaryView, false); + let typed_value_field = Field::new("typed_value", DataType::Int64, false); + let schema = Fields::from(vec![metadata_field, typed_value_field]); + + validate_shredded_schema(&schema).unwrap(); + } + + #[test] + fn test_partially_shredded_variant_schema() { + let metadata_field = Field::new("metadata", DataType::BinaryView, false); + let value_field = Field::new("value", DataType::BinaryView, false); + let typed_value_field = Field::new("typed_value", DataType::Int64, false); + let schema = Fields::from(vec![metadata_field, value_field, typed_value_field]); + + validate_shredded_schema(&schema).unwrap(); + } + + #[test] + fn test_partially_shredded_variant_list_schema() { + /* + optional group tags (VARIANT) { + required binary metadata; + optional binary value; + optional group typed_value (LIST) { # must be optional to allow a null list + repeated group list { + required group element { # shredded element + optional binary value; + optional binary typed_value (STRING); + } + required group element { # shredded element + optional binary value; + optional int64 typed_value ; + } + } + } + } + */ + + let metadata_field = Field::new("metadata", DataType::BinaryView, false); + let value_field = Field::new("value", DataType::BinaryView, false); + + // Define union fields for different element types + let string_element = { + let value_field = Field::new("value", DataType::BinaryView, true); + let typed_value = Field::new("typed_value", DataType::BinaryView, true); + DataType::Struct(Fields::from(vec![value_field, typed_value])) + }; + + let int_element = { + let value_field = Field::new("value", DataType::BinaryView, true); + let typed_value = Field::new("typed_value", DataType::Int64, true); + DataType::Struct(Fields::from(vec![value_field, typed_value])) + }; + + // Create union of different element types + let union_fields = UnionFields::new( + vec![0, 1], + vec![ + Field::new("string_element", string_element, true), + Field::new("int_element", int_element, true), + ], + ); + + let typed_value_field = Field::new( + "typed_value", + DataType::Union(union_fields, UnionMode::Sparse), + false, + ); + let schema = Fields::from(vec![metadata_field, value_field, typed_value_field]); + + validate_shredded_schema(&schema).unwrap(); + } + + #[test] + fn test_partially_shredded_variant_object_schema() { + /* + optional group event (VARIANT) { + required binary metadata; + optional binary value; # a variant, expected to be an object + optional group typed_value { # shredded fields for the variant object + required group event_type { # shredded field for event_type + optional binary value; + optional binary typed_value (STRING); + } + required group event_ts { # shredded field for event_ts + optional binary value; + optional int64 typed_value (TIMESTAMP(true, MICROS)); + } + } + } + */ + + let metadata_field = Field::new("metadata", DataType::BinaryView, false); + let value_field = Field::new("value", DataType::BinaryView, false); + + // event_type + let element_group_1 = { + let value_field = Field::new("value", DataType::BinaryView, false); + let typed_value = Field::new("typed_value", DataType::BinaryView, false); // this is the string case + + Fields::from(vec![value_field, typed_value]) + }; + + // event_ts + let element_group_2 = { + let value_field = Field::new("value", DataType::BinaryView, false); + let typed_value = Field::new("typed_value", DataType::Int64, false); + + Fields::from(vec![value_field, typed_value]) + }; + + let typed_value_field = Field::new( + "typed_value", + DataType::Union( + UnionFields::new( + vec![0, 1], + vec![ + Field::new("event_type", DataType::Struct(element_group_1), true), + Field::new("event_ts", DataType::Struct(element_group_2), true), + ], + ), + UnionMode::Sparse, + ), + false, + ); + let schema = Fields::from(vec![metadata_field, value_field, typed_value_field]); + + validate_shredded_schema(&schema).unwrap(); + } +} diff --git a/parquet-variant-compute/src/variant_array.rs b/parquet-variant-compute/src/variant_array.rs index 843352d1ff01..4f21d52a9e4c 100644 --- a/parquet-variant-compute/src/variant_array.rs +++ b/parquet-variant-compute/src/variant_array.rs @@ -24,6 +24,8 @@ use parquet_variant::Variant; use std::any::Any; use std::sync::Arc; +use crate::shredding::validate_shredded_schema; + /// An array of Parquet [`Variant`] values /// /// A [`VariantArray`] wraps an Arrow [`StructArray`] that stores the underlying @@ -93,30 +95,21 @@ impl VariantArray { "Invalid VariantArray: requires StructArray as input".to_string(), )); }; - // Ensure the StructArray has a metadata field of BinaryView + validate_shredded_schema(inner.fields())?; + + // todo, remove this since we already do it in validate_shredded_schema let Some(metadata_field) = VariantArray::find_metadata_field(inner) else { return Err(ArrowError::InvalidArgumentError( "Invalid VariantArray: StructArray must contain a 'metadata' field".to_string(), )); }; - if metadata_field.data_type() != &DataType::BinaryView { - return Err(ArrowError::NotYetImplemented(format!( - "VariantArray 'metadata' field must be BinaryView, got {}", - metadata_field.data_type() - ))); - } + let Some(value_field) = VariantArray::find_value_field(inner) else { return Err(ArrowError::InvalidArgumentError( "Invalid VariantArray: StructArray must contain a 'value' field".to_string(), )); }; - if value_field.data_type() != &DataType::BinaryView { - return Err(ArrowError::NotYetImplemented(format!( - "VariantArray 'value' field must be BinaryView, got {}", - value_field.data_type() - ))); - } Ok(Self { inner: inner.clone(), @@ -258,14 +251,14 @@ mod test { let err = VariantArray::try_new(Arc::new(array)); assert_eq!( err.unwrap_err().to_string(), - "Invalid argument error: Invalid VariantArray: StructArray must contain a 'value' field" + "Invalid argument error: Invalid VariantArray: StructArray must contain either `value` or `typed_value` fields or both." ); } #[test] fn invalid_metadata_field_type() { let fields = Fields::from(vec![ - Field::new("metadata", DataType::Binary, true), // Not yet supported + Field::new("metadata", DataType::Binary, false), // Not yet supported Field::new("value", DataType::BinaryView, true), ]); let array = StructArray::new( @@ -283,7 +276,7 @@ mod test { #[test] fn invalid_value_field_type() { let fields = Fields::from(vec![ - Field::new("metadata", DataType::BinaryView, true), + Field::new("metadata", DataType::BinaryView, false), Field::new("value", DataType::Binary, true), // Not yet supported ]); let array = StructArray::new( diff --git a/parquet-variant-compute/src/variant_array_builder.rs b/parquet-variant-compute/src/variant_array_builder.rs index 6a8dba06f15d..2e4f3130c56b 100644 --- a/parquet-variant-compute/src/variant_array_builder.rs +++ b/parquet-variant-compute/src/variant_array_builder.rs @@ -17,9 +17,9 @@ //! [`VariantArrayBuilder`] implementation -use crate::VariantArray; +use crate::{shredding::validate_shredded_schema, VariantArray}; use arrow::array::{ArrayRef, BinaryViewArray, BinaryViewBuilder, NullBufferBuilder, StructArray}; -use arrow_schema::{DataType, Field, Fields}; +use arrow_schema::{ArrowError, DataType, Field, Fields}; use parquet_variant::{ListBuilder, ObjectBuilder, Variant, VariantBuilder, VariantBuilderExt}; use std::sync::Arc; @@ -39,8 +39,14 @@ use std::sync::Arc; /// # use arrow::array::Array; /// # use parquet_variant::{Variant, VariantBuilder, VariantBuilderExt}; /// # use parquet_variant_compute::VariantArrayBuilder; +/// # use arrow_schema::{DataType, Field, Fields}; /// // Create a new VariantArrayBuilder with a capacity of 100 rows -/// let mut builder = VariantArrayBuilder::new(100); +/// +/// let metadata_field = Field::new("metadata", DataType::BinaryView, false); +/// let value_field = Field::new("value", DataType::BinaryView, false); +/// let schema = Fields::from(vec![metadata_field, value_field]); +/// +/// let mut builder = VariantArrayBuilder::try_new(100, schema).unwrap(); /// // append variant values /// builder.append_variant(Variant::from(42)); /// // append a null row (note not a Variant::Null) @@ -88,19 +94,17 @@ pub struct VariantArrayBuilder { } impl VariantArrayBuilder { - pub fn new(row_capacity: usize) -> Self { - // The subfields are expected to be non-nullable according to the parquet variant spec. - let metadata_field = Field::new("metadata", DataType::BinaryView, false); - let value_field = Field::new("value", DataType::BinaryView, false); + pub fn try_new(row_capacity: usize, schema: Fields) -> Result { + validate_shredded_schema(&schema)?; - Self { + Ok(Self { nulls: NullBufferBuilder::new(row_capacity), metadata_buffer: Vec::new(), // todo allocation capacity metadata_locations: Vec::with_capacity(row_capacity), value_buffer: Vec::new(), value_locations: Vec::with_capacity(row_capacity), - fields: Fields::from(vec![metadata_field, value_field]), - } + fields: schema, + }) } /// Build the final builder @@ -363,7 +367,12 @@ mod test { /// Test that both the metadata and value buffers are non nullable #[test] fn test_variant_array_builder_non_nullable() { - let mut builder = VariantArrayBuilder::new(10); + let metadata_field = Field::new("metadata", DataType::BinaryView, false); + let value_field = Field::new("value", DataType::BinaryView, false); + + let schema = Fields::from(vec![metadata_field, value_field]); + + let mut builder = VariantArrayBuilder::try_new(10, schema).unwrap(); builder.append_null(); // should not panic builder.append_variant(Variant::from(42i32)); let variant_array = builder.build(); diff --git a/parquet-variant-compute/src/variant_get.rs b/parquet-variant-compute/src/variant_get.rs index b3a3d9e41f13..38f29d563936 100644 --- a/parquet-variant-compute/src/variant_get.rs +++ b/parquet-variant-compute/src/variant_get.rs @@ -21,7 +21,7 @@ use arrow::{ compute::CastOptions, error::Result, }; -use arrow_schema::{ArrowError, Field}; +use arrow_schema::{ArrowError, Field, Fields}; use parquet_variant::VariantPath; use crate::{VariantArray, VariantArrayBuilder}; @@ -32,7 +32,11 @@ use crate::{VariantArray, VariantArrayBuilder}; /// 1. `as_type: None`: a VariantArray is returned. The values in this new VariantArray will point /// to the specified path. /// 2. `as_type: Some()`: an array of the specified type is returned. -pub fn variant_get(input: &ArrayRef, options: GetOptions) -> Result { +pub fn variant_get( + input: &ArrayRef, + options: GetOptions, + shredded_schema: Fields, +) -> Result { let variant_array: &VariantArray = input.as_any().downcast_ref().ok_or_else(|| { ArrowError::InvalidArgumentError( "expected a VariantArray as the input for variant_get".to_owned(), @@ -45,7 +49,7 @@ pub fn variant_get(input: &ArrayRef, options: GetOptions) -> Result { ))); } - let mut builder = VariantArrayBuilder::new(variant_array.len()); + let mut builder = VariantArrayBuilder::try_new(variant_array.len(), shredded_schema).unwrap(); for i in 0..variant_array.len() { let new_variant = variant_array.value(i); // TODO: perf? @@ -90,6 +94,9 @@ mod test { use std::sync::Arc; use arrow::array::{Array, ArrayRef, StringArray}; + use arrow_schema::DataType; + use arrow_schema::Field; + use arrow_schema::Fields; use parquet_variant::VariantPath; use crate::batch_json_string_to_variant; @@ -103,8 +110,17 @@ mod test { let input_variant_array_ref: ArrayRef = Arc::new(batch_json_string_to_variant(&input_array_ref).unwrap()); - let result = - variant_get(&input_variant_array_ref, GetOptions::new_with_path(path)).unwrap(); + let metadata_field = Field::new("metadata", DataType::BinaryView, false); + let value_field = Field::new("value", DataType::BinaryView, false); + + let schema = Fields::from(vec![metadata_field, value_field]); + + let result = variant_get( + &input_variant_array_ref, + GetOptions::new_with_path(path), + schema, + ) + .unwrap(); // Create expected array from JSON string let expected_array_ref: ArrayRef = Arc::new(StringArray::from(vec![Some(expected_json)])); From 8bd26fb0afc62978e971bca90701e7ef2b28d061 Mon Sep 17 00:00:00 2001 From: Matthew Kim <38759997+friendlymatthew@users.noreply.github.com> Date: Sun, 20 Jul 2025 13:48:31 +0200 Subject: [PATCH 2/4] Introduce new type VariantSchema --- parquet-variant-compute/src/lib.rs | 2 +- parquet-variant-compute/src/shredding.rs | 409 +++++++----------- parquet-variant-compute/src/variant_array.rs | 4 - .../src/variant_array_builder.rs | 15 +- 4 files changed, 169 insertions(+), 261 deletions(-) diff --git a/parquet-variant-compute/src/lib.rs b/parquet-variant-compute/src/lib.rs index 1c3d918ceb9e..ce0b6d5bea38 100644 --- a/parquet-variant-compute/src/lib.rs +++ b/parquet-variant-compute/src/lib.rs @@ -16,7 +16,7 @@ // under the License. mod from_json; -mod shredding; +pub mod shredding; mod to_json; mod variant_array; mod variant_array_builder; diff --git a/parquet-variant-compute/src/shredding.rs b/parquet-variant-compute/src/shredding.rs index 29462d09c7fc..68a1a53867cf 100644 --- a/parquet-variant-compute/src/shredding.rs +++ b/parquet-variant-compute/src/shredding.rs @@ -22,159 +22,219 @@ pub const METADATA: &str = "metadata"; pub const VALUE: &str = "value"; pub const TYPED_VALUE: &str = "typed_value"; +#[derive(Debug, PartialEq)] +pub enum ValueSchema { + MissingValue, + Value(usize), + ShreddedValue(usize), + PartiallyShredded { + value_idx: usize, + shredded_value_idx: usize, + }, +} + #[derive(Debug)] pub struct VariantSchema { - metadata_ref: FieldRef, - typed_value_ref: Option, - value_ref: Option, -} + inner: Fields, -impl VariantSchema { - pub fn try_new(fields: Fields) -> Result { - todo!() - } -} + metadata_idx: usize, -pub fn validate_value_and_typed_value( - fields: &Fields, - allow_both_null: bool, -) -> Result<(), ArrowError> { - let value_field_res = fields.iter().find(|f| f.name() == VALUE); - let typed_value_field_res = fields.iter().find(|f| f.name() == TYPED_VALUE); + // these indicies are for the top-level most `value` and `typed_value` columns + value_schema: ValueSchema, +} - if !allow_both_null { - if let (None, None) = (value_field_res, typed_value_field_res) { +impl VariantSchema { + /// find column metadata and ensure + /// returns the column index + /// # Requirements + /// - Must contain a "metadata" field of type BinaryView + /// - Must contain at least one of "value" (optional BinaryView) or "typed_value" (optional with valid Parquet type) + /// - Both "value" and "typed_value" can only be null simultaneously for shredded object fields + fn validate_metadata(fields: &Fields) -> Result { + let (metadata_idx, metadata_field) = fields + .iter() + .enumerate() + .find(|(i, f)| f.name() == METADATA) + .ok_or_else(|| { + ArrowError::InvalidArgumentError( + "Invalid VariantArray: StructArray must contain a 'metadata' field".to_string(), + ) + })?; + + if metadata_field.is_nullable() { return Err(ArrowError::InvalidArgumentError( - "Invalid VariantArray: StructArray must contain either `value` or `typed_value` fields or both.".to_string() + "Invalid VariantArray: metadata field can not be nullable".to_string(), )); } - } - if let Some(value_field) = value_field_res { - if value_field.data_type() != &DataType::BinaryView { + if metadata_field.data_type() != &DataType::BinaryView { return Err(ArrowError::NotYetImplemented(format!( - "VariantArray 'value' field must be BinaryView, got {}", - value_field.data_type() + "VariantArray 'metadata' field must be BinaryView, got {}", + metadata_field.data_type() ))); } + + Ok(metadata_idx) } - if let Some(typed_value_field) = fields.iter().find(|f| f.name() == TYPED_VALUE) { - // this is directly mapped from the spec's parquet physical types - // note, there are more data types we can support - // but for the sake of simplicity, I chose the smallest subset - match typed_value_field.data_type() { - DataType::Boolean - | DataType::Int32 - | DataType::Int64 - | DataType::Float32 - | DataType::Float64 - | DataType::BinaryView => {} - DataType::Union(union_fields, _) => { - union_fields - .iter() - .map(|(_, f)| f.clone()) - .try_for_each(|f| { - let DataType::Struct(fields) = f.data_type().clone() else { - return Err(ArrowError::InvalidArgumentError( - "Expected struct".to_string(), - )); - }; - - validate_value_and_typed_value(&fields, false) - })?; + /// Both `value` and `typed_value` are optional fields used together to encode a single value. + /// + /// Values in the two fields must be interpreted according to the following table: + /// + /// | `value` | `typed_value` | Meaning | + /// |----------|---------------|-------------------------------------------------------------| + /// | null | null | The value is missing; only valid for shredded object fields | + /// | non-null | null | The value is present and may be any type, including null | + /// | null | non-null | The value is present and is the shredded type | + /// | non-null | non-null | The value is present and is a partially shredded object | + fn validate_value_and_typed_value( + fields: &Fields, + inside_shredded_object: bool, + ) -> Result { + let value_field_res = fields.iter().enumerate().find(|(_, f)| f.name() == VALUE); + let typed_value_field_res = fields + .iter() + .enumerate() + .find(|(_, f)| f.name() == TYPED_VALUE); + + // validate types + if let Some((_, value_field)) = value_field_res { + if value_field.data_type() != &DataType::BinaryView { + return Err(ArrowError::NotYetImplemented(format!( + "VariantArray 'value' field must be BinaryView, got {}", + value_field.data_type() + ))); } + } - foreign => { - return Err(ArrowError::NotYetImplemented(format!( - "Unsupported VariantArray 'typed_value' field, got {foreign}" - ))) + if let Some((_, typed_value_field)) = typed_value_field_res { + match typed_value_field.data_type() { + DataType::Boolean + | DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::Float32 + | DataType::Float64 + | DataType::Decimal32(_, _) + | DataType::Decimal64(_, _) + | DataType::Decimal128(_, _) + | DataType::Date32 + | DataType::Date64 + | DataType::Time32(_) + | DataType::Time64(_) + | DataType::Timestamp(_, _) + | DataType::Utf8View + | DataType::BinaryView + | DataType::ListView(_) + | DataType::Struct(_) => {} + foreign => { + return Err(ArrowError::NotYetImplemented(format!( + "Unsupported VariantArray 'typed_value' field, got {foreign}" + ))) + } + } + } + + match (value_field_res, typed_value_field_res) { + (None, None) => { + if inside_shredded_object { + return Ok(ValueSchema::MissingValue); + } + + Err(ArrowError::InvalidArgumentError("Invalid VariantArray: StructArray must contain either `value` or `typed_value` fields or both.".to_string())) + } + (Some(value_field), None) => Ok(ValueSchema::Value(value_field.0)), + (None, Some(shredded_field)) => Ok(ValueSchema::ShreddedValue(shredded_field.0)), + (Some(_value_field), Some(_shredded_field)) => { + todo!("how does a shredded value look like?"); + // ideally here, i would unpack the shredded_field + // and recursively call validate_value_and_typed_value with inside_shredded_object set to true } } } - Ok(()) -} + pub fn try_new(fields: Fields) -> Result { + let metadata_idx = Self::validate_metadata(&fields)?; + let value_schema = Self::validate_value_and_typed_value(&fields, false)?; + + Ok(Self { + inner: fields.clone(), + metadata_idx, + value_schema, + }) + } -/// Validates that the provided [`Fields`] conform to the Variant shredding specification. -/// -/// # Requirements -/// - Must contain a "metadata" field of type BinaryView -/// - Must contain at least one of "value" (optional BinaryView) or "typed_value" (optional with valid Parquet type) -/// - Both "value" and "typed_value" can only be null simultaneously for shredded object fields -pub fn validate_shredded_schema(fields: &Fields) -> Result<(), ArrowError> { - let metadata_field = fields - .iter() - .find(|f| f.name() == METADATA) - .ok_or_else(|| { - ArrowError::InvalidArgumentError( - "Invalid VariantArray: StructArray must contain a 'metadata' field".to_string(), - ) - })?; - - if metadata_field.is_nullable() { - return Err(ArrowError::InvalidArgumentError( - "Invalid VariantArray: metadata field can not be nullable".to_string(), - )); + pub fn inner(self) -> Fields { + self.inner } - if metadata_field.data_type() != &DataType::BinaryView { - return Err(ArrowError::NotYetImplemented(format!( - "VariantArray 'metadata' field must be BinaryView, got {}", - metadata_field.data_type() - ))); + pub fn metadata(&self) -> &FieldRef { + self.inner.get(self.metadata_idx).unwrap() } - validate_value_and_typed_value(fields, false)?; + pub fn value(&self) -> Option<&FieldRef> { + match self.value_schema { + ValueSchema::MissingValue => None, + ValueSchema::ShreddedValue(_) => None, + ValueSchema::Value(value_idx) => self.inner.get(value_idx), + ValueSchema::PartiallyShredded { value_idx, .. } => self.inner.get(value_idx), + } + } - Ok(()) + pub fn shredded_value(&self) -> Option<&FieldRef> { + match self.value_schema { + ValueSchema::MissingValue => None, + ValueSchema::Value(_) => None, + ValueSchema::ShreddedValue(shredded_idx) => self.inner.get(shredded_idx), + ValueSchema::PartiallyShredded { + shredded_value_idx, .. + } => self.inner.get(shredded_value_idx), + } + } } #[cfg(test)] mod tests { use super::*; - use arrow_schema::{Field, UnionFields, UnionMode}; + use arrow_schema::Field; #[test] - fn test_regular_variant_schema() { + fn test_unshredded_variant_schema() { // a regular variant schema let metadata_field = Field::new("metadata", DataType::BinaryView, false); let value_field = Field::new("value", DataType::BinaryView, false); - let schema = Fields::from(vec![metadata_field, value_field]); + let fields = Fields::from(vec![metadata_field, value_field]); + let variant_schema = VariantSchema::try_new(fields).unwrap(); - validate_shredded_schema(&schema).unwrap(); + assert_eq!(variant_schema.metadata_idx, 0); + assert_eq!(variant_schema.value_schema, ValueSchema::Value(1)); } #[test] - fn test_regular_variant_schema_order_agnostic() { - // a regular variant schema + fn test_unshredded_variant_schema_order_agnostic() { let metadata_field = Field::new("metadata", DataType::BinaryView, false); let value_field = Field::new("value", DataType::BinaryView, false); - let schema = Fields::from(vec![value_field, metadata_field]); // note the order switch + let fields = Fields::from(vec![value_field, metadata_field]); // note the order switch + let variant_schema = VariantSchema::try_new(fields).unwrap(); - validate_shredded_schema(&schema).unwrap(); + assert_eq!(variant_schema.value_schema, ValueSchema::Value(0)); + assert_eq!(variant_schema.metadata_idx, 1); } #[test] - fn test_regular_variant_schema_allow_other_columns() { - // a regular variant schema + fn test_shredded_variant_schema() { let metadata_field = Field::new("metadata", DataType::BinaryView, false); - let value_field = Field::new("value", DataType::BinaryView, false); - - let trace_field = Field::new("trace_id", DataType::Utf8View, false); - let created_at_field = Field::new("created_at", DataType::Date64, false); + let shredded_field = Field::new("typed_value", DataType::Int8, true); - let schema = Fields::from(vec![ - metadata_field, - trace_field, - created_at_field, - value_field, - ]); + let fields = Fields::from(vec![metadata_field, shredded_field]); + let variant_schema = VariantSchema::try_new(fields).unwrap(); - validate_shredded_schema(&schema).unwrap(); + assert_eq!(variant_schema.metadata_idx, 0); + assert_eq!(variant_schema.value_schema, ValueSchema::ShreddedValue(1)); } #[test] @@ -182,7 +242,7 @@ mod tests { let value_field = Field::new("value", DataType::BinaryView, false); let schema = Fields::from(vec![value_field]); - let err = validate_shredded_schema(&schema).unwrap_err(); + let err = VariantSchema::try_new(schema).unwrap_err(); assert_eq!( err.to_string(), @@ -197,154 +257,11 @@ mod tests { let schema = Fields::from(vec![metadata_field, value_field]); - let err = validate_shredded_schema(&schema).unwrap_err(); + let err = VariantSchema::try_new(schema).unwrap_err(); assert_eq!( err.to_string(), "Invalid argument error: Invalid VariantArray: metadata field can not be nullable" ); } - - #[test] - fn test_regular_variant_schema_allow_nullable_value() { - // a regular variant schema - let metadata_field = Field::new("metadata", DataType::BinaryView, false); - let value_field = Field::new("value", DataType::BinaryView, true); - - let schema = Fields::from(vec![metadata_field, value_field]); - - validate_shredded_schema(&schema).unwrap(); - } - - #[test] - fn test_shredded_variant_schema() { - let metadata_field = Field::new("metadata", DataType::BinaryView, false); - let typed_value_field = Field::new("typed_value", DataType::Int64, false); - let schema = Fields::from(vec![metadata_field, typed_value_field]); - - validate_shredded_schema(&schema).unwrap(); - } - - #[test] - fn test_partially_shredded_variant_schema() { - let metadata_field = Field::new("metadata", DataType::BinaryView, false); - let value_field = Field::new("value", DataType::BinaryView, false); - let typed_value_field = Field::new("typed_value", DataType::Int64, false); - let schema = Fields::from(vec![metadata_field, value_field, typed_value_field]); - - validate_shredded_schema(&schema).unwrap(); - } - - #[test] - fn test_partially_shredded_variant_list_schema() { - /* - optional group tags (VARIANT) { - required binary metadata; - optional binary value; - optional group typed_value (LIST) { # must be optional to allow a null list - repeated group list { - required group element { # shredded element - optional binary value; - optional binary typed_value (STRING); - } - required group element { # shredded element - optional binary value; - optional int64 typed_value ; - } - } - } - } - */ - - let metadata_field = Field::new("metadata", DataType::BinaryView, false); - let value_field = Field::new("value", DataType::BinaryView, false); - - // Define union fields for different element types - let string_element = { - let value_field = Field::new("value", DataType::BinaryView, true); - let typed_value = Field::new("typed_value", DataType::BinaryView, true); - DataType::Struct(Fields::from(vec![value_field, typed_value])) - }; - - let int_element = { - let value_field = Field::new("value", DataType::BinaryView, true); - let typed_value = Field::new("typed_value", DataType::Int64, true); - DataType::Struct(Fields::from(vec![value_field, typed_value])) - }; - - // Create union of different element types - let union_fields = UnionFields::new( - vec![0, 1], - vec![ - Field::new("string_element", string_element, true), - Field::new("int_element", int_element, true), - ], - ); - - let typed_value_field = Field::new( - "typed_value", - DataType::Union(union_fields, UnionMode::Sparse), - false, - ); - let schema = Fields::from(vec![metadata_field, value_field, typed_value_field]); - - validate_shredded_schema(&schema).unwrap(); - } - - #[test] - fn test_partially_shredded_variant_object_schema() { - /* - optional group event (VARIANT) { - required binary metadata; - optional binary value; # a variant, expected to be an object - optional group typed_value { # shredded fields for the variant object - required group event_type { # shredded field for event_type - optional binary value; - optional binary typed_value (STRING); - } - required group event_ts { # shredded field for event_ts - optional binary value; - optional int64 typed_value (TIMESTAMP(true, MICROS)); - } - } - } - */ - - let metadata_field = Field::new("metadata", DataType::BinaryView, false); - let value_field = Field::new("value", DataType::BinaryView, false); - - // event_type - let element_group_1 = { - let value_field = Field::new("value", DataType::BinaryView, false); - let typed_value = Field::new("typed_value", DataType::BinaryView, false); // this is the string case - - Fields::from(vec![value_field, typed_value]) - }; - - // event_ts - let element_group_2 = { - let value_field = Field::new("value", DataType::BinaryView, false); - let typed_value = Field::new("typed_value", DataType::Int64, false); - - Fields::from(vec![value_field, typed_value]) - }; - - let typed_value_field = Field::new( - "typed_value", - DataType::Union( - UnionFields::new( - vec![0, 1], - vec![ - Field::new("event_type", DataType::Struct(element_group_1), true), - Field::new("event_ts", DataType::Struct(element_group_2), true), - ], - ), - UnionMode::Sparse, - ), - false, - ); - let schema = Fields::from(vec![metadata_field, value_field, typed_value_field]); - - validate_shredded_schema(&schema).unwrap(); - } } diff --git a/parquet-variant-compute/src/variant_array.rs b/parquet-variant-compute/src/variant_array.rs index 4f21d52a9e4c..5fa81382c096 100644 --- a/parquet-variant-compute/src/variant_array.rs +++ b/parquet-variant-compute/src/variant_array.rs @@ -24,8 +24,6 @@ use parquet_variant::Variant; use std::any::Any; use std::sync::Arc; -use crate::shredding::validate_shredded_schema; - /// An array of Parquet [`Variant`] values /// /// A [`VariantArray`] wraps an Arrow [`StructArray`] that stores the underlying @@ -96,8 +94,6 @@ impl VariantArray { )); }; - validate_shredded_schema(inner.fields())?; - // todo, remove this since we already do it in validate_shredded_schema let Some(metadata_field) = VariantArray::find_metadata_field(inner) else { return Err(ArrowError::InvalidArgumentError( diff --git a/parquet-variant-compute/src/variant_array_builder.rs b/parquet-variant-compute/src/variant_array_builder.rs index 2e4f3130c56b..eec0d8eb2215 100644 --- a/parquet-variant-compute/src/variant_array_builder.rs +++ b/parquet-variant-compute/src/variant_array_builder.rs @@ -17,7 +17,7 @@ //! [`VariantArrayBuilder`] implementation -use crate::{shredding::validate_shredded_schema, VariantArray}; +use crate::{shredding::VariantSchema, VariantArray}; use arrow::array::{ArrayRef, BinaryViewArray, BinaryViewBuilder, NullBufferBuilder, StructArray}; use arrow_schema::{ArrowError, DataType, Field, Fields}; use parquet_variant::{ListBuilder, ObjectBuilder, Variant, VariantBuilder, VariantBuilderExt}; @@ -87,23 +87,18 @@ pub struct VariantArrayBuilder { /// (offset, len) pairs for locations of values in the buffer value_locations: Vec<(usize, usize)>, /// The fields of the final `StructArray` - /// - /// TODO: 1) Add extension type metadata - /// TODO: 2) Add support for shredding - fields: Fields, + schema: VariantSchema, } impl VariantArrayBuilder { pub fn try_new(row_capacity: usize, schema: Fields) -> Result { - validate_shredded_schema(&schema)?; - Ok(Self { nulls: NullBufferBuilder::new(row_capacity), metadata_buffer: Vec::new(), // todo allocation capacity metadata_locations: Vec::with_capacity(row_capacity), value_buffer: Vec::new(), value_locations: Vec::with_capacity(row_capacity), - fields: schema, + schema: VariantSchema::try_new(schema)?, }) } @@ -115,7 +110,7 @@ impl VariantArrayBuilder { metadata_locations, value_buffer, value_locations, - fields, + schema, } = self; let metadata_array = binary_view_array_from_buffers(metadata_buffer, metadata_locations); @@ -124,7 +119,7 @@ impl VariantArrayBuilder { // The build the final struct array let inner = StructArray::new( - fields, + schema.inner(), vec![ Arc::new(metadata_array) as ArrayRef, Arc::new(value_array) as ArrayRef, From cfe694862c5d36af1e4a3cdea0373758a9541bb6 Mon Sep 17 00:00:00 2001 From: Matthew Kim <38759997+friendlymatthew@users.noreply.github.com> Date: Sun, 20 Jul 2025 14:08:43 +0200 Subject: [PATCH 3/4] Initial plumbing into variant_array and builder --- .../benches/variant_kernels.rs | 13 +++- parquet-variant-compute/src/from_json.rs | 3 +- parquet-variant-compute/src/shredding.rs | 37 ++++++++--- parquet-variant-compute/src/variant_array.rs | 66 ++++++++----------- .../src/variant_array_builder.rs | 7 +- parquet-variant-compute/src/variant_get.rs | 2 +- 6 files changed, 72 insertions(+), 56 deletions(-) diff --git a/parquet-variant-compute/benches/variant_kernels.rs b/parquet-variant-compute/benches/variant_kernels.rs index 8fd6af333fed..4f9624a536b6 100644 --- a/parquet-variant-compute/benches/variant_kernels.rs +++ b/parquet-variant-compute/benches/variant_kernels.rs @@ -17,6 +17,7 @@ use arrow::array::{Array, ArrayRef, StringArray}; use arrow::util::test_util::seedable_rng; +use arrow_schema::{DataType, Field, Fields}; use criterion::{criterion_group, criterion_main, Criterion}; use parquet_variant::{Variant, VariantBuilder}; use parquet_variant_compute::variant_get::{variant_get, GetOptions}; @@ -27,6 +28,14 @@ use rand::Rng; use rand::SeedableRng; use std::fmt::Write; use std::sync::Arc; + +fn unshredded_schema_fields() -> Fields { + let metadata_field = Field::new("metadata", DataType::BinaryView, false); + let value_field = Field::new("value", DataType::BinaryView, false); + + Fields::from(vec![metadata_field, value_field]) +} + fn benchmark_batch_json_string_to_variant(c: &mut Criterion) { let input_array = StringArray::from_iter_values(json_repeated_struct(8000)); let array_ref: ArrayRef = Arc::new(input_array); @@ -93,7 +102,7 @@ pub fn variant_get_bench(c: &mut Criterion) { }; c.bench_function("variant_get_primitive", |b| { - b.iter(|| variant_get(&input.clone(), options.clone())) + b.iter(|| variant_get(&input.clone(), options.clone(), unshredded_schema_fields())) }); } @@ -108,7 +117,7 @@ criterion_main!(benches); fn create_primitive_variant_array(size: usize) -> VariantArray { let mut rng = StdRng::seed_from_u64(42); - let mut variant_builder = VariantArrayBuilder::new(1); + let mut variant_builder = VariantArrayBuilder::try_new(1, unshredded_schema_fields()).unwrap(); for _ in 0..size { let mut builder = VariantBuilder::new(); diff --git a/parquet-variant-compute/src/from_json.rs b/parquet-variant-compute/src/from_json.rs index efca4f0c9b3c..7f81bc2a9985 100644 --- a/parquet-variant-compute/src/from_json.rs +++ b/parquet-variant-compute/src/from_json.rs @@ -21,7 +21,6 @@ use crate::{VariantArray, VariantArrayBuilder}; use arrow::array::{Array, ArrayRef, StringArray}; use arrow_schema::{ArrowError, DataType, Field, Fields}; -use parquet_variant::VariantBuilder; use parquet_variant_json::json_to_variant; /// Parse a batch of JSON strings into a batch of Variants represented as @@ -78,7 +77,7 @@ mod test { let variant_array = batch_json_string_to_variant(&array_ref).unwrap(); let metadata_array = variant_array.metadata_field().as_binary_view(); - let value_array = variant_array.value_field().as_binary_view(); + let value_array = variant_array.value_field().unwrap().as_binary_view(); // Compare row 0 assert!(!variant_array.is_null(0)); diff --git a/parquet-variant-compute/src/shredding.rs b/parquet-variant-compute/src/shredding.rs index 68a1a53867cf..d3fc4fe8da15 100644 --- a/parquet-variant-compute/src/shredding.rs +++ b/parquet-variant-compute/src/shredding.rs @@ -22,7 +22,7 @@ pub const METADATA: &str = "metadata"; pub const VALUE: &str = "value"; pub const TYPED_VALUE: &str = "typed_value"; -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub enum ValueSchema { MissingValue, Value(usize), @@ -33,7 +33,7 @@ pub enum ValueSchema { }, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct VariantSchema { inner: Fields, @@ -54,7 +54,7 @@ impl VariantSchema { let (metadata_idx, metadata_field) = fields .iter() .enumerate() - .find(|(i, f)| f.name() == METADATA) + .find(|(_, f)| f.name() == METADATA) .ok_or_else(|| { ArrowError::InvalidArgumentError( "Invalid VariantArray: StructArray must contain a 'metadata' field".to_string(), @@ -165,33 +165,50 @@ impl VariantSchema { }) } - pub fn inner(self) -> Fields { + pub fn inner(&self) -> &Fields { + &self.inner + } + + pub fn into_inner(self) -> Fields { self.inner } + pub fn metadata_idx(&self) -> usize { + self.metadata_idx + } + pub fn metadata(&self) -> &FieldRef { self.inner.get(self.metadata_idx).unwrap() } - pub fn value(&self) -> Option<&FieldRef> { + pub fn value_idx(&self) -> Option { match self.value_schema { ValueSchema::MissingValue => None, ValueSchema::ShreddedValue(_) => None, - ValueSchema::Value(value_idx) => self.inner.get(value_idx), - ValueSchema::PartiallyShredded { value_idx, .. } => self.inner.get(value_idx), + ValueSchema::Value(value_idx) => Some(value_idx), + ValueSchema::PartiallyShredded { value_idx, .. } => Some(value_idx), } } - pub fn shredded_value(&self) -> Option<&FieldRef> { + pub fn value(&self) -> Option<&FieldRef> { + self.value_idx().map(|i| self.inner.get(i).unwrap()) + } + + pub fn shredded_value_idx(&self) -> Option { match self.value_schema { ValueSchema::MissingValue => None, ValueSchema::Value(_) => None, - ValueSchema::ShreddedValue(shredded_idx) => self.inner.get(shredded_idx), + ValueSchema::ShreddedValue(shredded_idx) => Some(shredded_idx), ValueSchema::PartiallyShredded { shredded_value_idx, .. - } => self.inner.get(shredded_value_idx), + } => Some(shredded_value_idx), } } + + pub fn shredded_value(&self) -> Option<&FieldRef> { + self.shredded_value_idx() + .map(|i| self.inner.get(i).unwrap()) + } } #[cfg(test)] diff --git a/parquet-variant-compute/src/variant_array.rs b/parquet-variant-compute/src/variant_array.rs index 5fa81382c096..8488d3adea96 100644 --- a/parquet-variant-compute/src/variant_array.rs +++ b/parquet-variant-compute/src/variant_array.rs @@ -24,6 +24,8 @@ use parquet_variant::Variant; use std::any::Any; use std::sync::Arc; +use crate::shredding::VariantSchema; + /// An array of Parquet [`Variant`] values /// /// A [`VariantArray`] wraps an Arrow [`StructArray`] that stores the underlying @@ -60,11 +62,7 @@ pub struct VariantArray { /// int8. inner: StructArray, - /// Reference to the metadata column of inner - metadata_ref: ArrayRef, - - /// Reference to the value column of inner - value_ref: ArrayRef, + variant_schema: VariantSchema, } impl VariantArray { @@ -94,23 +92,11 @@ impl VariantArray { )); }; - // todo, remove this since we already do it in validate_shredded_schema - let Some(metadata_field) = VariantArray::find_metadata_field(inner) else { - return Err(ArrowError::InvalidArgumentError( - "Invalid VariantArray: StructArray must contain a 'metadata' field".to_string(), - )); - }; - - let Some(value_field) = VariantArray::find_value_field(inner) else { - return Err(ArrowError::InvalidArgumentError( - "Invalid VariantArray: StructArray must contain a 'value' field".to_string(), - )); - }; + let variant_schema = VariantSchema::try_new(inner.fields().clone())?; Ok(Self { inner: inner.clone(), - metadata_ref: metadata_field, - value_ref: value_field, + variant_schema, }) } @@ -126,34 +112,41 @@ impl VariantArray { /// Return the [`Variant`] instance stored at the given row /// - /// Panics if the index is out of bounds. + /// Panics if the index is out of bounds or value array does not exist. /// /// Note: Does not do deep validation of the [`Variant`], so it is up to the /// caller to ensure that the metadata and value were constructed correctly. + /// + /// Todo: reconstruct partially shredded or shredded variants pub fn value(&self, index: usize) -> Variant { let metadata = self.metadata_field().as_binary_view().value(index); - let value = self.value_field().as_binary_view().value(index); + let value = self + .value_field() + .expect("value field does not exist") + .as_binary_view() + .value(index); Variant::new(metadata, value) } - fn find_metadata_field(array: &StructArray) -> Option { - array.column_by_name("metadata").cloned() - } - - fn find_value_field(array: &StructArray) -> Option { - array.column_by_name("value").cloned() - } - /// Return a reference to the metadata field of the [`StructArray`] pub fn metadata_field(&self) -> &ArrayRef { - // spec says fields order is not guaranteed, so we search by name - &self.metadata_ref + let metadata_idx = self.variant_schema.metadata_idx(); + + self.inner.column(metadata_idx) } /// Return a reference to the value field of the `StructArray` - pub fn value_field(&self) -> &ArrayRef { - // spec says fields order is not guaranteed, so we search by name - &self.value_ref + pub fn value_field(&self) -> Option<&ArrayRef> { + self.variant_schema + .value_idx() + .map(|i| self.inner.column(i)) + } + + /// Return a reference to the shredded value field of the `StructArray` + pub fn shredded_value_field(&self) -> Option<&ArrayRef> { + self.variant_schema + .shredded_value_idx() + .map(|i| self.inner.column(i)) } } @@ -176,12 +169,9 @@ impl Array for VariantArray { fn slice(&self, offset: usize, length: usize) -> ArrayRef { let slice = self.inner.slice(offset, length); - let met = self.metadata_ref.slice(offset, length); - let val = self.value_ref.slice(offset, length); Arc::new(Self { inner: slice, - metadata_ref: met, - value_ref: val, + variant_schema: self.variant_schema.clone(), }) } diff --git a/parquet-variant-compute/src/variant_array_builder.rs b/parquet-variant-compute/src/variant_array_builder.rs index eec0d8eb2215..668f4d339f74 100644 --- a/parquet-variant-compute/src/variant_array_builder.rs +++ b/parquet-variant-compute/src/variant_array_builder.rs @@ -19,7 +19,7 @@ use crate::{shredding::VariantSchema, VariantArray}; use arrow::array::{ArrayRef, BinaryViewArray, BinaryViewBuilder, NullBufferBuilder, StructArray}; -use arrow_schema::{ArrowError, DataType, Field, Fields}; +use arrow_schema::{ArrowError, Fields}; use parquet_variant::{ListBuilder, ObjectBuilder, Variant, VariantBuilder, VariantBuilderExt}; use std::sync::Arc; @@ -119,7 +119,7 @@ impl VariantArrayBuilder { // The build the final struct array let inner = StructArray::new( - schema.inner(), + schema.into_inner(), vec![ Arc::new(metadata_array) as ArrayRef, Arc::new(value_array) as ArrayRef, @@ -358,6 +358,7 @@ fn binary_view_array_from_buffers( mod test { use super::*; use arrow::array::Array; + use arrow_schema::{DataType, Field}; /// Test that both the metadata and value buffers are non nullable #[test] @@ -379,7 +380,7 @@ mod test { // the metadata and value fields of non shredded variants should not be null assert!(variant_array.metadata_field().nulls().is_none()); - assert!(variant_array.value_field().nulls().is_none()); + assert!(variant_array.value_field().unwrap().nulls().is_none()); let DataType::Struct(fields) = variant_array.data_type() else { panic!("Expected VariantArray to have Struct data type"); }; diff --git a/parquet-variant-compute/src/variant_get.rs b/parquet-variant-compute/src/variant_get.rs index 38f29d563936..33e33d9898f4 100644 --- a/parquet-variant-compute/src/variant_get.rs +++ b/parquet-variant-compute/src/variant_get.rs @@ -49,7 +49,7 @@ pub fn variant_get( ))); } - let mut builder = VariantArrayBuilder::try_new(variant_array.len(), shredded_schema).unwrap(); + let mut builder = VariantArrayBuilder::try_new(variant_array.len(), shredded_schema)?; for i in 0..variant_array.len() { let new_variant = variant_array.value(i); // TODO: perf? From a3692edc69558997d8cb9c1e35f92e256657ae0e Mon Sep 17 00:00:00 2001 From: Matthew Kim <38759997+friendlymatthew@users.noreply.github.com> Date: Wed, 23 Jul 2025 11:07:55 +0200 Subject: [PATCH 4/4] Recursively validate lists and objects --- parquet-variant-compute/src/shredding.rs | 36 ++++++++++++++++--- .../src/variant_array_builder.rs | 23 ++++++++++-- 2 files changed, 51 insertions(+), 8 deletions(-) diff --git a/parquet-variant-compute/src/shredding.rs b/parquet-variant-compute/src/shredding.rs index d3fc4fe8da15..d640db709322 100644 --- a/parquet-variant-compute/src/shredding.rs +++ b/parquet-variant-compute/src/shredding.rs @@ -127,7 +127,8 @@ impl VariantSchema { | DataType::Utf8View | DataType::BinaryView | DataType::ListView(_) - | DataType::Struct(_) => {} + | DataType::Struct(_) + | DataType::Dictionary(_, _) => {} foreign => { return Err(ArrowError::NotYetImplemented(format!( "Unsupported VariantArray 'typed_value' field, got {foreign}" @@ -146,10 +147,35 @@ impl VariantSchema { } (Some(value_field), None) => Ok(ValueSchema::Value(value_field.0)), (None, Some(shredded_field)) => Ok(ValueSchema::ShreddedValue(shredded_field.0)), - (Some(_value_field), Some(_shredded_field)) => { - todo!("how does a shredded value look like?"); - // ideally here, i would unpack the shredded_field - // and recursively call validate_value_and_typed_value with inside_shredded_object set to true + (Some((value_idx, _)), Some((shredded_value_idx, shredded_field))) => { + match shredded_field.data_type() { + DataType::Struct(fields) => { + let _ = Self::validate_value_and_typed_value(fields, false)?; + + Ok(ValueSchema::PartiallyShredded { + value_idx, + shredded_value_idx, + }) + } + DataType::Dictionary(_key, shredded_schema) => { + if let DataType::Struct(fields) = shredded_schema.as_ref() { + let _ = Self::validate_value_and_typed_value(fields, true)?; + + Ok(ValueSchema::PartiallyShredded { + value_idx, + shredded_value_idx, + }) + } else { + Err(ArrowError::InvalidArgumentError( + "Invalid VariantArray: shredded fields must be of struct or list types".to_string(), + )) + } + } + _ => Err(ArrowError::InvalidArgumentError( + "Invalid VariantArray: shredded fields must be of struct or list types" + .to_string(), + )), + } } } } diff --git a/parquet-variant-compute/src/variant_array_builder.rs b/parquet-variant-compute/src/variant_array_builder.rs index 668f4d339f74..58264e3d7b52 100644 --- a/parquet-variant-compute/src/variant_array_builder.rs +++ b/parquet-variant-compute/src/variant_array_builder.rs @@ -160,7 +160,13 @@ impl VariantArrayBuilder { /// ``` /// # use parquet_variant::{Variant, VariantBuilder, VariantBuilderExt}; /// # use parquet_variant_compute::{VariantArray, VariantArrayBuilder}; - /// let mut array_builder = VariantArrayBuilder::new(10); + /// # use arrow_schema::{Field, Fields, DataType}; + /// + /// let metadata_field = Field::new("metadata", DataType::BinaryView, false); + /// let value_field = Field::new("value", DataType::BinaryView, false); + /// let schema = Fields::from(vec![metadata_field, value_field]); + /// + /// let mut array_builder = VariantArrayBuilder::try_new(10, schema).unwrap(); /// /// // First row has a string /// let mut variant_builder = array_builder.variant_builder(); @@ -396,7 +402,13 @@ mod test { /// Test using sub builders to append variants #[test] fn test_variant_array_builder_variant_builder() { - let mut builder = VariantArrayBuilder::new(10); + let metadata_field = Field::new("metadata", DataType::BinaryView, false); + let value_field = Field::new("value", DataType::BinaryView, false); + + let schema = Fields::from(vec![metadata_field, value_field]); + + let mut builder = VariantArrayBuilder::try_new(10, schema).unwrap(); + builder.append_null(); // should not panic builder.append_variant(Variant::from(42i32)); @@ -436,7 +448,12 @@ mod test { /// Test using non-finished sub builders to append variants #[test] fn test_variant_array_builder_variant_builder_reset() { - let mut builder = VariantArrayBuilder::new(10); + let metadata_field = Field::new("metadata", DataType::BinaryView, false); + let value_field = Field::new("value", DataType::BinaryView, false); + + let schema = Fields::from(vec![metadata_field, value_field]); + + let mut builder = VariantArrayBuilder::try_new(10, schema).unwrap(); // make a sub-object in the first row let mut sub_builder = builder.variant_builder();