From ba3b0a077d4105da0afa471efe8b1fb1c07e14fd Mon Sep 17 00:00:00 2001 From: Martin Tzvetanov Grigorov Date: Mon, 21 Jul 2025 13:50:42 +0300 Subject: [PATCH 01/10] Add impl for Schema::Boolean Signed-off-by: Martin Tzvetanov Grigorov --- avro/src/de.rs | 8 +- avro/src/de_schema.rs | 384 ++++++++++++++++++++++++++++++++++++++ avro/src/lib.rs | 13 +- avro/src/reader.rs | 11 ++ avro/src/ser_schema.rs | 2 +- avro/src/writer.rs | 3 +- avro/tests/avro-rs-226.rs | 11 +- 7 files changed, 417 insertions(+), 15 deletions(-) create mode 100644 avro/src/de_schema.rs diff --git a/avro/src/de.rs b/avro/src/de.rs index 13021260..74664121 100644 --- a/avro/src/de.rs +++ b/avro/src/de.rs @@ -16,16 +16,16 @@ // under the License. //! Logic for serde-compatible deserialization. -use crate::{Error, bytes::DE_BYTES_BORROWED, error::Details, types::Value}; +use crate::{bytes::DE_BYTES_BORROWED, error::Details, types::Value, AvroResult, Error}; use serde::{ - Deserialize, de::{self, DeserializeSeed, Deserializer as _, Visitor}, forward_to_deserialize_any, + Deserialize, }; use std::{ collections::{ - HashMap, hash_map::{Keys, Values}, + HashMap, }, slice::Iter, }; @@ -756,7 +756,7 @@ impl<'de> de::Deserializer<'de> for StringDeserializer { /// /// This conversion can fail if the structure of the `Value` does not match the /// structure expected by `D`. -pub fn from_value<'de, D: Deserialize<'de>>(value: &'de Value) -> Result { +pub fn from_value<'de, D: Deserialize<'de>>(value: &'de Value) -> AvroResult { let de = Deserializer::new(value); D::deserialize(&de) } diff --git a/avro/src/de_schema.rs b/avro/src/de_schema.rs new file mode 100644 index 00000000..88f9a7fc --- /dev/null +++ b/avro/src/de_schema.rs @@ -0,0 +1,384 @@ +use crate::error::Details; +use crate::schema::{NamesRef, Namespace}; +use crate::{Error, Schema}; +use serde::de::Visitor; +use std::io::Read; + +pub struct SchemaAwareReadDeserializer<'s, R: Read> { + reader: &'s mut R, + root_schema: &'s Schema, + names: &'s NamesRef<'s>, + enclosing_namespace: Namespace, +} + +impl<'s, R: Read> SchemaAwareReadDeserializer<'s, R> { + pub(crate) fn new( + reader: &'s mut R, + root_schema: &'s Schema, + names: &'s NamesRef<'s>, + enclosing_namespace: Namespace, + ) -> Self { + Self { + reader, + root_schema, + names, + enclosing_namespace, + } + } +} + +impl<'de, R: Read> serde::de::Deserializer<'de> for SchemaAwareReadDeserializer<'de, R> { + type Error = Error; + + fn deserialize_any(self, _visitor: V) -> Result + where + V: Visitor<'de>, + { + // Implement the deserialization logic here + unimplemented!() + } + + fn deserialize_bool(self, visitor: V) -> Result + where + V: Visitor<'de>, + { + let schema = self.root_schema; + let mut this = self; + (&mut this).deserialize_bool_with_schema(visitor, schema) + } + + fn deserialize_i8(self, _visitor: V) -> Result + where + V: Visitor<'de>, + { + todo!() + } + + fn deserialize_i16(self, _visitor: V) -> Result + where + V: Visitor<'de>, + { + todo!() + } + + fn deserialize_i32(self, _visitor: V) -> Result + where + V: Visitor<'de>, + { + todo!() + } + + fn deserialize_i64(self, _visitor: V) -> Result + where + V: Visitor<'de>, + { + todo!() + } + + fn deserialize_u8(self, _visitor: V) -> Result + where + V: Visitor<'de>, + { + todo!() + } + + fn deserialize_u16(self, _visitor: V) -> Result + where + V: Visitor<'de>, + { + todo!() + } + + fn deserialize_u32(self, _visitor: V) -> Result + where + V: Visitor<'de>, + { + todo!() + } + + fn deserialize_u64(self, _visitor: V) -> Result + where + V: Visitor<'de>, + { + todo!() + } + + fn deserialize_f32(self, _visitor: V) -> Result + where + V: Visitor<'de>, + { + todo!() + } + + fn deserialize_f64(self, _visitor: V) -> Result + where + V: Visitor<'de>, + { + todo!() + } + + fn deserialize_char(self, _visitor: V) -> Result + where + V: Visitor<'de>, + { + todo!() + } + + fn deserialize_str(self, _visitor: V) -> Result + where + V: Visitor<'de>, + { + todo!() + } + + fn deserialize_string(self, _visitor: V) -> Result + where + V: Visitor<'de>, + { + todo!() + } + + fn deserialize_bytes(self, _visitor: V) -> Result + where + V: Visitor<'de>, + { + todo!() + } + + fn deserialize_byte_buf(self, _visitor: V) -> Result + where + V: Visitor<'de>, + { + todo!() + } + + fn deserialize_option(self, _visitor: V) -> Result + where + V: Visitor<'de>, + { + todo!() + } + + fn deserialize_unit(self, _visitor: V) -> Result + where + V: Visitor<'de>, + { + todo!() + } + + fn deserialize_unit_struct( + self, + _name: &'static str, + _visitor: V, + ) -> Result + where + V: Visitor<'de>, + { + todo!() + } + + fn deserialize_newtype_struct( + self, + _name: &'static str, + _visitor: V, + ) -> Result + where + V: Visitor<'de>, + { + todo!() + } + + fn deserialize_seq(self, _visitor: V) -> Result + where + V: Visitor<'de>, + { + todo!() + } + + fn deserialize_tuple(self, _len: usize, _visitor: V) -> Result + where + V: Visitor<'de>, + { + todo!() + } + + fn deserialize_tuple_struct( + self, + _name: &'static str, + _len: usize, + _visitor: V, + ) -> Result + where + V: Visitor<'de>, + { + todo!() + } + + fn deserialize_map(self, _visitor: V) -> Result + where + V: Visitor<'de>, + { + todo!() + } + + fn deserialize_struct( + self, + _name: &'static str, + _fields: &'static [&'static str], + _visitor: V, + ) -> Result + where + V: Visitor<'de>, + { + todo!() + } + + fn deserialize_enum( + self, + _name: &'static str, + _variants: &'static [&'static str], + _visitor: V, + ) -> Result + where + V: Visitor<'de>, + { + todo!() + } + + fn deserialize_identifier(self, _visitor: V) -> Result + where + V: Visitor<'de>, + { + todo!() + } + + fn deserialize_ignored_any(self, _visitor: V) -> Result + where + V: Visitor<'de>, + { + todo!() + } +} + +impl<'s, R: Read> SchemaAwareReadDeserializer<'s, R> { + fn deserialize_bool_with_schema<'de, V>( + &mut self, + visitor: V, + schema: &Schema, + ) -> Result + where + V: Visitor<'de>, + { + let create_error = |cause: &str| { + Details::SerializeValueWithSchema { + // TODO: DeserializeValueWithSchema + value_type: "bool", + value: format!("Cause: {cause}"), + schema: schema.clone(), + } + .into() + }; + + match schema { + Schema::Boolean => { + let mut buf = [0; 1]; + self.reader + .read_exact(&mut buf) // Read a single byte + .map_err(|e| create_error(&format!("Failed to read: {e}")))?; + let value = buf[0] != 0; + visitor.visit_bool(value) + } + Schema::Union(union_schema) => { + for (_, variant_schema) in union_schema.schemas.iter().enumerate() { + match variant_schema { + Schema::Boolean => { + return self.deserialize_bool_with_schema(visitor, variant_schema); + } + _ => { /* skip */ } + } + } + Err(create_error(&format!( + "The union schema must have a boolean variant: {schema:?}" + ))) + } + unexpected => Err(create_error(&format!( + "Expected a boolean schema, found: {unexpected:?}" + ))), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::error::Details; + use crate::reader::read_avro_datum_ref; + use crate::schema::{Schema, UnionSchema}; + use apache_avro_test_helper::TestResult; + + #[test] + fn avro_rs_226_deserialize_bool_boolean_schema() -> TestResult { + let schema = Schema::Boolean; + + for (byte, expected) in [(0, false), (1, true)] { + let mut reader: &[u8] = &[byte]; + let read: bool = read_avro_datum_ref(&schema, &mut reader)?; + assert_eq!(read, expected); + } + Ok(()) + } + + #[test] + fn avro_rs_226_deserialize_bool_union_boolean_schema() -> TestResult { + let schema = Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Boolean])?); + + for (byte, expected) in [(0, false), (1, true)] { + let mut reader: &[u8] = &[byte]; + let read: bool = read_avro_datum_ref(&schema, &mut reader)?; + assert_eq!(read, expected); + } + Ok(()) + } + + #[test] + fn avro_rs_226_deserialize_bool_invalid_schema() -> TestResult { + let schema = Schema::Long; // Using a non-boolean schema + + let mut reader: &[u8] = &[0, 1, 2]; + match read_avro_datum_ref::(&schema, &mut reader) { + Err(Error(Details::SerializeValueWithSchema { + value_type, + value, + schema, + })) => { + assert_eq!(value_type, "bool"); + assert!(value.contains("Cause: Expected a boolean schema")); + assert_eq!(schema.to_string(), schema.to_string()); + } + _ => panic!("Expected an error for invalid schema"), + } + + Ok(()) + } + + #[test] + fn avro_rs_226_deserialize_bool_union_invalid_schema() -> TestResult { + let schema = Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Long])?); + + let mut reader: &[u8] = &[1, 2, 3]; + match read_avro_datum_ref::(&schema, &mut reader) { + Err(Error(Details::SerializeValueWithSchema { + value_type, + value, + schema, + })) => { + assert_eq!(value_type, "bool"); + assert!(value.contains("The union schema must have a boolean variant")); + assert_eq!(schema.to_string(), schema.to_string()); + } + _ => panic!("Expected an error for invalid union schema"), + } + + Ok(()) + } +} diff --git a/avro/src/lib.rs b/avro/src/lib.rs index 225cbc13..8dd3f284 100644 --- a/avro/src/lib.rs +++ b/avro/src/lib.rs @@ -870,6 +870,7 @@ mod ser_schema; mod util; mod writer; +mod de_schema; pub mod error; pub mod headers; pub mod rabin; @@ -898,16 +899,16 @@ pub use decimal::Decimal; pub use duration::{Days, Duration, Millis, Months}; pub use error::Error; pub use reader::{ - GenericSingleObjectReader, Reader, SpecificSingleObjectReader, from_avro_datum, - from_avro_datum_reader_schemata, from_avro_datum_schemata, read_marker, + from_avro_datum, from_avro_datum_reader_schemata, from_avro_datum_schemata, read_marker, + GenericSingleObjectReader, Reader, SpecificSingleObjectReader, }; pub use schema::{AvroSchema, Schema}; pub use ser::to_value; pub use util::{max_allocation_bytes, set_serde_human_readable}; pub use uuid::Uuid; pub use writer::{ - GenericSingleObjectWriter, SpecificSingleObjectWriter, Writer, WriterBuilder, to_avro_datum, - to_avro_datum_schemata, write_avro_datum_ref, + to_avro_datum, to_avro_datum_schemata, write_avro_datum_ref, GenericSingleObjectWriter, SpecificSingleObjectWriter, + Writer, WriterBuilder, }; #[cfg(feature = "derive")] @@ -919,8 +920,8 @@ pub type AvroResult = Result; #[cfg(test)] mod tests { use crate::{ - Codec, Reader, Schema, Writer, from_avro_datum, - types::{Record, Value}, + from_avro_datum, types::{Record, Value}, Codec, Reader, Schema, + Writer, }; use pretty_assertions::assert_eq; diff --git a/avro/src/reader.rs b/avro/src/reader.rs index e2f7570f..3142e849 100644 --- a/avro/src/reader.rs +++ b/avro/src/reader.rs @@ -16,6 +16,8 @@ // under the License. //! Logic handling reading from Avro format at user level. +use crate::de_schema::SchemaAwareReadDeserializer; +use crate::schema::NamesRef; use crate::{ AvroResult, Codec, Error, decode::{decode, decode_internal}, @@ -597,6 +599,15 @@ pub fn read_marker(bytes: &[u8]) -> [u8; 16] { marker } +pub fn read_avro_datum_ref<'de, D: DeserializeOwned, R: Read>( + schema: &Schema, + reader: &mut R, +) -> AvroResult { + let names: NamesRef = NamesRef::default(); + let deserializer = SchemaAwareReadDeserializer::new(reader, schema, &names, None); + D::deserialize(deserializer) +} + #[cfg(test)] mod tests { use super::*; diff --git a/avro/src/ser_schema.rs b/avro/src/ser_schema.rs index e62d21d1..4f014ee4 100644 --- a/avro/src/ser_schema.rs +++ b/avro/src/ser_schema.rs @@ -1766,7 +1766,7 @@ impl<'a, 's, W: Write> ser::Serializer for &'a mut SchemaAwareWriteSerializer<'s mod tests { use super::*; use crate::{ - Days, Duration, Millis, Months, decimal::Decimal, error::Details, schema::ResolvedSchema, + decimal::Decimal, error::Details, schema::ResolvedSchema, Days, Duration, Millis, Months, }; use apache_avro_test_helper::TestResult; use bigdecimal::BigDecimal; diff --git a/avro/src/writer.rs b/avro/src/writer.rs index 9c879918..f9d4a0ac 100644 --- a/avro/src/writer.rs +++ b/avro/src/writer.rs @@ -728,8 +728,7 @@ pub fn write_avro_datum_ref( ) -> AvroResult { let names: HashMap = HashMap::new(); let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, &names, None); - let bytes_written = data.serialize(&mut serializer)?; - Ok(bytes_written) + data.serialize(&mut serializer) } /// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also diff --git a/avro/tests/avro-rs-226.rs b/avro/tests/avro-rs-226.rs index 10dc80db..89aef93e 100644 --- a/avro/tests/avro-rs-226.rs +++ b/avro/tests/avro-rs-226.rs @@ -29,9 +29,16 @@ where writer.append_ser(record)?; let bytes_written = writer.into_inner()?; - let reader = apache_avro::Reader::new(&bytes_written[..])?; + // let mut bytes_written = Cursor::new(bytes_written); + // let value = from_avro_datum(schema, &mut bytes_written, None)?; + // dbg!(&value); + // let deserialized = from_value::(&value)?; + // assert_eq!(deserialized, record2); + + let reader = apache_avro::Reader::with_schema(schema, &bytes_written[..])?; for value in reader { let value = value?; + dbg!(&value); let deserialized = from_value::(&value)?; assert_eq!(deserialized, record2); } @@ -63,7 +70,7 @@ fn avro_rs_226_index_out_of_bounds_with_serde_skip_serializing_skip_middle_field fn avro_rs_226_index_out_of_bounds_with_serde_skip_serializing_skip_first_field() -> TestResult { #[derive(AvroSchema, Clone, Debug, Deserialize, PartialEq, Serialize)] struct T { - #[serde(skip_serializing_if = "Option::is_none")] + // #[serde(skip_serializing_if = "Option::is_none")] x: Option, y: Option, z: Option, From 3af99ae825ac98d5f95bdbdf8d443fb4ea7ded44 Mon Sep 17 00:00:00 2001 From: Martin Tzvetanov Grigorov Date: Mon, 21 Jul 2025 14:05:15 +0300 Subject: [PATCH 02/10] add impl for Schema::Int Signed-off-by: Martin Tzvetanov Grigorov --- avro/src/de_schema.rs | 148 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 134 insertions(+), 14 deletions(-) diff --git a/avro/src/de_schema.rs b/avro/src/de_schema.rs index 88f9a7fc..add3896e 100644 --- a/avro/src/de_schema.rs +++ b/avro/src/de_schema.rs @@ -1,5 +1,6 @@ use crate::error::Details; use crate::schema::{NamesRef, Namespace}; +use crate::util::zag_i32; use crate::{Error, Schema}; use serde::de::Visitor; use std::io::Read; @@ -7,22 +8,22 @@ use std::io::Read; pub struct SchemaAwareReadDeserializer<'s, R: Read> { reader: &'s mut R, root_schema: &'s Schema, - names: &'s NamesRef<'s>, - enclosing_namespace: Namespace, + _names: &'s NamesRef<'s>, + _enclosing_namespace: Namespace, } impl<'s, R: Read> SchemaAwareReadDeserializer<'s, R> { pub(crate) fn new( reader: &'s mut R, root_schema: &'s Schema, - names: &'s NamesRef<'s>, - enclosing_namespace: Namespace, + _names: &'s NamesRef<'s>, + _enclosing_namespace: Namespace, ) -> Self { Self { reader, root_schema, - names, - enclosing_namespace, + _names, + _enclosing_namespace, } } } @@ -47,25 +48,27 @@ impl<'de, R: Read> serde::de::Deserializer<'de> for SchemaAwareReadDeserializer< (&mut this).deserialize_bool_with_schema(visitor, schema) } - fn deserialize_i8(self, _visitor: V) -> Result + fn deserialize_i8(self, visitor: V) -> Result where V: Visitor<'de>, { - todo!() + self.deserialize_i32(visitor) } - fn deserialize_i16(self, _visitor: V) -> Result + fn deserialize_i16(self, visitor: V) -> Result where V: Visitor<'de>, { - todo!() + self.deserialize_i32(visitor) } - fn deserialize_i32(self, _visitor: V) -> Result + fn deserialize_i32(self, visitor: V) -> Result where V: Visitor<'de>, { - todo!() + let schema = self.root_schema; + let mut this = self; + (&mut this).deserialize_i32_with_schema(visitor, schema) } fn deserialize_i64(self, _visitor: V) -> Result @@ -298,7 +301,7 @@ impl<'s, R: Read> SchemaAwareReadDeserializer<'s, R> { } } Err(create_error(&format!( - "The union schema must have a boolean variant: {schema:?}" + "The union schema must have a Boolean variant: {schema:?}" ))) } unexpected => Err(create_error(&format!( @@ -306,6 +309,45 @@ impl<'s, R: Read> SchemaAwareReadDeserializer<'s, R> { ))), } } + + fn deserialize_i32_with_schema<'de, V>( + &mut self, + visitor: V, + schema: &Schema, + ) -> Result + where + V: Visitor<'de>, + { + let create_error = |cause: &str| Error::SerializeValueWithSchema { + // TODO: DeserializeValueWithSchema + value_type: "i32", + value: format!("Cause: {cause}"), + schema: Box::new(schema.clone()), + }; + + match schema { + Schema::Int => { + let i = zag_i32(&mut self.reader)?; + visitor.visit_i32(i) + } + Schema::Union(union_schema) => { + for (_, variant_schema) in union_schema.schemas.iter().enumerate() { + match variant_schema { + Schema::Int => { + return self.deserialize_i32_with_schema(visitor, variant_schema); + } + _ => { /* skip */ } + } + } + Err(create_error(&format!( + "The union schema must have an Int variant: {schema:?}" + ))) + } + unexpected => Err(create_error(&format!( + "Expected an Int schema, found: {unexpected:?}" + ))), + } + } } #[cfg(test)] @@ -314,7 +356,9 @@ mod tests { use crate::error::Details; use crate::reader::read_avro_datum_ref; use crate::schema::{Schema, UnionSchema}; + use crate::util::zig_i32; use apache_avro_test_helper::TestResult; + use std::io::Cursor; #[test] fn avro_rs_226_deserialize_bool_boolean_schema() -> TestResult { @@ -373,7 +417,83 @@ mod tests { schema, })) => { assert_eq!(value_type, "bool"); - assert!(value.contains("The union schema must have a boolean variant")); + assert!(value.contains("The union schema must have a Boolean variant")); + assert_eq!(schema.to_string(), schema.to_string()); + } + _ => panic!("Expected an error for invalid union schema"), + } + + Ok(()) + } + + #[test] + fn avro_rs_226_deserialize_int32_int_schema() -> TestResult { + let schema = Schema::Int; + + for value in [123_i32, -1024_i32] { + let mut writer = vec![]; + zig_i32(value, &mut writer)?; + let mut reader = Cursor::new(&writer); + let read: i32 = read_avro_datum_ref(&schema, &mut reader)?; + assert_eq!(read, value); + } + Ok(()) + } + + #[test] + fn avro_rs_226_deserialize_int32_union_int_schema() -> TestResult { + let schema = Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Boolean])?); + + for (byte, expected) in [(0, false), (1, true)] { + let mut reader: &[u8] = &[byte]; + let read: bool = read_avro_datum_ref(&schema, &mut reader)?; + assert_eq!(read, expected); + } + Ok(()) + } + + #[test] + fn avro_rs_226_deserialize_i32_invalid_schema() -> TestResult { + let schema = Schema::Long; // Using a non-boolean schema + + let mut reader: &[u8] = &[0, 1, 2]; + match read_avro_datum_ref::(&schema, &mut reader) { + Err(Error::SerializeValueWithSchema { + value_type, + value, + schema, + }) => { + assert_eq!(value_type, "i32"); + assert!( + value.contains("Cause: Expected an Int schema"), + "Got: {}", + value + ); + assert_eq!(schema.to_string(), schema.to_string()); + } + _ => panic!("Expected an error for invalid schema"), + } + + Ok(()) + } + + #[test] + fn avro_rs_226_deserialize_i32_union_invalid_schema() -> TestResult { + let schema = Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Long])?); + + let mut reader: &[u8] = &[1, 2, 3]; + match read_avro_datum_ref::(&schema, &mut reader) { + Err(Error::SerializeValueWithSchema { + value_type, + value, + schema, + }) => { + assert_eq!(value_type, "i32"); + assert!( + value.contains("The union schema must have an Int variant"), + "Got: {}", + value + ); assert_eq!(schema.to_string(), schema.to_string()); } _ => panic!("Expected an error for invalid union schema"), From 98a56c63030a5342ff6011e427d25a66cfa18dd1 Mon Sep 17 00:00:00 2001 From: Martin Tzvetanov Grigorov Date: Mon, 21 Jul 2025 14:15:55 +0300 Subject: [PATCH 03/10] Fix clippy errors Signed-off-by: Martin Tzvetanov Grigorov --- avro/src/de_schema.rs | 34 +++++++++++++++++----------------- avro/src/reader.rs | 14 +++++++------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/avro/src/de_schema.rs b/avro/src/de_schema.rs index add3896e..2a0d6e9f 100644 --- a/avro/src/de_schema.rs +++ b/avro/src/de_schema.rs @@ -45,7 +45,7 @@ impl<'de, R: Read> serde::de::Deserializer<'de> for SchemaAwareReadDeserializer< { let schema = self.root_schema; let mut this = self; - (&mut this).deserialize_bool_with_schema(visitor, schema) + this.deserialize_bool_with_schema(visitor, schema) } fn deserialize_i8(self, visitor: V) -> Result @@ -68,7 +68,7 @@ impl<'de, R: Read> serde::de::Deserializer<'de> for SchemaAwareReadDeserializer< { let schema = self.root_schema; let mut this = self; - (&mut this).deserialize_i32_with_schema(visitor, schema) + this.deserialize_i32_with_schema(visitor, schema) } fn deserialize_i64(self, _visitor: V) -> Result @@ -292,7 +292,7 @@ impl<'s, R: Read> SchemaAwareReadDeserializer<'s, R> { visitor.visit_bool(value) } Schema::Union(union_schema) => { - for (_, variant_schema) in union_schema.schemas.iter().enumerate() { + for variant_schema in union_schema.schemas.iter() { match variant_schema { Schema::Boolean => { return self.deserialize_bool_with_schema(visitor, variant_schema); @@ -318,11 +318,13 @@ impl<'s, R: Read> SchemaAwareReadDeserializer<'s, R> { where V: Visitor<'de>, { - let create_error = |cause: &str| Error::SerializeValueWithSchema { - // TODO: DeserializeValueWithSchema - value_type: "i32", - value: format!("Cause: {cause}"), - schema: Box::new(schema.clone()), + let create_error = |cause: &str| { + Error::new(Details::SerializeValueWithSchema { + // TODO: DeserializeValueWithSchema + value_type: "i32", + value: format!("Cause: {cause}"), + schema: schema.clone(), + }) }; match schema { @@ -331,7 +333,7 @@ impl<'s, R: Read> SchemaAwareReadDeserializer<'s, R> { visitor.visit_i32(i) } Schema::Union(union_schema) => { - for (_, variant_schema) in union_schema.schemas.iter().enumerate() { + for variant_schema in union_schema.schemas.iter() { match variant_schema { Schema::Int => { return self.deserialize_i32_with_schema(visitor, variant_schema); @@ -390,7 +392,7 @@ mod tests { let mut reader: &[u8] = &[0, 1, 2]; match read_avro_datum_ref::(&schema, &mut reader) { - Err(Error(Details::SerializeValueWithSchema { + Err(Error::new(Details::SerializeValueWithSchema { value_type, value, schema, @@ -411,7 +413,7 @@ mod tests { let mut reader: &[u8] = &[1, 2, 3]; match read_avro_datum_ref::(&schema, &mut reader) { - Err(Error(Details::SerializeValueWithSchema { + Err(Error::new(Details::SerializeValueWithSchema { value_type, value, schema, @@ -458,7 +460,7 @@ mod tests { let mut reader: &[u8] = &[0, 1, 2]; match read_avro_datum_ref::(&schema, &mut reader) { - Err(Error::SerializeValueWithSchema { + Err(Details::SerializeValueWithSchema { value_type, value, schema, @@ -466,8 +468,7 @@ mod tests { assert_eq!(value_type, "i32"); assert!( value.contains("Cause: Expected an Int schema"), - "Got: {}", - value + "Got: {value}", ); assert_eq!(schema.to_string(), schema.to_string()); } @@ -483,7 +484,7 @@ mod tests { let mut reader: &[u8] = &[1, 2, 3]; match read_avro_datum_ref::(&schema, &mut reader) { - Err(Error::SerializeValueWithSchema { + Err(Details::SerializeValueWithSchema { value_type, value, schema, @@ -491,8 +492,7 @@ mod tests { assert_eq!(value_type, "i32"); assert!( value.contains("The union schema must have an Int variant"), - "Got: {}", - value + "Got: {value}", ); assert_eq!(schema.to_string(), schema.to_string()); } diff --git a/avro/src/reader.rs b/avro/src/reader.rs index 3142e849..c6e2bae6 100644 --- a/avro/src/reader.rs +++ b/avro/src/reader.rs @@ -19,17 +19,17 @@ use crate::de_schema::SchemaAwareReadDeserializer; use crate::schema::NamesRef; use crate::{ - AvroResult, Codec, Error, - decode::{decode, decode_internal}, - error::Details, - from_value, + decode::{decode, decode_internal}, error::Details, from_value, headers::{HeaderBuilder, RabinFingerprintHeader}, schema::{ - AvroSchema, Names, ResolvedOwnedSchema, ResolvedSchema, Schema, resolve_names, - resolve_names_with_schemata, + resolve_names, resolve_names_with_schemata, AvroSchema, Names, ResolvedOwnedSchema, ResolvedSchema, + Schema, }, types::Value, util, + AvroResult, + Codec, + Error, }; use log::warn; use serde::de::DeserializeOwned; @@ -599,7 +599,7 @@ pub fn read_marker(bytes: &[u8]) -> [u8; 16] { marker } -pub fn read_avro_datum_ref<'de, D: DeserializeOwned, R: Read>( +pub fn read_avro_datum_ref( schema: &Schema, reader: &mut R, ) -> AvroResult { From e57dd4ecc281beac3fe613b92d28d90654e8ee74 Mon Sep 17 00:00:00 2001 From: Martin Tzvetanov Grigorov Date: Mon, 21 Jul 2025 14:21:15 +0300 Subject: [PATCH 04/10] Fix clippy: allow dead code temporarily Signed-off-by: Martin Tzvetanov Grigorov --- avro/src/de_schema.rs | 3 ++- avro/src/reader.rs | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/avro/src/de_schema.rs b/avro/src/de_schema.rs index 2a0d6e9f..bcc263bc 100644 --- a/avro/src/de_schema.rs +++ b/avro/src/de_schema.rs @@ -13,6 +13,7 @@ pub struct SchemaAwareReadDeserializer<'s, R: Read> { } impl<'s, R: Read> SchemaAwareReadDeserializer<'s, R> { + #[allow(dead_code)] // TODO: remove! It is actually used in reader.rs pub(crate) fn new( reader: &'s mut R, root_schema: &'s Schema, @@ -263,7 +264,7 @@ impl<'de, R: Read> serde::de::Deserializer<'de> for SchemaAwareReadDeserializer< } } -impl<'s, R: Read> SchemaAwareReadDeserializer<'s, R> { +impl SchemaAwareReadDeserializer<'_, R> { fn deserialize_bool_with_schema<'de, V>( &mut self, visitor: V, diff --git a/avro/src/reader.rs b/avro/src/reader.rs index c6e2bae6..3e8839cf 100644 --- a/avro/src/reader.rs +++ b/avro/src/reader.rs @@ -599,6 +599,7 @@ pub fn read_marker(bytes: &[u8]) -> [u8; 16] { marker } +#[allow(dead_code)] // TODO: remove! It is used in de_schema.rs tests pub fn read_avro_datum_ref( schema: &Schema, reader: &mut R, From cd9471a45aa7452848898ce1d88376822edb4346 Mon Sep 17 00:00:00 2001 From: Martin Tzvetanov Grigorov Date: Mon, 21 Jul 2025 14:50:30 +0300 Subject: [PATCH 05/10] Add impl for i64/Long Signed-off-by: Martin Tzvetanov Grigorov --- avro/src/de_schema.rs | 174 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 158 insertions(+), 16 deletions(-) diff --git a/avro/src/de_schema.rs b/avro/src/de_schema.rs index bcc263bc..a7db63e4 100644 --- a/avro/src/de_schema.rs +++ b/avro/src/de_schema.rs @@ -1,6 +1,6 @@ use crate::error::Details; use crate::schema::{NamesRef, Namespace}; -use crate::util::zag_i32; +use crate::util::{zag_i32, zag_i64}; use crate::{Error, Schema}; use serde::de::Visitor; use std::io::Read; @@ -72,11 +72,13 @@ impl<'de, R: Read> serde::de::Deserializer<'de> for SchemaAwareReadDeserializer< this.deserialize_i32_with_schema(visitor, schema) } - fn deserialize_i64(self, _visitor: V) -> Result + fn deserialize_i64(self, visitor: V) -> Result where V: Visitor<'de>, { - todo!() + let schema = self.root_schema; + let mut this = self; + this.deserialize_i64_with_schema(visitor, schema) } fn deserialize_u8(self, _visitor: V) -> Result @@ -329,25 +331,87 @@ impl SchemaAwareReadDeserializer<'_, R> { }; match schema { - Schema::Int => { + Schema::Int | Schema::TimeMillis | Schema::Date => { let i = zag_i32(&mut self.reader)?; visitor.visit_i32(i) } Schema::Union(union_schema) => { for variant_schema in union_schema.schemas.iter() { match variant_schema { - Schema::Int => { + Schema::Int | Schema::TimeMillis | Schema::Date => { return self.deserialize_i32_with_schema(visitor, variant_schema); } _ => { /* skip */ } } } Err(create_error(&format!( - "The union schema must have an Int variant: {schema:?}" + "The union schema must have an Int[-like] variant: {schema:?}" ))) } unexpected => Err(create_error(&format!( - "Expected an Int schema, found: {unexpected:?}" + "Expected an Int[-like] schema, found: {unexpected:?}" + ))), + } + } + + fn deserialize_i64_with_schema<'de, V>( + &mut self, + visitor: V, + schema: &Schema, + ) -> Result + where + V: Visitor<'de>, + { + let create_error = |cause: &str| Error::SerializeValueWithSchema { + // TODO: DeserializeValueWithSchema + value_type: "i64", + value: format!("Cause: {cause}"), + schema: Box::new(schema.clone()), + }; + + match schema { + Schema::Int | Schema::TimeMillis | Schema::Date => { + let long = zag_i64(&mut self.reader)?; + let int = i32::try_from(long) + .map_err(|cause| create_error(cause.to_string().as_str()))?; + visitor.visit_i32(int) + } + Schema::Long + | Schema::TimeMicros + | Schema::TimestampMillis + | Schema::TimestampMicros + | Schema::TimestampNanos + | Schema::LocalTimestampMillis + | Schema::LocalTimestampMicros + | Schema::LocalTimestampNanos => { + let long = zag_i64(&mut self.reader)?; + visitor.visit_i64(long) + } + Schema::Union(union_schema) => { + for variant_schema in union_schema.schemas.iter() { + match variant_schema { + Schema::Int + | Schema::TimeMillis + | Schema::Date + | Schema::Long + | Schema::TimeMicros + | Schema::TimestampMillis + | Schema::TimestampMicros + | Schema::TimestampNanos + | Schema::LocalTimestampMillis + | Schema::LocalTimestampMicros + | Schema::LocalTimestampNanos => { + return self.deserialize_i64_with_schema(visitor, variant_schema); + } + _ => { /* skip */ } + } + } + Err(create_error(&format!( + "The union schema must have a Long[-like] variant: {schema:?}" + ))) + } + unexpected => Err(create_error(&format!( + "Expected a Long[-like] schema, found: {unexpected:?}" ))), } } @@ -359,7 +423,7 @@ mod tests { use crate::error::Details; use crate::reader::read_avro_datum_ref; use crate::schema::{Schema, UnionSchema}; - use crate::util::zig_i32; + use crate::util::{zig_i32, zig_i64}; use apache_avro_test_helper::TestResult; use std::io::Cursor; @@ -445,19 +509,21 @@ mod tests { #[test] fn avro_rs_226_deserialize_int32_union_int_schema() -> TestResult { - let schema = Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Boolean])?); + let schema = Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?); - for (byte, expected) in [(0, false), (1, true)] { - let mut reader: &[u8] = &[byte]; - let read: bool = read_avro_datum_ref(&schema, &mut reader)?; - assert_eq!(read, expected); + for value in [123_i32, -1024_i32] { + let mut writer = vec![]; + zig_i32(value, &mut writer)?; + let mut reader = Cursor::new(&writer); + let read: i32 = read_avro_datum_ref(&schema, &mut reader)?; + assert_eq!(read, value); } Ok(()) } #[test] fn avro_rs_226_deserialize_i32_invalid_schema() -> TestResult { - let schema = Schema::Long; // Using a non-boolean schema + let schema = Schema::Long; // Using a non-Int schema let mut reader: &[u8] = &[0, 1, 2]; match read_avro_datum_ref::(&schema, &mut reader) { @@ -468,7 +534,7 @@ mod tests { }) => { assert_eq!(value_type, "i32"); assert!( - value.contains("Cause: Expected an Int schema"), + value.contains("Cause: Expected an Int[-like] schema"), "Got: {value}", ); assert_eq!(schema.to_string(), schema.to_string()); @@ -492,7 +558,83 @@ mod tests { }) => { assert_eq!(value_type, "i32"); assert!( - value.contains("The union schema must have an Int variant"), + value.contains("The union schema must have an Int[-like] variant"), + "Got: {value}", + ); + assert_eq!(schema.to_string(), schema.to_string()); + } + _ => panic!("Expected an error for invalid union schema"), + } + + Ok(()) + } + + #[test] + fn avro_rs_226_deserialize_int64_int_schema() -> TestResult { + let schema = Schema::Int; + + for value in [123_i64, -1024_i64] { + let mut writer = vec![]; + zig_i64(value, &mut writer)?; + let mut reader = Cursor::new(&writer); + let read: i64 = read_avro_datum_ref(&schema, &mut reader)?; + assert_eq!(read, value); + } + Ok(()) + } + + #[test] + fn avro_rs_226_deserialize_int64_union_int_schema() -> TestResult { + let schema = Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::TimeMicros])?); + + for value in [123_i64, -1024_i64] { + let mut writer = vec![]; + zig_i64(value, &mut writer)?; + let mut reader = Cursor::new(&writer); + let read: i64 = read_avro_datum_ref(&schema, &mut reader)?; + assert_eq!(read, value); + } + Ok(()) + } + + #[test] + fn avro_rs_226_deserialize_i64_invalid_schema() -> TestResult { + let schema = Schema::Uuid; // Using a non-Long schema + + let mut reader: &[u8] = &[0, 1, 2]; + match read_avro_datum_ref::(&schema, &mut reader) { + Err(Error::SerializeValueWithSchema { + value_type, + value, + schema, + }) => { + assert_eq!(value_type, "i64"); + assert!( + value.contains("Cause: Expected a Long[-like] schema"), + "Got: {value}", + ); + assert_eq!(schema.to_string(), schema.to_string()); + } + _ => panic!("Expected an error for invalid schema"), + } + + Ok(()) + } + + #[test] + fn avro_rs_226_deserialize_i64_union_invalid_schema() -> TestResult { + let schema = Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::String])?); + + let mut reader: &[u8] = &[1, 2, 3]; + match read_avro_datum_ref::(&schema, &mut reader) { + Err(Error::SerializeValueWithSchema { + value_type, + value, + schema, + }) => { + assert_eq!(value_type, "i64"); + assert!( + value.contains("The union schema must have a Long[-like] variant"), "Got: {value}", ); assert_eq!(schema.to_string(), schema.to_string()); From 3741c21f0c8f584ada3c3b1c225bb013f98e5106 Mon Sep 17 00:00:00 2001 From: Martin Tzvetanov Grigorov Date: Mon, 21 Jul 2025 15:18:12 +0300 Subject: [PATCH 06/10] add impls for u8/u16/u32 Signed-off-by: Martin Tzvetanov Grigorov --- avro/src/de_schema.rs | 109 +++++++++++++++++++++++++++++++++--------- avro/src/reader.rs | 7 ++- 2 files changed, 89 insertions(+), 27 deletions(-) diff --git a/avro/src/de_schema.rs b/avro/src/de_schema.rs index a7db63e4..c2513edf 100644 --- a/avro/src/de_schema.rs +++ b/avro/src/de_schema.rs @@ -1,7 +1,9 @@ use crate::error::Details; -use crate::schema::{NamesRef, Namespace}; -use crate::util::{zag_i32, zag_i64}; -use crate::{Error, Schema}; +use crate::{ + schema::{NamesRef, Namespace}, util::{zag_i32, zag_i64}, + Error, + Schema, +}; use serde::de::Visitor; use std::io::Read; @@ -81,32 +83,32 @@ impl<'de, R: Read> serde::de::Deserializer<'de> for SchemaAwareReadDeserializer< this.deserialize_i64_with_schema(visitor, schema) } - fn deserialize_u8(self, _visitor: V) -> Result + fn deserialize_u8(self, visitor: V) -> Result where V: Visitor<'de>, { - todo!() + self.deserialize_i32(visitor) } - fn deserialize_u16(self, _visitor: V) -> Result + fn deserialize_u16(self, visitor: V) -> Result where V: Visitor<'de>, { - todo!() + self.deserialize_i32(visitor) } - fn deserialize_u32(self, _visitor: V) -> Result + fn deserialize_u32(self, visitor: V) -> Result where V: Visitor<'de>, { - todo!() + self.deserialize_i64(visitor) } - fn deserialize_u64(self, _visitor: V) -> Result + fn deserialize_u64(self, visitor: V) -> Result where V: Visitor<'de>, { - todo!() + self.deserialize_i64(visitor) } fn deserialize_f32(self, _visitor: V) -> Result @@ -332,8 +334,8 @@ impl SchemaAwareReadDeserializer<'_, R> { match schema { Schema::Int | Schema::TimeMillis | Schema::Date => { - let i = zag_i32(&mut self.reader)?; - visitor.visit_i32(i) + let int = zag_i32(&mut self.reader)?; + visitor.visit_i32(int) } Schema::Union(union_schema) => { for variant_schema in union_schema.schemas.iter() { @@ -362,11 +364,14 @@ impl SchemaAwareReadDeserializer<'_, R> { where V: Visitor<'de>, { - let create_error = |cause: &str| Error::SerializeValueWithSchema { - // TODO: DeserializeValueWithSchema - value_type: "i64", - value: format!("Cause: {cause}"), - schema: Box::new(schema.clone()), + let create_error = |cause: &str| { + Details::SerializeValueWithSchema { + // TODO: DeserializeValueWithSchema + value_type: "i64", + value: format!("Cause: {cause}"), + schema: schema.clone(), + } + .into() }; match schema { @@ -421,9 +426,11 @@ impl SchemaAwareReadDeserializer<'_, R> { mod tests { use super::*; use crate::error::Details; - use crate::reader::read_avro_datum_ref; - use crate::schema::{Schema, UnionSchema}; - use crate::util::{zig_i32, zig_i64}; + use crate::{ + reader::read_avro_datum_ref, + schema::{Schema, UnionSchema}, + util::{zig_i32, zig_i64}, + }; use apache_avro_test_helper::TestResult; use std::io::Cursor; @@ -571,9 +578,23 @@ mod tests { #[test] fn avro_rs_226_deserialize_int64_int_schema() -> TestResult { - let schema = Schema::Int; + let schema = Schema::TimeMillis; - for value in [123_i64, -1024_i64] { + for value in [i32::MAX, -i32::MAX] { + let mut writer = vec![]; + zig_i64(value as i64, &mut writer)?; + let mut reader = Cursor::new(&writer); + let read: i64 = read_avro_datum_ref(&schema, &mut reader)?; + assert_eq!(read, value as i64); + } + Ok(()) + } + + #[test] + fn avro_rs_226_deserialize_int64_long_schema() -> TestResult { + let schema = Schema::TimestampMicros; + + for value in [i64::MAX, -i64::MAX] { let mut writer = vec![]; zig_i64(value, &mut writer)?; let mut reader = Cursor::new(&writer); @@ -644,4 +665,46 @@ mod tests { Ok(()) } + + #[test] + fn avro_rs_226_deserialize_u8_int_schema() -> TestResult { + let schema = Schema::TimeMillis; + + for value in [u8::MAX, 0] { + let mut writer = vec![]; + zig_i32(value as i32, &mut writer)?; + let mut reader = Cursor::new(&writer); + let read: u8 = read_avro_datum_ref(&schema, &mut reader)?; + assert_eq!(read, value); + } + Ok(()) + } + + #[test] + fn avro_rs_226_deserialize_u16_int_schema() -> TestResult { + let schema = Schema::TimeMillis; + + for value in [u16::MAX, 0] { + let mut writer = vec![]; + zig_i32(value as i32, &mut writer)?; + let mut reader = Cursor::new(&writer); + let read: u16 = read_avro_datum_ref(&schema, &mut reader)?; + assert_eq!(read, value); + } + Ok(()) + } + + #[test] + fn avro_rs_226_deserialize_u32_long_schema() -> TestResult { + let schema = Schema::TimeMicros; + + for value in [u32::MAX, 0] { + let mut writer = vec![]; + zig_i64(value as i64, &mut writer)?; + let mut reader = Cursor::new(&writer); + let read: u32 = read_avro_datum_ref(&schema, &mut reader)?; + assert_eq!(read, value); + } + Ok(()) + } } diff --git a/avro/src/reader.rs b/avro/src/reader.rs index 3e8839cf..df19b3fd 100644 --- a/avro/src/reader.rs +++ b/avro/src/reader.rs @@ -16,13 +16,12 @@ // under the License. //! Logic handling reading from Avro format at user level. -use crate::de_schema::SchemaAwareReadDeserializer; -use crate::schema::NamesRef; +use crate::error::Details; use crate::{ - decode::{decode, decode_internal}, error::Details, from_value, + de_schema::SchemaAwareReadDeserializer, decode::{decode, decode_internal}, from_value, headers::{HeaderBuilder, RabinFingerprintHeader}, schema::{ - resolve_names, resolve_names_with_schemata, AvroSchema, Names, ResolvedOwnedSchema, ResolvedSchema, + resolve_names, resolve_names_with_schemata, AvroSchema, Names, NamesRef, ResolvedOwnedSchema, ResolvedSchema, Schema, }, types::Value, From 91a0a9936854efbccb8a9adf27701cf99af35683 Mon Sep 17 00:00:00 2001 From: Martin Tzvetanov Grigorov Date: Tue, 22 Jul 2025 09:11:49 +0300 Subject: [PATCH 07/10] WIP Signed-off-by: Martin Tzvetanov Grigorov --- avro/src/de.rs | 6 +- avro/src/de_schema.rs | 226 ++++++++++++++++++++++++++++++-------- avro/src/error.rs | 7 ++ avro/src/lib.rs | 12 +- avro/src/reader.rs | 16 +-- avro/src/ser_schema.rs | 2 +- avro/tests/avro-rs-226.rs | 23 ++-- 7 files changed, 221 insertions(+), 71 deletions(-) diff --git a/avro/src/de.rs b/avro/src/de.rs index 74664121..55bc945a 100644 --- a/avro/src/de.rs +++ b/avro/src/de.rs @@ -16,16 +16,16 @@ // under the License. //! Logic for serde-compatible deserialization. -use crate::{bytes::DE_BYTES_BORROWED, error::Details, types::Value, AvroResult, Error}; +use crate::{AvroResult, Error, bytes::DE_BYTES_BORROWED, error::Details, types::Value}; use serde::{ + Deserialize, de::{self, DeserializeSeed, Deserializer as _, Visitor}, forward_to_deserialize_any, - Deserialize, }; use std::{ collections::{ - hash_map::{Keys, Values}, HashMap, + hash_map::{Keys, Values}, }, slice::Iter, }; diff --git a/avro/src/de_schema.rs b/avro/src/de_schema.rs index c2513edf..a0a1584d 100644 --- a/avro/src/de_schema.rs +++ b/avro/src/de_schema.rs @@ -1,32 +1,24 @@ use crate::error::Details; use crate::{ - schema::{NamesRef, Namespace}, util::{zag_i32, zag_i64}, - Error, + util::{zag_i32, zag_i64}, Error, Schema, }; use serde::de::Visitor; +use serde::{de, forward_to_deserialize_any}; use std::io::Read; +use std::slice::Iter; pub struct SchemaAwareReadDeserializer<'s, R: Read> { reader: &'s mut R, root_schema: &'s Schema, - _names: &'s NamesRef<'s>, - _enclosing_namespace: Namespace, } impl<'s, R: Read> SchemaAwareReadDeserializer<'s, R> { #[allow(dead_code)] // TODO: remove! It is actually used in reader.rs - pub(crate) fn new( - reader: &'s mut R, - root_schema: &'s Schema, - _names: &'s NamesRef<'s>, - _enclosing_namespace: Namespace, - ) -> Self { + pub(crate) fn new(reader: &'s mut R, root_schema: &'s Schema) -> Self { Self { reader, root_schema, - _names, - _enclosing_namespace, } } } @@ -136,14 +128,14 @@ impl<'de, R: Read> serde::de::Deserializer<'de> for SchemaAwareReadDeserializer< where V: Visitor<'de>, { - todo!() + todo!("Implement deserialization for str") } fn deserialize_string(self, _visitor: V) -> Result where V: Visitor<'de>, { - todo!() + todo!("Implement deserialization for String") } fn deserialize_bytes(self, _visitor: V) -> Result @@ -164,7 +156,7 @@ impl<'de, R: Read> serde::de::Deserializer<'de> for SchemaAwareReadDeserializer< where V: Visitor<'de>, { - todo!() + todo!("Implement deserialization for Option") } fn deserialize_unit(self, _visitor: V) -> Result @@ -231,14 +223,18 @@ impl<'de, R: Read> serde::de::Deserializer<'de> for SchemaAwareReadDeserializer< fn deserialize_struct( self, - _name: &'static str, - _fields: &'static [&'static str], - _visitor: V, + name: &'static str, + fields: &'static [&'static str], + visitor: V, ) -> Result where V: Visitor<'de>, { - todo!() + // dbg!(name, fields, self.root_schema); + // todo!("Implement deserialization for struct"); + let schema = self.root_schema; + let mut this = self; + this.deserialize_struct_with_schema(name, fields, visitor, schema) } fn deserialize_enum( @@ -268,8 +264,8 @@ impl<'de, R: Read> serde::de::Deserializer<'de> for SchemaAwareReadDeserializer< } } -impl SchemaAwareReadDeserializer<'_, R> { - fn deserialize_bool_with_schema<'de, V>( +impl<'de, R: Read> SchemaAwareReadDeserializer<'de, R> { + fn deserialize_bool_with_schema( &mut self, visitor: V, schema: &Schema, @@ -278,8 +274,7 @@ impl SchemaAwareReadDeserializer<'_, R> { V: Visitor<'de>, { let create_error = |cause: &str| { - Details::SerializeValueWithSchema { - // TODO: DeserializeValueWithSchema + Details::DeserializeValueWithSchema { value_type: "bool", value: format!("Cause: {cause}"), schema: schema.clone(), @@ -315,7 +310,7 @@ impl SchemaAwareReadDeserializer<'_, R> { } } - fn deserialize_i32_with_schema<'de, V>( + fn deserialize_i32_with_schema( &mut self, visitor: V, schema: &Schema, @@ -324,8 +319,7 @@ impl SchemaAwareReadDeserializer<'_, R> { V: Visitor<'de>, { let create_error = |cause: &str| { - Error::new(Details::SerializeValueWithSchema { - // TODO: DeserializeValueWithSchema + Error::new(Details::DeserializeValueWithSchema { value_type: "i32", value: format!("Cause: {cause}"), schema: schema.clone(), @@ -334,7 +328,7 @@ impl SchemaAwareReadDeserializer<'_, R> { match schema { Schema::Int | Schema::TimeMillis | Schema::Date => { - let int = zag_i32(&mut self.reader)?; + let int = zag_i32(self.reader)?; visitor.visit_i32(int) } Schema::Union(union_schema) => { @@ -356,7 +350,7 @@ impl SchemaAwareReadDeserializer<'_, R> { } } - fn deserialize_i64_with_schema<'de, V>( + fn deserialize_i64_with_schema( &mut self, visitor: V, schema: &Schema, @@ -365,8 +359,7 @@ impl SchemaAwareReadDeserializer<'_, R> { V: Visitor<'de>, { let create_error = |cause: &str| { - Details::SerializeValueWithSchema { - // TODO: DeserializeValueWithSchema + Details::DeserializeValueWithSchema { value_type: "i64", value: format!("Cause: {cause}"), schema: schema.clone(), @@ -376,7 +369,7 @@ impl SchemaAwareReadDeserializer<'_, R> { match schema { Schema::Int | Schema::TimeMillis | Schema::Date => { - let long = zag_i64(&mut self.reader)?; + let long = zag_i64(self.reader)?; let int = i32::try_from(long) .map_err(|cause| create_error(cause.to_string().as_str()))?; visitor.visit_i32(int) @@ -389,7 +382,7 @@ impl SchemaAwareReadDeserializer<'_, R> { | Schema::LocalTimestampMillis | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos => { - let long = zag_i64(&mut self.reader)?; + let long = zag_i64(self.reader)?; visitor.visit_i64(long) } Schema::Union(union_schema) => { @@ -420,6 +413,149 @@ impl SchemaAwareReadDeserializer<'_, R> { ))), } } + + fn deserialize_struct_with_schema( + &'de mut self, + name: &'static str, + fields: &'static [&'static str], + visitor: V, + schema: &'de Schema, + ) -> Result + where + V: Visitor<'de>, + { + let create_error = |cause: &str| { + Details::DeserializeValueWithSchema { + value_type: "struct", + value: format!("Cause: {cause}"), + schema: schema.clone(), + } + .into() + }; + + match schema { + Schema::Record(record_schema) => visitor.visit_map( + RecordSchemaAwareReadDeserializer::new(self, name, fields.iter(), record_schema), + ), + Schema::Union(union_schema) => { + for variant_schema in union_schema.schemas.iter() { + match variant_schema { + Schema::Int + | Schema::TimeMillis + | Schema::Date + | Schema::Long + | Schema::TimeMicros + | Schema::TimestampMillis + | Schema::TimestampMicros + | Schema::TimestampNanos + | Schema::LocalTimestampMillis + | Schema::LocalTimestampMicros + | Schema::LocalTimestampNanos => { + return self.deserialize_i64_with_schema(visitor, variant_schema); + } + _ => { /* skip */ } + } + } + Err(create_error(&format!( + "The union schema must have a Long[-like] variant: {schema:?}" + ))) + } + unexpected => Err(create_error(&format!( + "Expected a Long[-like] schema, found: {unexpected:?}" + ))), + } + } +} + +struct RecordSchemaAwareReadDeserializer<'s, R: Read> { + deser: &'s mut SchemaAwareReadDeserializer<'s, R>, + schema_name: &'static str, + fields: Iter<'s, &'static str>, + record_schema: &'s crate::schema::RecordSchema, +} + +impl<'s, R: Read> RecordSchemaAwareReadDeserializer<'s, R> { + fn new( + deser: &'s mut SchemaAwareReadDeserializer<'s, R>, + schema_name: &'static str, + fields: Iter<'s, &'static str>, + record_schema: &'s crate::schema::RecordSchema, + ) -> Self { + Self { + deser, + schema_name, + fields, + record_schema, + } + } +} + +impl<'de, R: Read> de::MapAccess<'de> for RecordSchemaAwareReadDeserializer<'de, R> { + type Error = Error; + + fn next_key_seed(&mut self, seed: K) -> Result, Self::Error> + where + K: de::DeserializeSeed<'de>, + { + match self.fields.next() { + Some(&field_name) => seed + .deserialize(StringDeserializer { input: field_name }) + .map(Some), + None => Ok(None), + } + } + + fn next_value_seed(&mut self, seed: V) -> Result + where + V: de::DeserializeSeed<'de>, + { + match self.fields.next() { + Some(&field_name) => { + let field_idx = self.record_schema.lookup.get(field_name).ok_or_else(|| { + return Error::new(Details::DeserializeValueWithSchema { + value_type: "field", + value: format!("Field '{field_name}' not found in record schema"), + schema: Schema::Record(self.record_schema.clone()), + }); + })?; + let record_field = self.record_schema.fields.get(*field_idx).ok_or_else(|| { + return Error::new(Details::DeserializeValueWithSchema { + value_type: "field", + value: format!("Field index {field_idx} out of bounds"), + schema: Schema::Record(self.record_schema.clone()), + }); + })?; + let field_schema = &record_field.schema; + seed.deserialize(SchemaAwareReadDeserializer::new( + self.deser.reader, + field_schema, + )) + } + None => Err(de::Error::custom("should not happen - too many values")), + } + } +} + +#[derive(Clone)] +struct StringDeserializer<'de> { + input: &'de str, +} + +impl<'de> de::Deserializer<'de> for StringDeserializer<'de> { + type Error = Error; + + fn deserialize_any(self, visitor: V) -> Result + where + V: Visitor<'de>, + { + visitor.visit_str(self.input) + } + + forward_to_deserialize_any! { + bool u8 u16 u32 u64 i8 i16 i32 i64 f32 f64 char str string unit option + seq bytes byte_buf map unit_struct newtype_struct + tuple_struct struct tuple enum identifier ignored_any + } } #[cfg(test)] @@ -463,12 +599,13 @@ mod tests { let schema = Schema::Long; // Using a non-boolean schema let mut reader: &[u8] = &[0, 1, 2]; - match read_avro_datum_ref::(&schema, &mut reader) { - Err(Error::new(Details::SerializeValueWithSchema { + match read_avro_datum_ref::(&schema, &mut reader).map_err(Error::into_details) + { + Err(Details::SerializeValueWithSchema { value_type, value, schema, - })) => { + }) => { assert_eq!(value_type, "bool"); assert!(value.contains("Cause: Expected a boolean schema")); assert_eq!(schema.to_string(), schema.to_string()); @@ -484,12 +621,13 @@ mod tests { let schema = Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Long])?); let mut reader: &[u8] = &[1, 2, 3]; - match read_avro_datum_ref::(&schema, &mut reader) { - Err(Error::new(Details::SerializeValueWithSchema { + match read_avro_datum_ref::(&schema, &mut reader).map_err(Error::into_details) + { + Err(Details::SerializeValueWithSchema { value_type, value, schema, - })) => { + }) => { assert_eq!(value_type, "bool"); assert!(value.contains("The union schema must have a Boolean variant")); assert_eq!(schema.to_string(), schema.to_string()); @@ -533,7 +671,7 @@ mod tests { let schema = Schema::Long; // Using a non-Int schema let mut reader: &[u8] = &[0, 1, 2]; - match read_avro_datum_ref::(&schema, &mut reader) { + match read_avro_datum_ref::(&schema, &mut reader).map_err(Error::into_details) { Err(Details::SerializeValueWithSchema { value_type, value, @@ -557,7 +695,7 @@ mod tests { let schema = Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Long])?); let mut reader: &[u8] = &[1, 2, 3]; - match read_avro_datum_ref::(&schema, &mut reader) { + match read_avro_datum_ref::(&schema, &mut reader).map_err(Error::into_details) { Err(Details::SerializeValueWithSchema { value_type, value, @@ -623,8 +761,8 @@ mod tests { let schema = Schema::Uuid; // Using a non-Long schema let mut reader: &[u8] = &[0, 1, 2]; - match read_avro_datum_ref::(&schema, &mut reader) { - Err(Error::SerializeValueWithSchema { + match read_avro_datum_ref::(&schema, &mut reader).map_err(Error::into_details) { + Err(Details::SerializeValueWithSchema { value_type, value, schema, @@ -647,8 +785,8 @@ mod tests { let schema = Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::String])?); let mut reader: &[u8] = &[1, 2, 3]; - match read_avro_datum_ref::(&schema, &mut reader) { - Err(Error::SerializeValueWithSchema { + match read_avro_datum_ref::(&schema, &mut reader).map_err(Error::into_details) { + Err(Details::SerializeValueWithSchema { value_type, value, schema, diff --git a/avro/src/error.rs b/avro/src/error.rs index 3e4553c4..945d5ab1 100644 --- a/avro/src/error.rs +++ b/avro/src/error.rs @@ -508,6 +508,13 @@ pub enum Details { schema: Schema, }, + #[error("Failed to deserialize value of type {value_type} using schema {schema:?}: {value}")] + DeserializeValueWithSchema { + value_type: &'static str, + value: String, + schema: Schema, + }, + #[error("Failed to serialize field '{field_name}' for record {record_schema:?}: {error}")] SerializeRecordFieldWithSchema { field_name: &'static str, diff --git a/avro/src/lib.rs b/avro/src/lib.rs index 8dd3f284..f5fa905c 100644 --- a/avro/src/lib.rs +++ b/avro/src/lib.rs @@ -899,16 +899,16 @@ pub use decimal::Decimal; pub use duration::{Days, Duration, Millis, Months}; pub use error::Error; pub use reader::{ - from_avro_datum, from_avro_datum_reader_schemata, from_avro_datum_schemata, read_marker, - GenericSingleObjectReader, Reader, SpecificSingleObjectReader, + GenericSingleObjectReader, Reader, SpecificSingleObjectReader, from_avro_datum, + from_avro_datum_reader_schemata, from_avro_datum_schemata, read_avro_datum_ref, read_marker, }; pub use schema::{AvroSchema, Schema}; pub use ser::to_value; pub use util::{max_allocation_bytes, set_serde_human_readable}; pub use uuid::Uuid; pub use writer::{ - to_avro_datum, to_avro_datum_schemata, write_avro_datum_ref, GenericSingleObjectWriter, SpecificSingleObjectWriter, - Writer, WriterBuilder, + GenericSingleObjectWriter, SpecificSingleObjectWriter, Writer, WriterBuilder, to_avro_datum, + to_avro_datum_schemata, write_avro_datum_ref, }; #[cfg(feature = "derive")] @@ -920,8 +920,8 @@ pub type AvroResult = Result; #[cfg(test)] mod tests { use crate::{ - from_avro_datum, types::{Record, Value}, Codec, Reader, Schema, - Writer, + Codec, Reader, Schema, Writer, from_avro_datum, + types::{Record, Value}, }; use pretty_assertions::assert_eq; diff --git a/avro/src/reader.rs b/avro/src/reader.rs index df19b3fd..d22fca31 100644 --- a/avro/src/reader.rs +++ b/avro/src/reader.rs @@ -18,17 +18,17 @@ //! Logic handling reading from Avro format at user level. use crate::error::Details; use crate::{ - de_schema::SchemaAwareReadDeserializer, decode::{decode, decode_internal}, from_value, + AvroResult, Codec, Error, + de_schema::SchemaAwareReadDeserializer, + decode::{decode, decode_internal}, + from_value, headers::{HeaderBuilder, RabinFingerprintHeader}, schema::{ - resolve_names, resolve_names_with_schemata, AvroSchema, Names, NamesRef, ResolvedOwnedSchema, ResolvedSchema, - Schema, + AvroSchema, Names, ResolvedOwnedSchema, ResolvedSchema, Schema, resolve_names, + resolve_names_with_schemata, }, types::Value, util, - AvroResult, - Codec, - Error, }; use log::warn; use serde::de::DeserializeOwned; @@ -603,8 +603,8 @@ pub fn read_avro_datum_ref( schema: &Schema, reader: &mut R, ) -> AvroResult { - let names: NamesRef = NamesRef::default(); - let deserializer = SchemaAwareReadDeserializer::new(reader, schema, &names, None); + // let names: NamesRef = NamesRef::default(); + let deserializer = SchemaAwareReadDeserializer::new(reader, schema); D::deserialize(deserializer) } diff --git a/avro/src/ser_schema.rs b/avro/src/ser_schema.rs index 4f014ee4..e62d21d1 100644 --- a/avro/src/ser_schema.rs +++ b/avro/src/ser_schema.rs @@ -1766,7 +1766,7 @@ impl<'a, 's, W: Write> ser::Serializer for &'a mut SchemaAwareWriteSerializer<'s mod tests { use super::*; use crate::{ - decimal::Decimal, error::Details, schema::ResolvedSchema, Days, Duration, Millis, Months, + Days, Duration, Millis, Months, decimal::Decimal, error::Details, schema::ResolvedSchema, }; use apache_avro_test_helper::TestResult; use bigdecimal::BigDecimal; diff --git a/avro/tests/avro-rs-226.rs b/avro/tests/avro-rs-226.rs index 89aef93e..c35913d7 100644 --- a/avro/tests/avro-rs-226.rs +++ b/avro/tests/avro-rs-226.rs @@ -15,10 +15,11 @@ // specific language governing permissions and limitations // under the License. -use apache_avro::{AvroSchema, Schema, Writer, from_value}; +use apache_avro::{AvroSchema, Schema, Writer, read_avro_datum_ref}; use apache_avro_test_helper::TestResult; use serde::{Deserialize, Serialize, de::DeserializeOwned}; use std::fmt::Debug; +use std::io::Cursor; fn ser_deser(schema: &Schema, record: T) -> TestResult where @@ -35,13 +36,17 @@ where // let deserialized = from_value::(&value)?; // assert_eq!(deserialized, record2); - let reader = apache_avro::Reader::with_schema(schema, &bytes_written[..])?; - for value in reader { - let value = value?; - dbg!(&value); - let deserialized = from_value::(&value)?; - assert_eq!(deserialized, record2); - } + // let reader = apache_avro::Reader::with_schema(schema, &bytes_written[..])?; + // for value in reader { + // let value = value?; + // dbg!(&value); + // let deserialized = from_value::(&value)?; + // assert_eq!(deserialized, record2); + // } + + let mut reader = Cursor::new(&bytes_written); + let deserialized: T = read_avro_datum_ref(schema, &mut reader)?; + assert_eq!(deserialized, record2); Ok(()) } @@ -70,7 +75,7 @@ fn avro_rs_226_index_out_of_bounds_with_serde_skip_serializing_skip_middle_field fn avro_rs_226_index_out_of_bounds_with_serde_skip_serializing_skip_first_field() -> TestResult { #[derive(AvroSchema, Clone, Debug, Deserialize, PartialEq, Serialize)] struct T { - // #[serde(skip_serializing_if = "Option::is_none")] + #[serde(skip_serializing_if = "Option::is_none")] x: Option, y: Option, z: Option, From dd97f0b04d11e25f8bed4f0e27069fa9daf3b1ad Mon Sep 17 00:00:00 2001 From: Martin Tzvetanov Grigorov Date: Tue, 22 Jul 2025 16:55:39 +0300 Subject: [PATCH 08/10] Fight with the lifetimes Signed-off-by: Martin Tzvetanov Grigorov --- avro/src/de_schema.rs | 151 +++++++++++++++++++++++++++++++----------- 1 file changed, 113 insertions(+), 38 deletions(-) diff --git a/avro/src/de_schema.rs b/avro/src/de_schema.rs index a0a1584d..9aade3d2 100644 --- a/avro/src/de_schema.rs +++ b/avro/src/de_schema.rs @@ -8,14 +8,14 @@ use serde::{de, forward_to_deserialize_any}; use std::io::Read; use std::slice::Iter; -pub struct SchemaAwareReadDeserializer<'s, R: Read> { - reader: &'s mut R, - root_schema: &'s Schema, +pub struct SchemaAwareReadDeserializer<'a, R: Read> { + reader: &'a mut R, + root_schema: &'a Schema, } -impl<'s, R: Read> SchemaAwareReadDeserializer<'s, R> { +impl<'a, R: Read> SchemaAwareReadDeserializer<'a, R> { #[allow(dead_code)] // TODO: remove! It is actually used in reader.rs - pub(crate) fn new(reader: &'s mut R, root_schema: &'s Schema) -> Self { + pub(crate) fn new(reader: &'a mut R, root_schema: &'a Schema) -> Self { Self { reader, root_schema, @@ -23,7 +23,7 @@ impl<'s, R: Read> SchemaAwareReadDeserializer<'s, R> { } } -impl<'de, R: Read> serde::de::Deserializer<'de> for SchemaAwareReadDeserializer<'de, R> { +impl<'de: 'a, 'a, R: Read> serde::de::Deserializer<'de> for SchemaAwareReadDeserializer<'a, R> { type Error = Error; fn deserialize_any(self, _visitor: V) -> Result @@ -230,8 +230,6 @@ impl<'de, R: Read> serde::de::Deserializer<'de> for SchemaAwareReadDeserializer< where V: Visitor<'de>, { - // dbg!(name, fields, self.root_schema); - // todo!("Implement deserialization for struct"); let schema = self.root_schema; let mut this = self; this.deserialize_struct_with_schema(name, fields, visitor, schema) @@ -264,7 +262,7 @@ impl<'de, R: Read> serde::de::Deserializer<'de> for SchemaAwareReadDeserializer< } } -impl<'de, R: Read> SchemaAwareReadDeserializer<'de, R> { +impl<'de: 'a, 'a, R: Read> SchemaAwareReadDeserializer<'a, R> { fn deserialize_bool_with_schema( &mut self, visitor: V, @@ -415,11 +413,11 @@ impl<'de, R: Read> SchemaAwareReadDeserializer<'de, R> { } fn deserialize_struct_with_schema( - &'de mut self, + &mut self, name: &'static str, fields: &'static [&'static str], visitor: V, - schema: &'de Schema, + schema: &'a Schema, ) -> Result where V: Visitor<'de>, @@ -434,9 +432,14 @@ impl<'de, R: Read> SchemaAwareReadDeserializer<'de, R> { }; match schema { - Schema::Record(record_schema) => visitor.visit_map( - RecordSchemaAwareReadDeserializer::new(self, name, fields.iter(), record_schema), - ), + Schema::Record(record_schema) => { + visitor.visit_map(RecordSchemaAwareReadDeserializerStruct::new( + self, + name, + fields.iter(), + record_schema, + )) + } Schema::Union(union_schema) => { for variant_schema in union_schema.schemas.iter() { match variant_schema { @@ -467,30 +470,34 @@ impl<'de, R: Read> SchemaAwareReadDeserializer<'de, R> { } } -struct RecordSchemaAwareReadDeserializer<'s, R: Read> { - deser: &'s mut SchemaAwareReadDeserializer<'s, R>, +struct RecordSchemaAwareReadDeserializerStruct<'a, R: Read> { + deser: &'a mut SchemaAwareReadDeserializer<'a, R>, schema_name: &'static str, - fields: Iter<'s, &'static str>, - record_schema: &'s crate::schema::RecordSchema, + fields: Iter<'a, &'static str>, + current_field: Option<&'static str>, + record_schema: &'a crate::schema::RecordSchema, } -impl<'s, R: Read> RecordSchemaAwareReadDeserializer<'s, R> { +impl<'a, R: Read> RecordSchemaAwareReadDeserializerStruct<'a, R> { fn new( - deser: &'s mut SchemaAwareReadDeserializer<'s, R>, + deser: &'a mut SchemaAwareReadDeserializer<'a, R>, schema_name: &'static str, - fields: Iter<'s, &'static str>, - record_schema: &'s crate::schema::RecordSchema, + fields: Iter<'a, &'static str>, + record_schema: &'a crate::schema::RecordSchema, ) -> Self { Self { deser, schema_name, fields, + current_field: None, record_schema, } } } -impl<'de, R: Read> de::MapAccess<'de> for RecordSchemaAwareReadDeserializer<'de, R> { +impl<'de: 'a, 'a, R: Read> de::MapAccess<'de> + for RecordSchemaAwareReadDeserializerStruct<'a, R> +{ type Error = Error; fn next_key_seed(&mut self, seed: K) -> Result, Self::Error> @@ -498,9 +505,12 @@ impl<'de, R: Read> de::MapAccess<'de> for RecordSchemaAwareReadDeserializer<'de, K: de::DeserializeSeed<'de>, { match self.fields.next() { - Some(&field_name) => seed - .deserialize(StringDeserializer { input: field_name }) - .map(Some), + Some(&field_name) => { + self.current_field = Some(field_name); + seed + .deserialize(StringDeserializer { input: field_name }) + .map(Some) + }, None => Ok(None), } } @@ -509,20 +519,16 @@ impl<'de, R: Read> de::MapAccess<'de> for RecordSchemaAwareReadDeserializer<'de, where V: de::DeserializeSeed<'de>, { - match self.fields.next() { - Some(&field_name) => { - let field_idx = self.record_schema.lookup.get(field_name).ok_or_else(|| { + match self.current_field.take() { + Some(field_name) => { + let schema = self.record_schema; + let record_field = schema.lookup.get(field_name) + .and_then(|idx| schema.fields.get(*idx)) + .ok_or_else(|| { return Error::new(Details::DeserializeValueWithSchema { - value_type: "field", + value_type: "struct", value: format!("Field '{field_name}' not found in record schema"), - schema: Schema::Record(self.record_schema.clone()), - }); - })?; - let record_field = self.record_schema.fields.get(*field_idx).ok_or_else(|| { - return Error::new(Details::DeserializeValueWithSchema { - value_type: "field", - value: format!("Field index {field_idx} out of bounds"), - schema: Schema::Record(self.record_schema.clone()), + schema: Schema::Record(schema.clone()), }); })?; let field_schema = &record_field.schema; @@ -536,6 +542,75 @@ impl<'de, R: Read> de::MapAccess<'de> for RecordSchemaAwareReadDeserializer<'de, } } +// struct RecordSchemaAwareReadDeserializer<'s, R: Read> { +// deser: &'s mut SchemaAwareReadDeserializer<'s, R>, +// schema_name: &'static str, +// fields: Iter<'s, &'static str>, +// record_schema: &'s crate::schema::RecordSchema, +// } +// +// impl<'s, R: Read> RecordSchemaAwareReadDeserializer<'s, R> { +// fn new( +// deser: &'s mut SchemaAwareReadDeserializer<'s, R>, +// schema_name: &'static str, +// fields: Iter<'s, &'static str>, +// record_schema: &'s crate::schema::RecordSchema, +// ) -> Self { +// Self { +// deser, +// schema_name, +// fields, +// record_schema, +// } +// } +// } + +// impl<'de, R: Read> de::MapAccess<'de> for RecordSchemaAwareReadDeserializer<'de, R> { +// type Error = Error; +// +// fn next_key_seed(&mut self, seed: K) -> Result, Self::Error> +// where +// K: de::DeserializeSeed<'de>, +// { +// match self.fields.next() { +// Some(&field_name) => seed +// .deserialize(StringDeserializer { input: field_name }) +// .map(Some), +// None => Ok(None), +// } +// } +// +// fn next_value_seed(&mut self, seed: V) -> Result +// where +// V: de::DeserializeSeed<'de>, +// { +// match self.fields.next() { +// Some(&field_name) => { +// let field_idx = self.record_schema.lookup.get(field_name).ok_or_else(|| { +// return Error::new(Details::DeserializeValueWithSchema { +// value_type: "field", +// value: format!("Field '{field_name}' not found in record schema"), +// schema: Schema::Record(self.record_schema.clone()), +// }); +// })?; +// let record_field = self.record_schema.fields.get(*field_idx).ok_or_else(|| { +// return Error::new(Details::DeserializeValueWithSchema { +// value_type: "field", +// value: format!("Field index {field_idx} out of bounds"), +// schema: Schema::Record(self.record_schema.clone()), +// }); +// })?; +// let field_schema = &record_field.schema; +// seed.deserialize(SchemaAwareReadDeserializer::new( +// self.deser.reader, +// field_schema, +// )) +// } +// None => Err(de::Error::custom("should not happen - too many values")), +// } +// } +// } + #[derive(Clone)] struct StringDeserializer<'de> { input: &'de str, From 6ab74d6713883d562f9c982582f898172a6a89b6 Mon Sep 17 00:00:00 2001 From: Martin Tzvetanov Grigorov Date: Wed, 23 Jul 2025 10:04:16 +0300 Subject: [PATCH 09/10] Add logic to deserialize String Signed-off-by: Martin Tzvetanov Grigorov --- avro/src/de_schema.rs | 99 +++++++++++++++++++++++++++++++-------- avro/src/ser_schema.rs | 10 ++++ avro/tests/avro-rs-226.rs | 26 +++++----- 3 files changed, 104 insertions(+), 31 deletions(-) diff --git a/avro/src/de_schema.rs b/avro/src/de_schema.rs index 9aade3d2..4d47de62 100644 --- a/avro/src/de_schema.rs +++ b/avro/src/de_schema.rs @@ -131,11 +131,41 @@ impl<'de: 'a, 'a, R: Read> serde::de::Deserializer<'de> for SchemaAwareReadDeser todo!("Implement deserialization for str") } - fn deserialize_string(self, _visitor: V) -> Result - where - V: Visitor<'de>, - { - todo!("Implement deserialization for String") + fn deserialize_string(self, visitor: V) -> Result + where + V: Visitor<'de>, + { + match self.root_schema { + Schema::String => { + match zag_i64(self.reader) + .map_err(Error::into_details) { + Ok(len) => { + dbg!(len); + let mut buf = vec![0; usize::try_from(len) + .map_err(|e| Details::ConvertI64ToUsize(e, len))?]; + dbg!(&buf); + self.reader.read_exact(&mut buf).map_err(|e| { + Details::ReadBytes(e) + })?; + let string = String::from_utf8(buf) + .map_err(|e| Details::ConvertToUtf8(e))?; + visitor.visit_string(string) + } + Err(details) => { + Err(de::Error::custom(format!( + "Cannot read the length of the string schema {details:?}", + ))) + } + } + + } + not_implemented => { + Err(de::Error::custom(format!( + "Expected a String schema, but got {:?}", + not_implemented + ))) + } + } } fn deserialize_bytes(self, _visitor: V) -> Result @@ -152,11 +182,40 @@ impl<'de: 'a, 'a, R: Read> serde::de::Deserializer<'de> for SchemaAwareReadDeser todo!() } - fn deserialize_option(self, _visitor: V) -> Result + fn deserialize_option(self, visitor: V) -> Result where V: Visitor<'de>, { - todo!("Implement deserialization for Option") + match self.root_schema { + Schema::Null => visitor.visit_none(), + Schema::Union(union_schema) => { + match zag_i64(self.reader).map_err(Error::into_details) { + Ok(index) => { + let variants = union_schema.variants(); + let variant = variants + .get(usize::try_from(index).map_err(|e| Details::ConvertI64ToUsize(e, index))?) + .ok_or(Details::GetUnionVariant { + index, + num_variants: variants.len(), + })?; + dbg!(&variant); + match variant { + Schema::Null => visitor.visit_none(), + _ => visitor.visit_some( + SchemaAwareReadDeserializer::new(self.reader, variant), + ), + } + } + Err(details) => Err(de::Error::custom(format!( + "Cannot read the index of the union schema variant {details:?}", + ))), + } + } + _ => Err(de::Error::custom(format!( + "Expected a Union, but got {:?}", + self.root_schema + ))), + } } fn deserialize_unit(self, _visitor: V) -> Result @@ -413,7 +472,7 @@ impl<'de: 'a, 'a, R: Read> SchemaAwareReadDeserializer<'a, R> { } fn deserialize_struct_with_schema( - &mut self, + &'a mut self, name: &'static str, fields: &'static [&'static str], visitor: V, @@ -430,7 +489,7 @@ impl<'de: 'a, 'a, R: Read> SchemaAwareReadDeserializer<'a, R> { } .into() }; - + dbg!(name, fields); match schema { Schema::Record(record_schema) => { visitor.visit_map(RecordSchemaAwareReadDeserializerStruct::new( @@ -472,7 +531,7 @@ impl<'de: 'a, 'a, R: Read> SchemaAwareReadDeserializer<'a, R> { struct RecordSchemaAwareReadDeserializerStruct<'a, R: Read> { deser: &'a mut SchemaAwareReadDeserializer<'a, R>, - schema_name: &'static str, + _schema_name: &'static str, fields: Iter<'a, &'static str>, current_field: Option<&'static str>, record_schema: &'a crate::schema::RecordSchema, @@ -481,13 +540,13 @@ struct RecordSchemaAwareReadDeserializerStruct<'a, R: Read> { impl<'a, R: Read> RecordSchemaAwareReadDeserializerStruct<'a, R> { fn new( deser: &'a mut SchemaAwareReadDeserializer<'a, R>, - schema_name: &'static str, + _schema_name: &'static str, fields: Iter<'a, &'static str>, record_schema: &'a crate::schema::RecordSchema, ) -> Self { Self { deser, - schema_name, + _schema_name, fields, current_field: None, record_schema, @@ -525,11 +584,11 @@ impl<'de: 'a, 'a, R: Read> de::MapAccess<'de> let record_field = schema.lookup.get(field_name) .and_then(|idx| schema.fields.get(*idx)) .ok_or_else(|| { - return Error::new(Details::DeserializeValueWithSchema { + Error::new(Details::DeserializeValueWithSchema { value_type: "struct", value: format!("Field '{field_name}' not found in record schema"), schema: Schema::Record(schema.clone()), - }); + }) })?; let field_schema = &record_field.schema; seed.deserialize(SchemaAwareReadDeserializer::new( @@ -676,7 +735,7 @@ mod tests { let mut reader: &[u8] = &[0, 1, 2]; match read_avro_datum_ref::(&schema, &mut reader).map_err(Error::into_details) { - Err(Details::SerializeValueWithSchema { + Err(Details::DeserializeValueWithSchema { value_type, value, schema, @@ -698,7 +757,7 @@ mod tests { let mut reader: &[u8] = &[1, 2, 3]; match read_avro_datum_ref::(&schema, &mut reader).map_err(Error::into_details) { - Err(Details::SerializeValueWithSchema { + Err(Details::DeserializeValueWithSchema { value_type, value, schema, @@ -747,7 +806,7 @@ mod tests { let mut reader: &[u8] = &[0, 1, 2]; match read_avro_datum_ref::(&schema, &mut reader).map_err(Error::into_details) { - Err(Details::SerializeValueWithSchema { + Err(Details::DeserializeValueWithSchema { value_type, value, schema, @@ -771,7 +830,7 @@ mod tests { let mut reader: &[u8] = &[1, 2, 3]; match read_avro_datum_ref::(&schema, &mut reader).map_err(Error::into_details) { - Err(Details::SerializeValueWithSchema { + Err(Details::DeserializeValueWithSchema { value_type, value, schema, @@ -837,7 +896,7 @@ mod tests { let mut reader: &[u8] = &[0, 1, 2]; match read_avro_datum_ref::(&schema, &mut reader).map_err(Error::into_details) { - Err(Details::SerializeValueWithSchema { + Err(Details::DeserializeValueWithSchema { value_type, value, schema, @@ -861,7 +920,7 @@ mod tests { let mut reader: &[u8] = &[1, 2, 3]; match read_avro_datum_ref::(&schema, &mut reader).map_err(Error::into_details) { - Err(Details::SerializeValueWithSchema { + Err(Details::DeserializeValueWithSchema { value_type, value, schema, diff --git a/avro/src/ser_schema.rs b/avro/src/ser_schema.rs index e62d21d1..4bfd9ba5 100644 --- a/avro/src/ser_schema.rs +++ b/avro/src/ser_schema.rs @@ -344,6 +344,16 @@ impl ser::SerializeStruct for SchemaAwareWriteSerializeStruct<'_, '_, } } +fn field_matches(record_field: &RecordField, expected_name: &str) -> bool { + let field_name = record_field.name.as_str(); + match &record_field.aliases { + Some(aliases) => { + expected_name == field_name || aliases.iter().any(|a| expected_name == a.as_str()) + } + None => expected_name == field_name, + } +} + impl ser::SerializeStructVariant for SchemaAwareWriteSerializeStruct<'_, '_, W> { type Ok = usize; type Error = Error; diff --git a/avro/tests/avro-rs-226.rs b/avro/tests/avro-rs-226.rs index c35913d7..f306708a 100644 --- a/avro/tests/avro-rs-226.rs +++ b/avro/tests/avro-rs-226.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use apache_avro::{AvroSchema, Schema, Writer, read_avro_datum_ref}; +use apache_avro::{AvroSchema, Schema, read_avro_datum_ref, write_avro_datum_ref}; use apache_avro_test_helper::TestResult; use serde::{Deserialize, Serialize, de::DeserializeOwned}; use std::fmt::Debug; @@ -26,9 +26,12 @@ where T: Serialize + DeserializeOwned + Debug + PartialEq + Clone, { let record2 = record.clone(); - let mut writer = Writer::new(schema, vec![]); - writer.append_ser(record)?; - let bytes_written = writer.into_inner()?; + // let mut writer = Writer::new(schema, vec![]); + // writer.append_ser(record)?; + // let bytes_written = writer.into_inner()?; + + let mut writer = vec![]; + let _written = write_avro_datum_ref(schema, &record, &mut writer)?; // let mut bytes_written = Cursor::new(bytes_written); // let value = from_avro_datum(schema, &mut bytes_written, None)?; @@ -44,7 +47,8 @@ where // assert_eq!(deserialized, record2); // } - let mut reader = Cursor::new(&bytes_written); + // let mut reader = Cursor::new(&bytes_written); + let mut reader = Cursor::new(&writer); let deserialized: T = read_avro_datum_ref(schema, &mut reader)?; assert_eq!(deserialized, record2); @@ -52,11 +56,11 @@ where } #[test] -fn avro_rs_226_index_out_of_bounds_with_serde_skip_serializing_skip_middle_field() -> TestResult { +fn avro_rs_226_index_out_of_bounds_with_serde_skip_serializing_skip_first_field() -> TestResult { #[derive(AvroSchema, Clone, Debug, Deserialize, PartialEq, Serialize)] struct T { - x: Option, #[serde(skip_serializing_if = "Option::is_none")] + x: Option, y: Option, z: Option, } @@ -65,18 +69,18 @@ fn avro_rs_226_index_out_of_bounds_with_serde_skip_serializing_skip_middle_field &T::get_schema(), T { x: None, - y: None, - z: Some(1), + y: Some("test".to_string()), + z: Some(23), }, ) } #[test] -fn avro_rs_226_index_out_of_bounds_with_serde_skip_serializing_skip_first_field() -> TestResult { +fn avro_rs_226_index_out_of_bounds_with_serde_skip_serializing_skip_middle_field() -> TestResult { #[derive(AvroSchema, Clone, Debug, Deserialize, PartialEq, Serialize)] struct T { - #[serde(skip_serializing_if = "Option::is_none")] x: Option, + #[serde(skip_serializing_if = "Option::is_none")] y: Option, z: Option, } From e982ff0b5b6d3fa80861fd5e37aa65506e3bc903 Mon Sep 17 00:00:00 2001 From: Martin Tzvetanov Grigorov Date: Thu, 24 Jul 2025 12:21:10 +0300 Subject: [PATCH 10/10] Remove unused method Signed-off-by: Martin Tzvetanov Grigorov --- avro/src/ser_schema.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/avro/src/ser_schema.rs b/avro/src/ser_schema.rs index 4bfd9ba5..e62d21d1 100644 --- a/avro/src/ser_schema.rs +++ b/avro/src/ser_schema.rs @@ -344,16 +344,6 @@ impl ser::SerializeStruct for SchemaAwareWriteSerializeStruct<'_, '_, } } -fn field_matches(record_field: &RecordField, expected_name: &str) -> bool { - let field_name = record_field.name.as_str(); - match &record_field.aliases { - Some(aliases) => { - expected_name == field_name || aliases.iter().any(|a| expected_name == a.as_str()) - } - None => expected_name == field_name, - } -} - impl ser::SerializeStructVariant for SchemaAwareWriteSerializeStruct<'_, '_, W> { type Ok = usize; type Error = Error;