Skip to content

Commit b20352d

Browse files
committed
WIP
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
1 parent 15baefd commit b20352d

File tree

7 files changed

+208
-60
lines changed

7 files changed

+208
-60
lines changed

avro/src/de.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,16 @@
1616
// under the License.
1717

1818
//! Logic for serde-compatible deserialization.
19-
use crate::{bytes::DE_BYTES_BORROWED, error::Details, types::Value, AvroResult, Error};
19+
use crate::{AvroResult, Error, bytes::DE_BYTES_BORROWED, error::Details, types::Value};
2020
use serde::{
21+
Deserialize,
2122
de::{self, DeserializeSeed, Deserializer as _, Visitor},
2223
forward_to_deserialize_any,
23-
Deserialize,
2424
};
2525
use std::{
2626
collections::{
27-
hash_map::{Keys, Values},
2827
HashMap,
28+
hash_map::{Keys, Values},
2929
},
3030
slice::Iter,
3131
};

avro/src/de_schema.rs

Lines changed: 169 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,24 @@
11
use crate::error::Details;
22
use crate::{
3-
schema::{NamesRef, Namespace}, util::{zag_i32, zag_i64},
4-
Error,
5-
Schema,
3+
Error, Schema,
4+
util::{zag_i32, zag_i64},
65
};
76
use serde::de::Visitor;
7+
use serde::{de, forward_to_deserialize_any};
88
use std::io::Read;
9+
use std::slice::Iter;
910

1011
pub struct SchemaAwareReadDeserializer<'s, R: Read> {
1112
reader: &'s mut R,
1213
root_schema: &'s Schema,
13-
_names: &'s NamesRef<'s>,
14-
_enclosing_namespace: Namespace,
1514
}
1615

1716
impl<'s, R: Read> SchemaAwareReadDeserializer<'s, R> {
1817
#[allow(dead_code)] // TODO: remove! It is actually used in reader.rs
19-
pub(crate) fn new(
20-
reader: &'s mut R,
21-
root_schema: &'s Schema,
22-
_names: &'s NamesRef<'s>,
23-
_enclosing_namespace: Namespace,
24-
) -> Self {
18+
pub(crate) fn new(reader: &'s mut R, root_schema: &'s Schema) -> Self {
2519
Self {
2620
reader,
2721
root_schema,
28-
_names,
29-
_enclosing_namespace,
3022
}
3123
}
3224
}
@@ -136,14 +128,14 @@ impl<'de, R: Read> serde::de::Deserializer<'de> for SchemaAwareReadDeserializer<
136128
where
137129
V: Visitor<'de>,
138130
{
139-
todo!()
131+
todo!("Implement deserialization for str")
140132
}
141133

142134
fn deserialize_string<V>(self, _visitor: V) -> Result<V::Value, Self::Error>
143135
where
144136
V: Visitor<'de>,
145137
{
146-
todo!()
138+
todo!("Implement deserialization for String")
147139
}
148140

149141
fn deserialize_bytes<V>(self, _visitor: V) -> Result<V::Value, Self::Error>
@@ -164,7 +156,7 @@ impl<'de, R: Read> serde::de::Deserializer<'de> for SchemaAwareReadDeserializer<
164156
where
165157
V: Visitor<'de>,
166158
{
167-
todo!()
159+
todo!("Implement deserialization for Option")
168160
}
169161

170162
fn deserialize_unit<V>(self, _visitor: V) -> Result<V::Value, Self::Error>
@@ -231,14 +223,18 @@ impl<'de, R: Read> serde::de::Deserializer<'de> for SchemaAwareReadDeserializer<
231223

232224
fn deserialize_struct<V>(
233225
self,
234-
_name: &'static str,
235-
_fields: &'static [&'static str],
236-
_visitor: V,
226+
name: &'static str,
227+
fields: &'static [&'static str],
228+
visitor: V,
237229
) -> Result<V::Value, Self::Error>
238230
where
239231
V: Visitor<'de>,
240232
{
241-
todo!()
233+
// dbg!(name, fields, self.root_schema);
234+
// todo!("Implement deserialization for struct");
235+
let schema = self.root_schema;
236+
let mut this = self;
237+
this.deserialize_struct_with_schema(name, fields, visitor, schema)
242238
}
243239

