Skip to content

Commit debe856

Browse files
Validate schema
1 parent 82821e5 commit debe856

File tree

6 files changed

+410
-34
lines changed

6 files changed

+410
-34
lines changed

parquet-variant-compute/src/from_json.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
2121
use crate::{VariantArray, VariantArrayBuilder};
2222
use arrow::array::{Array, ArrayRef, StringArray};
23-
use arrow_schema::ArrowError;
23+
use arrow_schema::{ArrowError, DataType, Field, Fields};
2424
use parquet_variant::VariantBuilder;
2525
use parquet_variant_json::json_to_variant;
2626

@@ -35,7 +35,14 @@ pub fn batch_json_string_to_variant(input: &ArrayRef) -> Result<VariantArray, Ar
3535
)),
3636
}?;
3737

38-
let mut variant_array_builder = VariantArrayBuilder::new(input_string_array.len());
38+
let metadata_field = Field::new("metadata", DataType::BinaryView, false);
39+
let value_field = Field::new("value", DataType::BinaryView, false);
40+
41+
let schema = Fields::from(vec![metadata_field, value_field]);
42+
43+
let mut variant_array_builder =
44+
VariantArrayBuilder::try_new(input_string_array.len(), schema).unwrap();
45+
3946
for i in 0..input.len() {
4047
if input.is_null(i) {
4148
// The subfields are expected to be non-nullable according to the parquet variant spec.

parquet-variant-compute/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
mod from_json;
19+
mod shredding;
1920
mod to_json;
2021
mod variant_array;
2122
mod variant_array_builder;
Lines changed: 350 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,350 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow_schema::{ArrowError, DataType, FieldRef, Fields};
19+
20+
// Field-name keywords defined by the shredding spec.
// The validators in this module look fields up by these exact names:
// - `metadata`: must be present, non-nullable and of type BinaryView
// - `value`: optional; must be BinaryView when present
// - `typed_value`: optional; must be one of the supported shredded types when present
21+
pub const METADATA: &str = "metadata";
22+
pub const VALUE: &str = "value";
23+
pub const TYPED_VALUE: &str = "typed_value";
24+
25+
#[derive(Debug)]
26+
pub struct VariantSchema {
27+
metadata_ref: FieldRef,
28+
typed_value_ref: Option<FieldRef>,
29+
value_ref: Option<FieldRef>,
30+
}
31+
32+
impl VariantSchema {
33+
pub fn try_new(fields: Fields) -> Result<Self, ArrowError> {
34+
todo!()
35+
}
36+
}
37+
38+
pub fn validate_value_and_typed_value(
39+
fields: &Fields,
40+
allow_both_null: bool,
41+
) -> Result<(), ArrowError> {
42+
let value_field_res = fields.iter().find(|f| f.name() == VALUE);
43+
let typed_value_field_res = fields.iter().find(|f| f.name() == TYPED_VALUE);
44+
45+
if !allow_both_null {
46+
if let (None, None) = (value_field_res, typed_value_field_res) {
47+
return Err(ArrowError::InvalidArgumentError(
48+
"Invalid VariantArray: StructArray must contain either `value` or `typed_value` fields or both.".to_string()
49+
));
50+
}
51+
}
52+
53+
if let Some(value_field) = value_field_res {
54+
if value_field.data_type() != &DataType::BinaryView {
55+
return Err(ArrowError::NotYetImplemented(format!(
56+
"VariantArray 'value' field must be BinaryView, got {}",
57+
value_field.data_type()
58+
)));
59+
}
60+
}
61+
62+
if let Some(typed_value_field) = fields.iter().find(|f| f.name() == TYPED_VALUE) {
63+
// this is directly mapped from the spec's parquet physical types
64+
// note, there are more data types we can support
65+
// but for the sake of simplicity, I chose the smallest subset
66+
match typed_value_field.data_type() {
67+
DataType::Boolean
68+
| DataType::Int32
69+
| DataType::Int64
70+
| DataType::Float32
71+
| DataType::Float64
72+
| DataType::BinaryView => {}
73+
DataType::Union(union_fields, _) => {
74+
union_fields
75+
.iter()
76+
.map(|(_, f)| f.clone())
77+
.try_for_each(|f| {
78+
let DataType::Struct(fields) = f.data_type().clone() else {
79+
return Err(ArrowError::InvalidArgumentError(
80+
"Expected struct".to_string(),
81+
));
82+
};
83+
84+
validate_value_and_typed_value(&fields, false)
85+
})?;
86+
}
87+
88+
foreign => {
89+
return Err(ArrowError::NotYetImplemented(format!(
90+
"Unsupported VariantArray 'typed_value' field, got {foreign}"
91+
)))
92+
}
93+
}
94+
}
95+
96+
Ok(())
97+
}
98+
99+
/// Validates that the provided [`Fields`] conform to the Variant shredding specification.
100+
///
101+
/// # Requirements
102+
/// - Must contain a "metadata" field of type BinaryView
103+
/// - Must contain at least one of "value" (optional BinaryView) or "typed_value" (optional with valid Parquet type)
104+
/// - Both "value" and "typed_value" can only be null simultaneously for shredded object fields
105+
pub fn validate_shredded_schema(fields: &Fields) -> Result<(), ArrowError> {
106+
let metadata_field = fields
107+
.iter()
108+
.find(|f| f.name() == METADATA)
109+
.ok_or_else(|| {
110+
ArrowError::InvalidArgumentError(
111+
"Invalid VariantArray: StructArray must contain a 'metadata' field".to_string(),
112+
)
113+
})?;
114+
115+
if metadata_field.is_nullable() {
116+
return Err(ArrowError::InvalidArgumentError(
117+
"Invalid VariantArray: metadata field can not be nullable".to_string(),
118+
));
119+
}
120+
121+
if metadata_field.data_type() != &DataType::BinaryView {
122+
return Err(ArrowError::NotYetImplemented(format!(
123+
"VariantArray 'metadata' field must be BinaryView, got {}",
124+
metadata_field.data_type()
125+
)));
126+
}
127+
128+
validate_value_and_typed_value(fields, false)?;
129+
130+
Ok(())
131+
}
132+
133+
#[cfg(test)]
mod tests {
    use super::*;

    use arrow_schema::{Field, UnionFields, UnionMode};

    /// Builds a `BinaryView` field with the given name and nullability.
    fn binary_view(name: &str, nullable: bool) -> Field {
        Field::new(name, DataType::BinaryView, nullable)
    }

    /// Builds the non-nullable `metadata` field every valid schema needs.
    fn metadata_field() -> Field {
        binary_view(METADATA, false)
    }

    /// Builds a non-nullable `value` field.
    fn value_field() -> Field {
        binary_view(VALUE, false)
    }

    /// Error text reported when a group declares neither `value` nor `typed_value`.
    const MISSING_VALUE_COLUMNS: &str = "Invalid argument error: Invalid VariantArray: StructArray must contain either `value` or `typed_value` fields or both.";

    #[test]
    fn test_regular_variant_schema() {
        // a regular (unshredded) variant schema
        let schema = Fields::from(vec![metadata_field(), value_field()]);

        validate_shredded_schema(&schema).unwrap();
    }

    #[test]
    fn test_regular_variant_schema_order_agnostic() {
        // same as the regular schema, but with the field order switched
        let schema = Fields::from(vec![value_field(), metadata_field()]);

        validate_shredded_schema(&schema).unwrap();
    }

    #[test]
    fn test_regular_variant_schema_allow_other_columns() {
        // unrelated columns next to the variant fields are ignored
        let trace_field = Field::new("trace_id", DataType::Utf8View, false);
        let created_at_field = Field::new("created_at", DataType::Date64, false);

        let schema = Fields::from(vec![
            metadata_field(),
            trace_field,
            created_at_field,
            value_field(),
        ]);

        validate_shredded_schema(&schema).unwrap();
    }

    #[test]
    fn test_regular_variant_schema_missing_metadata() {
        let schema = Fields::from(vec![value_field()]);

        let err = validate_shredded_schema(&schema).unwrap_err();

        assert_eq!(
            err.to_string(),
            "Invalid argument error: Invalid VariantArray: StructArray must contain a 'metadata' field"
        );
    }

    #[test]
    fn test_regular_variant_schema_nullable_metadata() {
        let schema = Fields::from(vec![binary_view(METADATA, true), value_field()]);

        let err = validate_shredded_schema(&schema).unwrap_err();

        assert_eq!(
            err.to_string(),
            "Invalid argument error: Invalid VariantArray: metadata field can not be nullable"
        );
    }

    #[test]
    fn test_regular_variant_schema_allow_nullable_value() {
        // a regular variant schema whose `value` column is nullable
        let schema = Fields::from(vec![metadata_field(), binary_view(VALUE, true)]);

        validate_shredded_schema(&schema).unwrap();
    }

    #[test]
    fn test_variant_schema_metadata_wrong_type() {
        let schema = Fields::from(vec![
            Field::new(METADATA, DataType::Binary, false),
            value_field(),
        ]);

        let err = validate_shredded_schema(&schema).unwrap_err();

        assert!(
            matches!(err, ArrowError::NotYetImplemented(_)),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn test_variant_schema_value_wrong_type() {
        let schema = Fields::from(vec![
            metadata_field(),
            Field::new(VALUE, DataType::Utf8, false),
        ]);

        let err = validate_shredded_schema(&schema).unwrap_err();

        assert!(
            matches!(err, ArrowError::NotYetImplemented(_)),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn test_variant_schema_missing_value_and_typed_value() {
        // metadata alone is not enough: one of `value` / `typed_value` is required
        let schema = Fields::from(vec![metadata_field()]);

        let err = validate_shredded_schema(&schema).unwrap_err();

        assert_eq!(err.to_string(), MISSING_VALUE_COLUMNS);
    }

    #[test]
    fn test_shredded_variant_schema() {
        let typed_value_field = Field::new(TYPED_VALUE, DataType::Int64, false);
        let schema = Fields::from(vec![metadata_field(), typed_value_field]);

        validate_shredded_schema(&schema).unwrap();
    }

    #[test]
    fn test_shredded_variant_schema_unsupported_typed_value() {
        let typed_value_field = Field::new(TYPED_VALUE, DataType::Utf8, false);
        let schema = Fields::from(vec![metadata_field(), typed_value_field]);

        let err = validate_shredded_schema(&schema).unwrap_err();

        assert!(
            matches!(err, ArrowError::NotYetImplemented(_)),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn test_partially_shredded_variant_schema() {
        let typed_value_field = Field::new(TYPED_VALUE, DataType::Int64, false);
        let schema = Fields::from(vec![metadata_field(), value_field(), typed_value_field]);

        validate_shredded_schema(&schema).unwrap();
    }

    #[test]
    fn test_partially_shredded_variant_list_schema() {
        /*
        optional group tags (VARIANT) {
            required binary metadata;
            optional binary value;
            optional group typed_value (LIST) {   # must be optional to allow a null list
                repeated group list {
                    required group element {          # shredded element
                        optional binary value;
                        optional binary typed_value (STRING);
                    }
                    required group element {          # shredded element
                        optional binary value;
                        optional int64 typed_value ;
                    }
                }
            }
        }
        */

        // Define union fields for different element types
        let string_element = DataType::Struct(Fields::from(vec![
            binary_view(VALUE, true),
            binary_view(TYPED_VALUE, true),
        ]));

        let int_element = DataType::Struct(Fields::from(vec![
            binary_view(VALUE, true),
            Field::new(TYPED_VALUE, DataType::Int64, true),
        ]));

        // Create union of different element types
        let union_fields = UnionFields::new(
            vec![0, 1],
            vec![
                Field::new("string_element", string_element, true),
                Field::new("int_element", int_element, true),
            ],
        );

        let typed_value_field = Field::new(
            TYPED_VALUE,
            DataType::Union(union_fields, UnionMode::Sparse),
            false,
        );
        let schema = Fields::from(vec![metadata_field(), value_field(), typed_value_field]);

        validate_shredded_schema(&schema).unwrap();
    }

    #[test]
    fn test_partially_shredded_variant_object_schema() {
        /*
        optional group event (VARIANT) {
            required binary metadata;
            optional binary value;                # a variant, expected to be an object
            optional group typed_value {          # shredded fields for the variant object
                required group event_type {         # shredded field for event_type
                    optional binary value;
                    optional binary typed_value (STRING);
                }
                required group event_ts {           # shredded field for event_ts
                    optional binary value;
                    optional int64 typed_value (TIMESTAMP(true, MICROS));
                }
            }
        }
        */

        // event_type (the string case)
        let event_type_group =
            Fields::from(vec![value_field(), binary_view(TYPED_VALUE, false)]);

        // event_ts
        let event_ts_group = Fields::from(vec![
            value_field(),
            Field::new(TYPED_VALUE, DataType::Int64, false),
        ]);

        let typed_value_field = Field::new(
            TYPED_VALUE,
            DataType::Union(
                UnionFields::new(
                    vec![0, 1],
                    vec![
                        Field::new("event_type", DataType::Struct(event_type_group), true),
                        Field::new("event_ts", DataType::Struct(event_ts_group), true),
                    ],
                ),
                UnionMode::Sparse,
            ),
            false,
        );
        let schema = Fields::from(vec![metadata_field(), value_field(), typed_value_field]);

        validate_shredded_schema(&schema).unwrap();
    }

    #[test]
    fn test_shredded_variant_schema_union_member_not_struct() {
        // every member of a union `typed_value` has to be a struct group
        let typed_value_field = Field::new(
            TYPED_VALUE,
            DataType::Union(
                UnionFields::new(vec![0], vec![Field::new("element", DataType::Int64, true)]),
                UnionMode::Sparse,
            ),
            false,
        );
        let schema = Fields::from(vec![metadata_field(), value_field(), typed_value_field]);

        let err = validate_shredded_schema(&schema).unwrap_err();

        assert_eq!(err.to_string(), "Invalid argument error: Expected struct");
    }

    #[test]
    fn test_shredded_variant_schema_union_member_missing_value_and_typed_value() {
        // nested groups are validated recursively and need `value` or `typed_value`
        let empty_group =
            DataType::Struct(Fields::from(vec![Field::new("other", DataType::Int64, true)]));

        let typed_value_field = Field::new(
            TYPED_VALUE,
            DataType::Union(
                UnionFields::new(vec![0], vec![Field::new("element", empty_group, true)]),
                UnionMode::Sparse,
            ),
            false,
        );
        let schema = Fields::from(vec![metadata_field(), value_field(), typed_value_field]);

        let err = validate_shredded_schema(&schema).unwrap_err();

        assert_eq!(err.to_string(), MISSING_VALUE_COLUMNS);
    }
}

0 commit comments

Comments
 (0)