Skip to content

Commit 79248ae

Browse files
Validate schema
1 parent 82821e5 commit 79248ae

File tree

5 files changed

+385
-39
lines changed

5 files changed

+385
-39
lines changed

parquet-variant-compute/src/from_json.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
2121
use crate::{VariantArray, VariantArrayBuilder};
2222
use arrow::array::{Array, ArrayRef, StringArray};
23-
use arrow_schema::ArrowError;
23+
use arrow_schema::{ArrowError, DataType, Field, Fields};
2424
use parquet_variant::VariantBuilder;
2525
use parquet_variant_json::json_to_variant;
2626

@@ -35,7 +35,14 @@ pub fn batch_json_string_to_variant(input: &ArrayRef) -> Result<VariantArray, Ar
3535
)),
3636
}?;
3737

38-
let mut variant_array_builder = VariantArrayBuilder::new(input_string_array.len());
38+
let metadata_field = Field::new("metadata", DataType::BinaryView, false);
39+
let value_field = Field::new("value", DataType::BinaryView, false);
40+
41+
let schema = Fields::from(vec![metadata_field, value_field]);
42+
43+
let mut variant_array_builder =
44+
VariantArrayBuilder::try_new(input_string_array.len(), schema).unwrap();
45+
3946
for i in 0..input.len() {
4047
if input.is_null(i) {
4148
// The subfields are expected to be non-nullable according to the parquet variant spec.

parquet-variant-compute/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
mod from_json;
19+
mod shredding;
1920
mod to_json;
2021
mod variant_array;
2122
mod variant_array_builder;
Lines changed: 349 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,349 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow_schema::{ArrowError, DataType, Fields};
19+
20+
// Keywords defined by the shredding spec
21+
/// Name of the struct field that `validate_shredded_schema` requires to be
/// present, non-nullable and of type `BinaryView`.
pub const METADATA: &str = "metadata";
22+
/// Name of the `value` struct field; when a field with this name is present,
/// `validate_value_and_typed_value` requires it to be of type `BinaryView`.
pub const VALUE: &str = "value";
23+
/// Name of the `typed_value` struct field; when a field with this name is
/// present, `validate_value_and_typed_value` checks its data type against the
/// supported subset.
pub const TYPED_VALUE: &str = "typed_value";
24+
25+
pub fn validate_value_and_typed_value(
26+
fields: &Fields,
27+
allow_both_null: bool,
28+
) -> Result<(), ArrowError> {
29+
let value_field_res = fields.iter().find(|f| f.name() == VALUE);
30+
let typed_value_field_res = fields.iter().find(|f| f.name() == TYPED_VALUE);
31+
32+
if !allow_both_null {
33+
if let (None, None) = (value_field_res, typed_value_field_res) {
34+
return Err(ArrowError::InvalidArgumentError(
35+
"Invalid VariantArray: StructArray must contain either `value` or `typed_value` fields or both.".to_string()
36+
));
37+
}
38+
}
39+
40+
if let Some(value_field) = value_field_res {
41+
// if !value_field.is_nullable() {
42+
// return Err(ArrowError::InvalidArgumentError(
43+
// "Expected value field to be nullable".to_string(),
44+
// ));
45+
// }
46+
47+
if value_field.data_type() != &DataType::BinaryView {
48+
return Err(ArrowError::NotYetImplemented(format!(
49+
"VariantArray 'value' field must be BinaryView, got {}",
50+
value_field.data_type()
51+
)));
52+
}
53+
}
54+
55+
if let Some(typed_value_field) = fields.iter().find(|f| f.name() == TYPED_VALUE) {
56+
// if !typed_value_field.is_nullable() {
57+
// return Err(ArrowError::InvalidArgumentError(
58+
// "Expected value field to be nullable".to_string(),
59+
// ));
60+
// }
61+
62+
// this is directly mapped from the spec's parquet physical types
63+
// note, there are more data types we can support
64+
// but for the sake of simplicity, I chose the smallest subset
65+
match typed_value_field.data_type() {
66+
DataType::Boolean
67+
| DataType::Int32
68+
| DataType::Int64
69+
| DataType::Float32
70+
| DataType::Float64
71+
| DataType::BinaryView => {}
72+
DataType::Union(union_fields, _) => {
73+
union_fields
74+
.iter()
75+
.map(|(_, f)| f.clone())
76+
.try_for_each(|f| {
77+
let DataType::Struct(fields) = f.data_type().clone() else {
78+
return Err(ArrowError::InvalidArgumentError(
79+
"Expected struct".to_string(),
80+
));
81+
};
82+
83+
validate_value_and_typed_value(&fields, false)
84+
})?;
85+
}
86+
87+
foreign => {
88+
return Err(ArrowError::NotYetImplemented(format!(
89+
"Unsupported VariantArray 'typed_value' field, got {foreign}"
90+
)))
91+
}
92+
}
93+
}
94+
95+
Ok(())
96+
}
97+
98+
/// Validates that the provided [`Fields`] conform to the Variant shredding specification.
99+
///
100+
/// # Requirements
101+
/// - Must contain a "metadata" field of type BinaryView
102+
/// - Must contain at least one of "value" (optional BinaryView) or "typed_value" (optional with valid Parquet type)
103+
/// - Both "value" and "typed_value" can only be null simultaneously for shredded object fields
104+
pub fn validate_shredded_schema(fields: &Fields) -> Result<(), ArrowError> {
105+
let metadata_field = fields
106+
.iter()
107+
.find(|f| f.name() == METADATA)
108+
.ok_or_else(|| {
109+
ArrowError::InvalidArgumentError(
110+
"Invalid VariantArray: StructArray must contain a 'metadata' field".to_string(),
111+
)
112+
})?;
113+
114+
if metadata_field.is_nullable() {
115+
return Err(ArrowError::InvalidArgumentError(
116+
"Invalid VariantArray: metadata field can not be nullable".to_string(),
117+
));
118+
}
119+
120+
if metadata_field.data_type() != &DataType::BinaryView {
121+
return Err(ArrowError::NotYetImplemented(format!(
122+
"VariantArray 'metadata' field must be BinaryView, got {}",
123+
metadata_field.data_type()
124+
)));
125+
}
126+
127+
validate_value_and_typed_value(fields, false)?;
128+
129+
Ok(())
130+
}
131+
132+
#[cfg(test)]
mod tests {
    use super::*;

    use arrow_schema::{Field, UnionFields, UnionMode};

    /// Non-nullable `metadata` field of type `BinaryView`, as the validator requires.
    fn metadata_field() -> Field {
        Field::new("metadata", DataType::BinaryView, false)
    }

    /// `value` field of type `BinaryView` with the requested nullability.
    fn value_field(nullable: bool) -> Field {
        Field::new("value", DataType::BinaryView, nullable)
    }

    /// Struct data type holding a `value` / `typed_value` pair, where `typed_value`
    /// has data type `typed` and both fields share the same nullability.
    fn shredded_group(typed: DataType, nullable: bool) -> DataType {
        DataType::Struct(Fields::from(vec![
            value_field(nullable),
            Field::new("typed_value", typed, nullable),
        ]))
    }

    /// Non-nullable sparse-union `typed_value` field whose members get type ids `0..n`.
    fn union_typed_value(members: Vec<Field>) -> Field {
        let type_ids: Vec<i8> = (0_i8..).take(members.len()).collect();

        Field::new(
            "typed_value",
            DataType::Union(UnionFields::new(type_ids, members), UnionMode::Sparse),
            false,
        )
    }

    #[test]
    fn test_regular_variant_schema() {
        // a regular variant schema
        let schema = Fields::from(vec![metadata_field(), value_field(false)]);

        validate_shredded_schema(&schema).unwrap();
    }

    #[test]
    fn test_regular_variant_schema_order_agnostic() {
        // a regular variant schema; note the order switch
        let schema = Fields::from(vec![value_field(false), metadata_field()]);

        validate_shredded_schema(&schema).unwrap();
    }

    #[test]
    fn test_regular_variant_schema_allow_other_columns() {
        // a regular variant schema surrounded by unrelated columns
        let trace_field = Field::new("trace_id", DataType::Utf8View, false);
        let created_at_field = Field::new("created_at", DataType::Date64, false);

        let schema = Fields::from(vec![
            metadata_field(),
            trace_field,
            created_at_field,
            value_field(false),
        ]);

        validate_shredded_schema(&schema).unwrap();
    }

    #[test]
    fn test_regular_variant_schema_missing_metadata() {
        let schema = Fields::from(vec![value_field(false)]);

        let err = validate_shredded_schema(&schema).unwrap_err();

        assert_eq!(
            err.to_string(),
            "Invalid argument error: Invalid VariantArray: StructArray must contain a 'metadata' field"
        );
    }

    #[test]
    fn test_regular_variant_schema_nullable_metadata() {
        let nullable_metadata = Field::new("metadata", DataType::BinaryView, true);
        let schema = Fields::from(vec![nullable_metadata, value_field(false)]);

        let err = validate_shredded_schema(&schema).unwrap_err();

        assert_eq!(
            err.to_string(),
            "Invalid argument error: Invalid VariantArray: metadata field can not be nullable"
        );
    }

    #[test]
    fn test_regular_variant_schema_rejects_non_binary_view_metadata() {
        let binary_metadata = Field::new("metadata", DataType::Binary, false);
        let schema = Fields::from(vec![binary_metadata, value_field(false)]);

        let err = validate_shredded_schema(&schema).unwrap_err();

        assert!(
            matches!(err, ArrowError::NotYetImplemented(_)),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn test_regular_variant_schema_allow_nullable_value() {
        // a regular variant schema with a nullable value column
        let schema = Fields::from(vec![metadata_field(), value_field(true)]);

        validate_shredded_schema(&schema).unwrap();
    }

    #[test]
    fn test_regular_variant_schema_rejects_non_binary_view_value() {
        let utf8_value = Field::new("value", DataType::Utf8, true);
        let schema = Fields::from(vec![metadata_field(), utf8_value]);

        let err = validate_shredded_schema(&schema).unwrap_err();

        assert!(
            matches!(err, ArrowError::NotYetImplemented(_)),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn test_variant_schema_missing_value_and_typed_value() {
        let schema = Fields::from(vec![metadata_field()]);

        let err = validate_shredded_schema(&schema).unwrap_err();

        assert_eq!(
            err.to_string(),
            "Invalid argument error: Invalid VariantArray: StructArray must contain either `value` or `typed_value` fields or both."
        );
    }

    #[test]
    fn test_value_and_typed_value_may_both_be_absent_when_allowed() {
        let schema = Fields::from(vec![metadata_field()]);

        validate_value_and_typed_value(&schema, true).unwrap();
    }

    #[test]
    fn test_shredded_variant_schema() {
        let typed_value_field = Field::new("typed_value", DataType::Int64, false);
        let schema = Fields::from(vec![metadata_field(), typed_value_field]);

        validate_shredded_schema(&schema).unwrap();
    }

    #[test]
    fn test_shredded_variant_schema_rejects_unsupported_typed_value() {
        let typed_value_field = Field::new("typed_value", DataType::Utf8, false);
        let schema = Fields::from(vec![metadata_field(), typed_value_field]);

        let err = validate_shredded_schema(&schema).unwrap_err();

        assert!(
            matches!(err, ArrowError::NotYetImplemented(_)),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn test_partially_shredded_variant_schema() {
        let typed_value_field = Field::new("typed_value", DataType::Int64, false);
        let schema = Fields::from(vec![metadata_field(), value_field(false), typed_value_field]);

        validate_shredded_schema(&schema).unwrap();
    }

    #[test]
    fn test_partially_shredded_variant_list_schema() {
        /*
        optional group tags (VARIANT) {
            required binary metadata;
            optional binary value;
            optional group typed_value (LIST) {   # must be optional to allow a null list
                repeated group list {
                    required group element {        # shredded element
                        optional binary value;
                        optional binary typed_value (STRING);
                    }
                    required group element {        # shredded element
                        optional binary value;
                        optional int64 typed_value ;
                    }
                }
            }
        }
        */

        // Union members model the different shredded element types
        let typed_value_field = union_typed_value(vec![
            Field::new(
                "string_element",
                shredded_group(DataType::BinaryView, true),
                true,
            ),
            Field::new("int_element", shredded_group(DataType::Int64, true), true),
        ]);
        let schema = Fields::from(vec![metadata_field(), value_field(false), typed_value_field]);

        validate_shredded_schema(&schema).unwrap();
    }

    #[test]
    fn test_partially_shredded_variant_object_schema() {
        /*
        optional group event (VARIANT) {
            required binary metadata;
            optional binary value;                # a variant, expected to be an object
            optional group typed_value {          # shredded fields for the variant object
                required group event_type {         # shredded field for event_type
                    optional binary value;
                    optional binary typed_value (STRING);
                }
                required group event_ts {           # shredded field for event_ts
                    optional binary value;
                    optional int64 typed_value (TIMESTAMP(true, MICROS));
                }
            }
        }
        */

        let typed_value_field = union_typed_value(vec![
            // event_type: BinaryView is the string case
            Field::new(
                "event_type",
                shredded_group(DataType::BinaryView, false),
                true,
            ),
            // event_ts
            Field::new("event_ts", shredded_group(DataType::Int64, false), true),
        ]);
        let schema = Fields::from(vec![metadata_field(), value_field(false), typed_value_field]);

        validate_shredded_schema(&schema).unwrap();
    }

    #[test]
    fn test_shredded_variant_schema_rejects_non_struct_union_member() {
        let typed_value_field =
            union_typed_value(vec![Field::new("int_element", DataType::Int64, true)]);
        let schema = Fields::from(vec![metadata_field(), value_field(false), typed_value_field]);

        let err = validate_shredded_schema(&schema).unwrap_err();

        assert_eq!(err.to_string(), "Invalid argument error: Expected struct");
    }

    #[test]
    fn test_shredded_variant_schema_rejects_union_member_without_value_fields() {
        // the member is a struct, but carries neither `value` nor `typed_value`
        let empty_group = DataType::Struct(Fields::from(vec![Field::new(
            "unrelated",
            DataType::Int64,
            true,
        )]));
        let typed_value_field =
            union_typed_value(vec![Field::new("element", empty_group, true)]);
        let schema = Fields::from(vec![metadata_field(), value_field(false), typed_value_field]);

        let err = validate_shredded_schema(&schema).unwrap_err();

        assert_eq!(
            err.to_string(),
            "Invalid argument error: Invalid VariantArray: StructArray must contain either `value` or `typed_value` fields or both."
        );
    }
}

0 commit comments

Comments
 (0)