Skip to content

Commit 291e6e5

Browse files
Add arrow-avro support for Impala Nullability (#7954)
# Which issue does this PR close?
- Part of #4886
- Related to #6965

# Rationale for this change
This change introduces support for Avro files generated by systems like Impala, which have a specific convention for representing nullable fields. In Avro, nullability is typically represented by a union of a type and a `null` type. This PR updates the Avro reader to correctly interpret these schemas, ensuring proper handling of nullable data and improving interoperability with Impala-generated data.

# What changes are included in this PR?
This pull request introduces several changes to support Impala-style nullability in the Avro reader:
- The Avro schema parser has been updated to recognize unions where `null` is the second type (e.g., `['type', 'null']`) as a nullable field.
- Logic has been added to handle this nullability convention during Avro decoding.
- New tests are included to verify that Avro files using this nullability format are read correctly while ensuring that strict mode properly identifies them.

# Are these changes tested?
Yes, I added new test cases covering these changes to the tests named: `test_nonnullable_impala`, `test_nonnullable_impala_strict`, `test_nullable_impala` and `test_nullable_impala_strict`.

# Are there any user-facing changes?
N/A

---------

Co-authored-by: Connor Sanders <[email protected]>
1 parent 82821e5 commit 291e6e5

File tree

3 files changed

+508
-45
lines changed

3 files changed

+508
-45
lines changed

arrow-avro/src/codec.rs

Lines changed: 109 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ impl<'a> TryFrom<&Schema<'a>> for AvroField {
148148
match schema {
149149
Schema::Complex(ComplexType::Record(r)) => {
150150
let mut resolver = Resolver::default();
151-
let data_type = make_data_type(schema, None, &mut resolver, false)?;
151+
let data_type = make_data_type(schema, None, &mut resolver, false, false)?;
152152
Ok(AvroField {
153153
data_type,
154154
name: r.name.to_string(),
@@ -161,6 +161,60 @@ impl<'a> TryFrom<&Schema<'a>> for AvroField {
161161
}
162162
}
163163

164+
/// Builder for an [`AvroField`]
///
/// Holds a borrowed Avro [`Schema`] plus the `use_utf8view` and `strict_mode`
/// options; `build` forwards all three to `make_data_type`.
165+
#[derive(Debug)]
166+
pub struct AvroFieldBuilder<'a> {
167+
// Avro schema the field is built from; `build` only accepts a record schema.
schema: &'a Schema<'a>,
168+
// Forwarded to `make_data_type` as its `use_utf8view` argument.
use_utf8view: bool,
169+
// Forwarded to `make_data_type` as its `strict_mode` argument.
strict_mode: bool,
170+
}
171+
172+
impl<'a> AvroFieldBuilder<'a> {
173+
/// Creates a new [`AvroFieldBuilder`]
///
/// Both `use_utf8view` and `strict_mode` start out as `false`.
174+
pub fn new(schema: &'a Schema<'a>) -> Self {
175+
Self {
176+
schema,
177+
use_utf8view: false,
178+
strict_mode: false,
179+
}
180+
}
181+
182+
/// Enable or disable Utf8View support
///
/// The flag is passed through to `make_data_type`; consumes and returns the
/// builder so calls can be chained.
183+
pub fn with_utf8view(mut self, use_utf8view: bool) -> Self {
184+
self.use_utf8view = use_utf8view;
185+
self
186+
}
187+
188+
/// Enable or disable strict mode.
///
/// When enabled, `make_data_type` rejects a two-member union whose second
/// member is `null` (the `['T','null']` form) with an
/// [`ArrowError::SchemaError`] instead of treating it as a nullable field.
189+
pub fn with_strict_mode(mut self, strict_mode: bool) -> Self {
190+
self.strict_mode = strict_mode;
191+
self
192+
}
193+
194+
/// Build an [`AvroField`] from the builder
///
/// The resulting field takes its name from the record and its data type
/// from `make_data_type`.
///
/// # Errors
///
/// Returns [`ArrowError::ParseError`] if the schema is not a record, and
/// propagates any error returned by `make_data_type`.
195+
pub fn build(self) -> Result<AvroField, ArrowError> {
196+
match self.schema {
197+
Schema::Complex(ComplexType::Record(r)) => {
198+
// Start from a default resolver for this build.
let mut resolver = Resolver::default();
199+
let data_type = make_data_type(
200+
self.schema,
201+
// No enclosing namespace is supplied for the top-level schema.
None,
202+
&mut resolver,
203+
self.use_utf8view,
204+
self.strict_mode,
205+
)?;
206+
Ok(AvroField {
207+
name: r.name.to_string(),
208+
data_type,
209+
})
210+
}
211+
// Any non-record schema is reported with its Debug representation.
_ => Err(ArrowError::ParseError(format!(
212+
"Expected a Record schema to build an AvroField, but got {:?}",
213+
self.schema
214+
))),
215+
}
216+
}
217+
}
164218
/// An Avro encoding
165219
///
166220
/// <https://avro.apache.org/docs/1.11.1/specification/#encodings>
@@ -409,6 +463,7 @@ fn make_data_type<'a>(
409463
namespace: Option<&'a str>,
410464
resolver: &mut Resolver<'a>,
411465
use_utf8view: bool,
466+
strict_mode: bool,
412467
) -> Result<AvroDataType, ArrowError> {
413468
match schema {
414469
Schema::TypeName(TypeName::Primitive(p)) => {
@@ -428,12 +483,20 @@ fn make_data_type<'a>(
428483
.position(|x| x == &Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)));
429484
match (f.len() == 2, null) {
430485
(true, Some(0)) => {
431-
let mut field = make_data_type(&f[1], namespace, resolver, use_utf8view)?;
486+
let mut field =
487+
make_data_type(&f[1], namespace, resolver, use_utf8view, strict_mode)?;
432488
field.nullability = Some(Nullability::NullFirst);
433489
Ok(field)
434490
}
435491
(true, Some(1)) => {
436-
let mut field = make_data_type(&f[0], namespace, resolver, use_utf8view)?;
492+
if strict_mode {
493+
return Err(ArrowError::SchemaError(
494+
"Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
495+
.to_string(),
496+
));
497+
}
498+
let mut field =
499+
make_data_type(&f[0], namespace, resolver, use_utf8view, strict_mode)?;
437500
field.nullability = Some(Nullability::NullSecond);
438501
Ok(field)
439502
}
@@ -456,6 +519,7 @@ fn make_data_type<'a>(
456519
namespace,
457520
resolver,
458521
use_utf8view,
522+
strict_mode,
459523
)?,
460524
})
461525
})
@@ -469,8 +533,13 @@ fn make_data_type<'a>(
469533
Ok(field)
470534
}
471535
ComplexType::Array(a) => {
472-
let mut field =
473-
make_data_type(a.items.as_ref(), namespace, resolver, use_utf8view)?;
536+
let mut field = make_data_type(
537+
a.items.as_ref(),
538+
namespace,
539+
resolver,
540+
use_utf8view,
541+
strict_mode,
542+
)?;
474543
Ok(AvroDataType {
475544
nullability: None,
476545
metadata: a.attributes.field_metadata(),
@@ -535,7 +604,8 @@ fn make_data_type<'a>(
535604
Ok(field)
536605
}
537606
ComplexType::Map(m) => {
538-
let val = make_data_type(&m.values, namespace, resolver, use_utf8view)?;
607+
let val =
608+
make_data_type(&m.values, namespace, resolver, use_utf8view, strict_mode)?;
539609
Ok(AvroDataType {
540610
nullability: None,
541611
metadata: m.attributes.field_metadata(),
@@ -549,6 +619,7 @@ fn make_data_type<'a>(
549619
namespace,
550620
resolver,
551621
use_utf8view,
622+
strict_mode,
552623
)?;
553624

554625
// https://avro.apache.org/docs/1.11.1/specification/#logical-types
@@ -630,7 +701,7 @@ mod tests {
630701
let schema = create_schema_with_logical_type(PrimitiveType::Int, "date");
631702

632703
let mut resolver = Resolver::default();
633-
let result = make_data_type(&schema, None, &mut resolver, false).unwrap();
704+
let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap();
634705

635706
assert!(matches!(result.codec, Codec::Date32));
636707
}
@@ -640,7 +711,7 @@ mod tests {
640711
let schema = create_schema_with_logical_type(PrimitiveType::Int, "time-millis");
641712

642713
let mut resolver = Resolver::default();
643-
let result = make_data_type(&schema, None, &mut resolver, false).unwrap();
714+
let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap();
644715

645716
assert!(matches!(result.codec, Codec::TimeMillis));
646717
}
@@ -650,7 +721,7 @@ mod tests {
650721
let schema = create_schema_with_logical_type(PrimitiveType::Long, "time-micros");
651722

652723
let mut resolver = Resolver::default();
653-
let result = make_data_type(&schema, None, &mut resolver, false).unwrap();
724+
let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap();
654725

655726
assert!(matches!(result.codec, Codec::TimeMicros));
656727
}
@@ -660,7 +731,7 @@ mod tests {
660731
let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-millis");
661732

662733
let mut resolver = Resolver::default();
663-
let result = make_data_type(&schema, None, &mut resolver, false).unwrap();
734+
let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap();
664735

665736
assert!(matches!(result.codec, Codec::TimestampMillis(true)));
666737
}
@@ -670,7 +741,7 @@ mod tests {
670741
let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-micros");
671742

672743
let mut resolver = Resolver::default();
673-
let result = make_data_type(&schema, None, &mut resolver, false).unwrap();
744+
let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap();
674745

675746
assert!(matches!(result.codec, Codec::TimestampMicros(true)));
676747
}
@@ -680,7 +751,7 @@ mod tests {
680751
let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-millis");
681752

682753
let mut resolver = Resolver::default();
683-
let result = make_data_type(&schema, None, &mut resolver, false).unwrap();
754+
let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap();
684755

685756
assert!(matches!(result.codec, Codec::TimestampMillis(false)));
686757
}
@@ -690,7 +761,7 @@ mod tests {
690761
let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-micros");
691762

692763
let mut resolver = Resolver::default();
693-
let result = make_data_type(&schema, None, &mut resolver, false).unwrap();
764+
let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap();
694765

695766
assert!(matches!(result.codec, Codec::TimestampMicros(false)));
696767
}
@@ -745,7 +816,7 @@ mod tests {
745816
let schema = create_schema_with_logical_type(PrimitiveType::Int, "custom-type");
746817

747818
let mut resolver = Resolver::default();
748-
let result = make_data_type(&schema, None, &mut resolver, false).unwrap();
819+
let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap();
749820

750821
assert_eq!(
751822
result.metadata.get("logicalType"),
@@ -758,7 +829,7 @@ mod tests {
758829
let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
759830

760831
let mut resolver = Resolver::default();
761-
let result = make_data_type(&schema, None, &mut resolver, true).unwrap();
832+
let result = make_data_type(&schema, None, &mut resolver, true, false).unwrap();
762833

763834
assert!(matches!(result.codec, Codec::Utf8View));
764835
}
@@ -768,7 +839,7 @@ mod tests {
768839
let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
769840

770841
let mut resolver = Resolver::default();
771-
let result = make_data_type(&schema, None, &mut resolver, false).unwrap();
842+
let result = make_data_type(&schema, None, &mut resolver, false, false).unwrap();
772843

773844
assert!(matches!(result.codec, Codec::Utf8));
774845
}
@@ -796,7 +867,7 @@ mod tests {
796867
let schema = Schema::Complex(ComplexType::Record(record));
797868

798869
let mut resolver = Resolver::default();
799-
let result = make_data_type(&schema, None, &mut resolver, true).unwrap();
870+
let result = make_data_type(&schema, None, &mut resolver, true, false).unwrap();
800871

801872
if let Codec::Struct(fields) = &result.codec {
802873
let first_field_codec = &fields[0].data_type().codec;
@@ -805,4 +876,25 @@ mod tests {
805876
panic!("Expected Struct codec");
806877
}
807878
}
879+
880+
#[test]
881+
fn test_union_with_strict_mode() {
// Verifies that a `[string, null]` union (null in second position) is
// rejected with a `SchemaError` when `strict_mode` is enabled.
882+
let schema = Schema::Union(vec![
883+
Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
884+
Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
885+
]);
886+
887+
let mut resolver = Resolver::default();
888+
// Arguments: use_utf8view = false, strict_mode = true.
let result = make_data_type(&schema, None, &mut resolver, false, true);
889+
890+
assert!(result.is_err());
891+
// Pin both the error variant and the message text.
match result {
892+
Err(ArrowError::SchemaError(msg)) => {
893+
assert!(msg.contains(
894+
"Found Avro union of the form ['T','null'], which is disallowed in strict_mode"
895+
));
896+
}
897+
_ => panic!("Expected SchemaError"),
898+
}
899+
}
808900
}

0 commit comments

Comments
 (0)