244240
fn deserialize_enum<V>(
@@ -268,8 +264,8 @@ impl<'de, R: Read> serde::de::Deserializer<'de> for SchemaAwareReadDeserializer<
268264
}
269265
}
270266

271-
impl<R: Read> SchemaAwareReadDeserializer<'_, R> {
272-
fn deserialize_bool_with_schema<'de, V>(
267+
impl<'de, R: Read> SchemaAwareReadDeserializer<'de, R> {
268+
fn deserialize_bool_with_schema<V>(
273269
&mut self,
274270
visitor: V,
275271
schema: &Schema,
@@ -278,8 +274,7 @@ impl<R: Read> SchemaAwareReadDeserializer<'_, R> {
278274
V: Visitor<'de>,
279275
{
280276
let create_error = |cause: &str| {
281-
Details::SerializeValueWithSchema {
282-
// TODO: DeserializeValueWithSchema
277+
Details::DeserializeValueWithSchema {
283278
value_type: "bool",
284279
value: format!("Cause: {cause}"),
285280
schema: schema.clone(),
@@ -315,7 +310,7 @@ impl<R: Read> SchemaAwareReadDeserializer<'_, R> {
315310
}
316311
}
317312

318-
fn deserialize_i32_with_schema<'de, V>(
313+
fn deserialize_i32_with_schema<V>(
319314
&mut self,
320315
visitor: V,
321316
schema: &Schema,
@@ -324,8 +319,7 @@ impl<R: Read> SchemaAwareReadDeserializer<'_, R> {
324319
V: Visitor<'de>,
325320
{
326321
let create_error = |cause: &str| {
327-
Error::new(Details::SerializeValueWithSchema {
328-
// TODO: DeserializeValueWithSchema
322+
Error::new(Details::DeserializeValueWithSchema {
329323
value_type: "i32",
330324
value: format!("Cause: {cause}"),
331325
schema: schema.clone(),
@@ -334,7 +328,7 @@ impl<R: Read> SchemaAwareReadDeserializer<'_, R> {
334328

335329
match schema {
336330
Schema::Int | Schema::TimeMillis | Schema::Date => {
337-
let int = zag_i32(&mut self.reader)?;
331+
let int = zag_i32(self.reader)?;
338332
visitor.visit_i32(int)
339333
}
340334
Schema::Union(union_schema) => {
@@ -356,7 +350,7 @@ impl<R: Read> SchemaAwareReadDeserializer<'_, R> {
356350
}
357351
}
358352

359-
fn deserialize_i64_with_schema<'de, V>(
353+
fn deserialize_i64_with_schema<V>(
360354
&mut self,
361355
visitor: V,
362356
schema: &Schema,
@@ -365,8 +359,7 @@ impl<R: Read> SchemaAwareReadDeserializer<'_, R> {
365359
V: Visitor<'de>,
366360
{
367361
let create_error = |cause: &str| {
368-
Details::SerializeValueWithSchema {
369-
// TODO: DeserializeValueWithSchema
362+
Details::DeserializeValueWithSchema {
370363
value_type: "i64",
371364
value: format!("Cause: {cause}"),
372365
schema: schema.clone(),
@@ -376,7 +369,7 @@ impl<R: Read> SchemaAwareReadDeserializer<'_, R> {
376369

377370
match schema {
378371
Schema::Int | Schema::TimeMillis | Schema::Date => {
379-
let long = zag_i64(&mut self.reader)?;
372+
let long = zag_i64(self.reader)?;
380373
let int = i32::try_from(long)
381374
.map_err(|cause| create_error(cause.to_string().as_str()))?;
382375
visitor.visit_i32(int)
@@ -389,7 +382,7 @@ impl<R: Read> SchemaAwareReadDeserializer<'_, R> {
389382
| Schema::LocalTimestampMillis
390383
| Schema::LocalTimestampMicros
391384
| Schema::LocalTimestampNanos => {
392-
let long = zag_i64(&mut self.reader)?;
385+
let long = zag_i64(self.reader)?;
393386
visitor.visit_i64(long)
394387
}
395388
Schema::Union(union_schema) => {
@@ -420,6 +413,149 @@ impl<R: Read> SchemaAwareReadDeserializer<'_, R> {
420413
))),
421414
}
422415
}
416+
417+
fn deserialize_struct_with_schema<V>(
418+
&'de mut self,
419+
name: &'static str,
420+
fields: &'static [&'static str],
421+
visitor: V,
422+
schema: &'de Schema,
423+
) -> Result<V::Value, Error>
424+
where
425+
V: Visitor<'de>,
426+
{
427+
let create_error = |cause: &str| {
428+
Details::DeserializeValueWithSchema {
429+
value_type: "struct",
430+
value: format!("Cause: {cause}"),
431+
schema: schema.clone(),
432+
}
433+
.into()
434+
};
435+
436+
match schema {
437+
Schema::Record(record_schema) => visitor.visit_map(
438+
RecordSchemaAwareReadDeserializer::new(self, name, fields.iter(), record_schema),
439+
),
440+
Schema::Union(union_schema) => {
441+
for variant_schema in union_schema.schemas.iter() {
442+
match variant_schema {
443+
Schema::Int
444+
| Schema::TimeMillis
445+
| Schema::Date
446+
| Schema::Long
447+
| Schema::TimeMicros
448+
| Schema::TimestampMillis
449+
| Schema::TimestampMicros
450+
| Schema::TimestampNanos
451+
| Schema::LocalTimestampMillis
452+
| Schema::LocalTimestampMicros
453+
| Schema::LocalTimestampNanos => {
454+
return self.deserialize_i64_with_schema(visitor, variant_schema);
455+
}
456+
_ => { /* skip */ }
457+
}
458+
}
459+
Err(create_error(&format!(
460+
"The union schema must have a Long[-like] variant: {schema:?}"
461+
)))
462+
}
463+
unexpected => Err(create_error(&format!(
464+
"Expected a Long[-like] schema, found: {unexpected:?}"
465+
))),
466+
}
467+
}
468+
}
469+
470+
struct RecordSchemaAwareReadDeserializer<'s, R: Read> {
471+
deser: &'s mut SchemaAwareReadDeserializer<'s, R>,
472+
schema_name: &'static str,
473+
fields: Iter<'s, &'static str>,
474+
record_schema: &'s crate::schema::RecordSchema,
475+
}
476+
477+
impl<'s, R: Read> RecordSchemaAwareReadDeserializer<'s, R> {
478+
fn new(
479+
deser: &'s mut SchemaAwareReadDeserializer<'s, R>,
480+
schema_name: &'static str,
481+
fields: Iter<'s, &'static str>,
482+
record_schema: &'s crate::schema::RecordSchema,
483+
) -> Self {
484+
Self {
485+
deser,
486+
schema_name,
487+
fields,
488+
record_schema,
489+
}
490+
}
491+
}
492+
493+
impl<'de, R: Read> de::MapAccess<'de> for RecordSchemaAwareReadDeserializer<'de, R> {
494+
type Error = Error;
495+
496+
fn next_key_seed<K>(&mut self, seed: K) -> Result<Option<K::Value>, Self::Error>
497+
where
498+
K: de::DeserializeSeed<'de>,
499+
{
500+
match self.fields.next() {
501+
Some(&field_name) => seed
502+
.deserialize(StringDeserializer { input: field_name })
503+
.map(Some),
504+
None => Ok(None),
505+
}
506+
}
507+
508+
fn next_value_seed<V>(&mut self, seed: V) -> Result<V::Value, Self::Error>
509+
where
510+
V: de::DeserializeSeed<'de>,
511+
{
512+
match self.fields.next() {
513+
Some(&field_name) => {
514+
let field_idx = self.record_schema.lookup.get(field_name).ok_or_else(|| {
515+
return Error::new(Details::DeserializeValueWithSchema {
516+
value_type: "field",
517+
value: format!("Field '{field_name}' not found in record schema"),
518+
schema: Schema::Record(self.record_schema.clone()),
519+
});
520+
})?;
521+
let record_field = self.record_schema.fields.get(*field_idx).ok_or_else(|| {
522+
return Error::new(Details::DeserializeValueWithSchema {
523+
value_type: "field",
524+
value: format!("Field index {field_idx} out of bounds"),
525+
schema: Schema::Record(self.record_schema.clone()),
526+
});
527+
})?;
528+
let field_schema = &record_field.schema;
529+
seed.deserialize(SchemaAwareReadDeserializer::new(
530+
self.deser.reader,
531+
field_schema,
532+
))
533+
}
534+
None => Err(de::Error::custom("should not happen - too many values")),
535+
}
536+
}
537+
}
538+
539+
#[derive(Clone)]
540+
struct StringDeserializer<'de> {
541+
input: &'de str,
542+
}
543+
544+
impl<'de> de::Deserializer<'de> for StringDeserializer<'de> {
545+
type Error = Error;
546+
547+
fn deserialize_any<V>(self, visitor: V) -> Result<V::Value, Self::Error>
548+
where
549+
V: Visitor<'de>,
550+
{
551+
visitor.visit_str(self.input)
552+
}
553+
554+
forward_to_deserialize_any! {
555+
bool u8 u16 u32 u64 i8 i16 i32 i64 f32 f64 char str string unit option
556+
seq bytes byte_buf map unit_struct newtype_struct
557+
tuple_struct struct tuple enum identifier ignored_any
558+
}
423559
}
424560

425561
#[cfg(test)]

avro/src/error.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,13 @@ pub enum Details {
508508
schema: Schema,
509509
},
510510

511+
#[error("Failed to deserialize value of type {value_type} using schema {schema:?}: {value}")]
512+
DeserializeValueWithSchema {
513+
value_type: &'static str,
514+
value: String,
515+
schema: Schema,
516+
},
517+
511518
#[error("Failed to serialize field '{field_name}' for record {record_schema:?}: {error}")]
512519
SerializeRecordFieldWithSchema {
513520
field_name: &'static str,

avro/src/lib.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -899,16 +899,16 @@ pub use decimal::Decimal;
899899
pub use duration::{Days, Duration, Millis, Months};
900900
pub use error::Error;
901901
pub use reader::{
902-
from_avro_datum, from_avro_datum_reader_schemata, from_avro_datum_schemata, read_marker,
903-
GenericSingleObjectReader, Reader, SpecificSingleObjectReader,
902+
GenericSingleObjectReader, Reader, SpecificSingleObjectReader, from_avro_datum,
903+
from_avro_datum_reader_schemata, from_avro_datum_schemata, read_avro_datum_ref, read_marker,
904904
};
905905
pub use schema::{AvroSchema, Schema};
906906
pub use ser::to_value;
907907
pub use util::{max_allocation_bytes, set_serde_human_readable};
908908
pub use uuid::Uuid;
909909
pub use writer::{
910-
to_avro_datum, to_avro_datum_schemata, write_avro_datum_ref, GenericSingleObjectWriter, SpecificSingleObjectWriter,
911-
Writer, WriterBuilder,
910+
GenericSingleObjectWriter, SpecificSingleObjectWriter, Writer, WriterBuilder, to_avro_datum,
911+
to_avro_datum_schemata, write_avro_datum_ref,
912912
};
913913

914914
#[cfg(feature = "derive")]
@@ -920,8 +920,8 @@ pub type AvroResult<T> = Result<T, Error>;
920920
#[cfg(test)]
921921
mod tests {
922922
use crate::{
923-
from_avro_datum, types::{Record, Value}, Codec, Reader, Schema,
924-
Writer,
923+
Codec, Reader, Schema, Writer, from_avro_datum,
924+
types::{Record, Value},
925925
};
926926
use pretty_assertions::assert_eq;
927927

0 commit comments

Comments
 (0)