diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs index d0eb4872e442..dc66865e68ac 100644 --- a/parquet-variant/src/builder.rs +++ b/parquet-variant/src/builder.rs @@ -64,6 +64,12 @@ fn write_offset(buf: &mut Vec, value: usize, nbytes: u8) { buf.extend_from_slice(&bytes[..nbytes as usize]); } +/// Write little-endian integer to buffer at a specific position +fn write_offset_at_pos(buf: &mut [u8], start_pos: usize, value: usize, nbytes: u8) { + let bytes = value.to_le_bytes(); + buf[start_pos..start_pos + nbytes as usize].copy_from_slice(&bytes[..nbytes as usize]); +} + /// Wrapper around a `Vec` that provides methods for appending /// primitive values, variant types, and metadata. /// @@ -389,6 +395,63 @@ impl ValueBuffer { write_offset(buf, data_size, nbytes); } } + + /// Writes out the header byte for a variant object or list, from the starting position + /// of the buffer, will return the position after this write + fn append_header_start_from_buf_pos( + &mut self, + start_pos: usize, // the start position where the header will be inserted + header_byte: u8, + is_large: bool, + num_fields: usize, + ) -> usize { + let buffer = self.inner_mut(); + + // Write header at the original start position + let mut header_pos = start_pos; + + // Write header byte + buffer[header_pos] = header_byte; + header_pos += 1; + + // Write number of fields + if is_large { + buffer[header_pos..header_pos + 4].copy_from_slice(&(num_fields as u32).to_le_bytes()); + header_pos += 4; + } else { + buffer[header_pos] = num_fields as u8; + header_pos += 1; + } + + header_pos + } + + /// Writes out the offsets for an array of offsets, including the final offset (data size). + /// from the starting position of the buffer, will return the position after this write + fn append_offset_array_start_from_buf_pos( + &mut self, + start_pos: usize, + offsets: impl IntoIterator, + data_size: Option, + nbytes: u8, + ) -> usize { + let buf = self.inner_mut(); + + let mut current_pos = start_pos; + for relative_offset in offsets { + write_offset_at_pos(buf, current_pos, relative_offset, nbytes); + current_pos += nbytes as usize; + } + + // Write data_size + if let Some(data_size) = data_size { + // Write data_size at the end of the offsets + write_offset_at_pos(buf, current_pos, data_size, nbytes); + current_pos += nbytes as usize; + } + + current_pos + } } /// Builder for constructing metadata for [`Variant`] values. @@ -553,6 +616,7 @@ enum ParentState<'a> { metadata_builder: &'a mut MetadataBuilder, fields: &'a mut IndexMap, field_name: &'a str, + parent_offset_base: usize, }, } @@ -591,11 +655,58 @@ impl ParentState<'_> { metadata_builder, fields, field_name, + parent_offset_base: object_start_offset, .. } => { let field_id = metadata_builder.upsert_field_name(field_name); - fields.insert(field_id, starting_offset); + let shifted_start_offset = starting_offset - *object_start_offset; + fields.insert(field_id, shifted_start_offset); + } + } + } + + /// Return mutable references to the buffer and metadata builder that this + /// parent state is using. + fn buffer_and_metadata_builder(&mut self) -> (&mut ValueBuffer, &mut MetadataBuilder) { + match self { + ParentState::Variant { + buffer, + metadata_builder, + } + | ParentState::List { + buffer, + metadata_builder, + .. } + | ParentState::Object { + buffer, + metadata_builder, + .. + } => (buffer, metadata_builder), + } + } + + // Return the offset of the underlying buffer at the time of calling this method. + fn buffer_current_offset(&self) -> usize { + match self { + ParentState::Variant { buffer, .. } + | ParentState::Object { buffer, .. } + | ParentState::List { buffer, .. } => buffer.offset(), + } + } + + // Return the current index of the undelying metadata buffer at the time of calling this method. + fn metadata_current_offset(&self) -> usize { + match self { + ParentState::Variant { + metadata_builder, .. + } + | ParentState::Object { + metadata_builder, .. + } + | ParentState::List { + metadata_builder, .. + } => metadata_builder.metadata_buffer.len(), } } } @@ -1140,7 +1251,14 @@ impl Drop for ListBuilder<'_> { pub struct ObjectBuilder<'a> { parent_state: ParentState<'a>, fields: IndexMap, // (field_id, offset) - buffer: ValueBuffer, + /// The starting offset in the parent's buffer where this object starts + parent_value_offset_base: usize, + /// The starting offset in the parent's metadata buffer where this object starts + /// used to truncate the written fields in `drop` if the current object has not been finished + parent_metadata_offset_base: usize, + /// Whether the object has been finished, the written content of the current object + /// will be truncated in `drop` if `has_been_finished` is false + has_been_finished: bool, validate_unique_fields: bool, /// Set of duplicate fields to report for errors duplicate_fields: HashSet, @@ -1148,10 +1266,14 @@ pub struct ObjectBuilder<'a> { impl<'a> ObjectBuilder<'a> { fn new(parent_state: ParentState<'a>, validate_unique_fields: bool) -> Self { + let offset_base = parent_state.buffer_current_offset(); + let meta_offset_base = parent_state.metadata_current_offset(); Self { parent_state, fields: IndexMap::new(), - buffer: ValueBuffer::default(), + parent_value_offset_base: offset_base, + has_been_finished: false, + parent_metadata_offset_base: meta_offset_base, validate_unique_fields, duplicate_fields: HashSet::new(), } @@ -1185,19 +1307,16 @@ impl<'a> ObjectBuilder<'a> { key: &str, value: T, ) -> Result<(), ArrowError> { - // Get metadata_builder from parent state - let metadata_builder = self.parent_state.metadata_builder(); + let (buffer, metadata_builder) = self.parent_state.buffer_and_metadata_builder(); let field_id = metadata_builder.upsert_field_name(key); - let field_start = self.buffer.offset(); + let field_start = buffer.offset() - self.parent_value_offset_base; if self.fields.insert(field_id, field_start).is_some() && self.validate_unique_fields { self.duplicate_fields.insert(field_id); } - self.buffer - .try_append_variant(value.into(), metadata_builder)?; - + buffer.try_append_variant(value.into(), metadata_builder)?; Ok(()) } @@ -1232,13 +1351,18 @@ impl<'a> ObjectBuilder<'a> { // Returns validate_unique_fields because we can no longer reference self once this method returns. fn parent_state<'b>(&'b mut self, key: &'b str) -> (ParentState<'b>, bool) { + let validate_unique_fields = self.validate_unique_fields; + + let (buffer, metadata_builder) = self.parent_state.buffer_and_metadata_builder(); + let state = ParentState::Object { - buffer: &mut self.buffer, - metadata_builder: self.parent_state.metadata_builder(), + buffer, + metadata_builder, fields: &mut self.fields, field_name: key, + parent_offset_base: self.parent_value_offset_base, }; - (state, self.validate_unique_fields) + (state, validate_unique_fields) } /// Returns an object builder that can be used to append a new (nested) object to this object. @@ -1275,39 +1399,72 @@ impl<'a> ObjectBuilder<'a> { ))); } - let data_size = self.buffer.offset(); - let num_fields = self.fields.len(); - let is_large = num_fields > u8::MAX as usize; - self.fields.sort_by(|&field_a_id, _, &field_b_id, _| { - let key_a = &metadata_builder.field_name(field_a_id as usize); - let key_b = &metadata_builder.field_name(field_b_id as usize); - key_a.cmp(key_b) + let field_a_name = metadata_builder.field_name(field_a_id as usize); + let field_b_name = metadata_builder.field_name(field_b_id as usize); + field_a_name.cmp(field_b_name) }); let max_id = self.fields.iter().map(|(i, _)| *i).max().unwrap_or(0); - let id_size = int_size(max_id as usize); - let offset_size = int_size(data_size); - // Get parent's buffer let parent_buffer = self.parent_state.buffer(); - let starting_offset = parent_buffer.offset(); + let current_offset = parent_buffer.offset(); + // Current object starts from `object_start_offset` + let data_size = current_offset - self.parent_value_offset_base; + let offset_size = int_size(data_size); - // Write header - let header = object_header(is_large, id_size, offset_size); - parent_buffer.append_header(header, is_large, num_fields); + let num_fields = self.fields.len(); + let is_large = num_fields > u8::MAX as usize; - // Write field IDs (sorted order) - let ids = self.fields.keys().map(|id| *id as usize); - parent_buffer.append_offset_array(ids, None, id_size); + let header_size = 1 + // header byte + (if is_large { 4 } else { 1 }) + // num_fields + (num_fields * id_size as usize) + // field IDs + ((num_fields + 1) * offset_size as usize); // field offsets + data_size - // Write the field offset array, followed by the value bytes - let offsets = std::mem::take(&mut self.fields).into_values(); - parent_buffer.append_offset_array(offsets, Some(data_size), offset_size); - parent_buffer.append_slice(self.buffer.inner()); + let starting_offset = self.parent_value_offset_base; + + // Shift existing data to make room for the header + let buffer = parent_buffer.inner_mut(); + buffer.splice( + starting_offset..starting_offset, + std::iter::repeat_n(0u8, header_size), + ); + + // Write header at the original start position + let mut header_pos = starting_offset; + + // Write header byte + let header = object_header(is_large, id_size, offset_size); + + header_pos = self + .parent_state + .buffer() + .append_header_start_from_buf_pos(header_pos, header, is_large, num_fields); + + header_pos = self + .parent_state + .buffer() + .append_offset_array_start_from_buf_pos( + header_pos, + self.fields.keys().copied().map(|id| id as usize), + None, + id_size, + ); + + self.parent_state + .buffer() + .append_offset_array_start_from_buf_pos( + header_pos, + self.fields.values().copied(), + Some(data_size), + offset_size, + ); self.parent_state.finish(starting_offset); + // Mark that this object has been finished + self.has_been_finished = true; + Ok(()) } } @@ -1317,7 +1474,20 @@ impl<'a> ObjectBuilder<'a> { /// This is to ensure that the object is always finalized before its parent builder /// is finalized. impl Drop for ObjectBuilder<'_> { - fn drop(&mut self) {} + fn drop(&mut self) { + // Truncate the buffer if the `finish` method has not been called. + if !self.has_been_finished { + self.parent_state + .buffer() + .inner_mut() + .truncate(self.parent_value_offset_base); + + self.parent_state + .metadata_builder() + .field_names + .truncate(self.parent_metadata_offset_base); + } + } } /// Extends [`VariantBuilder`] to help building nested [`Variant`]s @@ -1951,9 +2121,20 @@ mod tests { { "a": false, "c": { - "b": "a" - } + "b": "a", + "c": { + "aa": "bb", + }, + "d": { + "cc": "dd" + } + }, "b": true, + "d": { + "e": 1, + "f": [1, true], + "g": ["tree", false], + } } */ @@ -1966,11 +2147,45 @@ mod tests { { let mut inner_object_builder = outer_object_builder.new_object("c"); inner_object_builder.insert("b", "a"); + + { + let mut inner_inner_object_builder = inner_object_builder.new_object("c"); + inner_inner_object_builder.insert("aa", "bb"); + let _ = inner_inner_object_builder.finish(); + } + + { + let mut inner_inner_object_builder = inner_object_builder.new_object("d"); + inner_inner_object_builder.insert("cc", "dd"); + let _ = inner_inner_object_builder.finish(); + } let _ = inner_object_builder.finish(); } outer_object_builder.insert("b", true); + { + let mut inner_object_builder = outer_object_builder.new_object("d"); + inner_object_builder.insert("e", 1); + { + let mut inner_list_builder = inner_object_builder.new_list("f"); + inner_list_builder.append_value(1); + inner_list_builder.append_value(true); + + inner_list_builder.finish(); + } + + { + let mut inner_list_builder = inner_object_builder.new_list("g"); + inner_list_builder.append_value("tree"); + inner_list_builder.append_value(false); + + inner_list_builder.finish(); + } + + let _ = inner_object_builder.finish(); + } + let _ = outer_object_builder.finish(); } @@ -1982,7 +2197,18 @@ mod tests { "a": false, "b": true, "c": { - "b": "a" + "b": "a", + "c": { + "aa": "bb", + }, + "d": { + "cc": "dd" + } + }, + "d": { + "e": 1, + "f": [1, true], + "g": ["tree", false], } } */ @@ -1990,7 +2216,7 @@ mod tests { let variant = Variant::try_new(&metadata, &value).unwrap(); let outer_object = variant.as_object().unwrap(); - assert_eq!(outer_object.len(), 3); + assert_eq!(outer_object.len(), 4); assert_eq!(outer_object.field_name(0).unwrap(), "a"); assert_eq!(outer_object.field(0).unwrap(), Variant::from(false)); @@ -2000,12 +2226,151 @@ mod tests { let inner_object_variant = outer_object.field(2).unwrap(); let inner_object = inner_object_variant.as_object().unwrap(); - assert_eq!(inner_object.len(), 1); + assert_eq!(inner_object.len(), 3); assert_eq!(inner_object.field_name(0).unwrap(), "b"); assert_eq!(inner_object.field(0).unwrap(), Variant::from("a")); + let inner_iner_object_variant_c = inner_object.field(1).unwrap(); + let inner_inner_object_c = inner_iner_object_variant_c.as_object().unwrap(); + assert_eq!(inner_inner_object_c.len(), 1); + assert_eq!(inner_inner_object_c.field_name(0).unwrap(), "aa"); + assert_eq!(inner_inner_object_c.field(0).unwrap(), Variant::from("bb")); + + let inner_iner_object_variant_d = inner_object.field(2).unwrap(); + let inner_inner_object_d = inner_iner_object_variant_d.as_object().unwrap(); + assert_eq!(inner_inner_object_d.len(), 1); + assert_eq!(inner_inner_object_d.field_name(0).unwrap(), "cc"); + assert_eq!(inner_inner_object_d.field(0).unwrap(), Variant::from("dd")); + assert_eq!(outer_object.field_name(1).unwrap(), "b"); assert_eq!(outer_object.field(1).unwrap(), Variant::from(true)); + + let out_object_variant_d = outer_object.field(3).unwrap(); + let out_object_d = out_object_variant_d.as_object().unwrap(); + assert_eq!(out_object_d.len(), 3); + assert_eq!("e", out_object_d.field_name(0).unwrap()); + assert_eq!(Variant::from(1), out_object_d.field(0).unwrap()); + assert_eq!("f", out_object_d.field_name(1).unwrap()); + + let first_inner_list_variant_f = out_object_d.field(1).unwrap(); + let first_inner_list_f = first_inner_list_variant_f.as_list().unwrap(); + assert_eq!(2, first_inner_list_f.len()); + assert_eq!(Variant::from(1), first_inner_list_f.get(0).unwrap()); + assert_eq!(Variant::from(true), first_inner_list_f.get(1).unwrap()); + + let second_inner_list_variant_g = out_object_d.field(2).unwrap(); + let second_inner_list_g = second_inner_list_variant_g.as_list().unwrap(); + assert_eq!(2, second_inner_list_g.len()); + assert_eq!(Variant::from("tree"), second_inner_list_g.get(0).unwrap()); + assert_eq!(Variant::from(false), second_inner_list_g.get(1).unwrap()); + } + + // This test wants to cover the logic for reuse parent buffer for list builder + // the builder looks like + // [ "apple", "false", [{"a": "b", "b": "c"}, {"c":"d", "d":"e"}], [[1, true], ["tree", false]], 1] + #[test] + fn test_nested_list_with_heterogeneous_fields_for_buffer_reuse() { + let mut builder = VariantBuilder::new(); + + { + let mut outer_list_builder = builder.new_list(); + + outer_list_builder.append_value("apple"); + outer_list_builder.append_value(false); + + { + // the list here wants to cover the logic object builder inside list builder + let mut inner_list_builder = outer_list_builder.new_list(); + + { + let mut inner_object_builder = inner_list_builder.new_object(); + inner_object_builder.insert("a", "b"); + inner_object_builder.insert("b", "c"); + let _ = inner_object_builder.finish(); + } + + { + // the seconde object builder here wants to cover the logic for + // list builder resue the parent buffer. + let mut inner_object_builder = inner_list_builder.new_object(); + inner_object_builder.insert("c", "d"); + inner_object_builder.insert("d", "e"); + let _ = inner_object_builder.finish(); + } + + inner_list_builder.finish(); + } + + { + // the list here wants to cover the logic list builder inside list builder + let mut inner_list_builder = outer_list_builder.new_list(); + + { + let mut double_inner_list_builder = inner_list_builder.new_list(); + double_inner_list_builder.append_value(1); + double_inner_list_builder.append_value(true); + + double_inner_list_builder.finish(); + } + + { + let mut double_inner_list_builder = inner_list_builder.new_list(); + double_inner_list_builder.append_value("tree"); + double_inner_list_builder.append_value(false); + + double_inner_list_builder.finish(); + } + inner_list_builder.finish(); + } + + outer_list_builder.append_value(1); + + outer_list_builder.finish(); + } + + let (metadata, value) = builder.finish(); + + let variant = Variant::try_new(&metadata, &value).unwrap(); + let outer_list = variant.as_list().unwrap(); + + assert_eq!(5, outer_list.len()); + + // Primitive value + assert_eq!(Variant::from("apple"), outer_list.get(0).unwrap()); + assert_eq!(Variant::from(false), outer_list.get(1).unwrap()); + assert_eq!(Variant::from(1), outer_list.get(4).unwrap()); + + // The first inner list [{"a": "b", "b": "c"}, {"c":"d", "d":"e"}] + let list1_variant = outer_list.get(2).unwrap(); + let list1 = list1_variant.as_list().unwrap(); + assert_eq!(2, list1.len()); + + let list1_obj1_variant = list1.get(0).unwrap(); + let list1_obj1 = list1_obj1_variant.as_object().unwrap(); + assert_eq!("a", list1_obj1.field_name(0).unwrap()); + assert_eq!(Variant::from("b"), list1_obj1.field(0).unwrap()); + + assert_eq!("b", list1_obj1.field_name(1).unwrap()); + assert_eq!(Variant::from("c"), list1_obj1.field(1).unwrap()); + + // The second inner list [[1, true], ["tree", false]] + let list2_variant = outer_list.get(3).unwrap(); + let list2 = list2_variant.as_list().unwrap(); + assert_eq!(2, list2.len()); + + // The list [1, true] + let list2_list1_variant = list2.get(0).unwrap(); + let list2_list1 = list2_list1_variant.as_list().unwrap(); + assert_eq!(2, list2_list1.len()); + assert_eq!(Variant::from(1), list2_list1.get(0).unwrap()); + assert_eq!(Variant::from(true), list2_list1.get(1).unwrap()); + + // The list ["true", false] + let list2_list2_variant = list2.get(1).unwrap(); + let list2_list2 = list2_list2_variant.as_list().unwrap(); + assert_eq!(2, list2_list2.len()); + assert_eq!(Variant::from("tree"), list2_list2.get(0).unwrap()); + assert_eq!(Variant::from(false), list2_list2.get(1).unwrap()); } #[test] @@ -2394,8 +2759,7 @@ mod tests { // The original builder should be unchanged let (metadata, value) = builder.finish(); let metadata = VariantMetadata::try_new(&metadata).unwrap(); - assert_eq!(metadata.len(), 1); - assert_eq!(&metadata[0], "name"); // not rolled back + assert!(metadata.is_empty()); // rolled back let variant = Variant::try_new_with_metadata(metadata, &value).unwrap(); assert_eq!(variant, Variant::Int8(42)); @@ -2469,8 +2833,7 @@ mod tests { list_builder.finish(); let (metadata, value) = builder.finish(); let metadata = VariantMetadata::try_new(&metadata).unwrap(); - assert_eq!(metadata.len(), 1); - assert_eq!(&metadata[0], "name"); // not rolled back + assert!(metadata.is_empty()); let variant = Variant::try_new_with_metadata(metadata, &value).unwrap(); let list = variant.as_list().unwrap(); @@ -2552,9 +2915,7 @@ mod tests { // Only the second attempt should appear in the final variant let (metadata, value) = builder.finish(); let metadata = VariantMetadata::try_new(&metadata).unwrap(); - assert_eq!(metadata.len(), 2); - assert_eq!(&metadata[0], "first"); - assert_eq!(&metadata[1], "nested"); // not rolled back + assert!(metadata.is_empty()); // rolled back let variant = Variant::try_new_with_metadata(metadata, &value).unwrap(); assert_eq!(variant, Variant::Int8(2)); @@ -2577,15 +2938,12 @@ mod tests { object_builder.finish().unwrap(); let (metadata, value) = builder.finish(); let metadata = VariantMetadata::try_new(&metadata).unwrap(); - assert_eq!(metadata.len(), 3); - assert_eq!(&metadata[0], "first"); - assert_eq!(&metadata[1], "name"); // not rolled back - assert_eq!(&metadata[2], "second"); + assert_eq!(metadata.len(), 1); // the fields of nested_object_builder has been rolled back + assert_eq!(&metadata[0], "second"); let variant = Variant::try_new_with_metadata(metadata, &value).unwrap(); let obj = variant.as_object().unwrap(); - assert_eq!(obj.len(), 2); - assert_eq!(obj.get("first"), Some(Variant::Int8(1))); + assert_eq!(obj.len(), 1); assert_eq!(obj.get("second"), Some(Variant::Int8(2))); } @@ -2608,10 +2966,7 @@ mod tests { // Only the second attempt should appear in the final variant let (metadata, value) = builder.finish(); let metadata = VariantMetadata::try_new(&metadata).unwrap(); - assert_eq!(metadata.len(), 3); - assert_eq!(&metadata[0], "first"); // not rolled back - assert_eq!(&metadata[1], "name"); // not rolled back - assert_eq!(&metadata[2], "nested"); // not rolled back + assert_eq!(metadata.len(), 0); // rolled back let variant = Variant::try_new_with_metadata(metadata, &value).unwrap(); assert_eq!(variant, Variant::Int8(2));