Skip to content

Commit c76e612

Browse files
committed
address comments from viirya
1 parent bdf1f2d commit c76e612

File tree

1 file changed

+117
-65
lines changed

1 file changed

+117
-65
lines changed

parquet-variant/src/builder.rs

Lines changed: 117 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,12 @@ fn write_offset(buf: &mut Vec<u8>, value: usize, nbytes: u8) {
6363
buf.extend_from_slice(&bytes[..nbytes as usize]);
6464
}
6565

66+
/// Write the low `nbytes` bytes of `value` (little-endian) into `buf`,
/// starting at position `start_pos`.
///
/// Panics if `start_pos + nbytes` exceeds `buf.len()` or `nbytes > 8`
/// (same as the slice-indexing behavior callers already rely on).
fn write_offset_at_pos(buf: &mut [u8], start_pos: usize, value: usize, nbytes: u8) {
    let width = nbytes as usize;
    let le_bytes = value.to_le_bytes();
    let dest = &mut buf[start_pos..start_pos + width];
    dest.copy_from_slice(&le_bytes[..width]);
}
71+
6672
/// Wrapper around a `Vec<u8>` that provides methods for appending
6773
/// primitive values, variant types, and metadata.
6874
///
@@ -342,6 +348,63 @@ impl ValueBuffer {
342348
write_offset(buf, data_size, nbytes);
343349
}
344350
}
351+
352+
/// Writes out the header byte for a variant object or list, from the starting position
353+
/// of the buffer, will return the position after this write
354+
fn append_header_start_from_buf_pos(
355+
&mut self,
356+
start_pos: usize, // the start position where the header will be inserted
357+
header_byte: u8,
358+
is_large: bool,
359+
num_fields: usize,
360+
) -> usize {
361+
let buffer = self.inner_mut();
362+
363+
// Write header at the original start position
364+
let mut header_pos = start_pos;
365+
366+
// Write header byte
367+
buffer[header_pos] = header_byte;
368+
header_pos += 1;
369+
370+
// Write number of fields
371+
if is_large {
372+
buffer[header_pos..header_pos + 4].copy_from_slice(&(num_fields as u32).to_le_bytes());
373+
header_pos += 4;
374+
} else {
375+
buffer[header_pos] = num_fields as u8;
376+
header_pos += 1;
377+
}
378+
379+
header_pos
380+
}
381+
382+
/// Writes out the offsets for an array of offsets, including the final offset (data size).
383+
/// from the starting position of the buffer, will return the position after this write
384+
fn append_offset_array_start_from_buf_pos(
385+
&mut self,
386+
start_pos: usize,
387+
offsets: impl IntoIterator<Item = usize>,
388+
data_size: Option<usize>,
389+
nbytes: u8,
390+
) -> usize {
391+
let buf = self.inner_mut();
392+
393+
let mut current_pos = start_pos;
394+
for relative_offset in offsets {
395+
write_offset_at_pos(buf, current_pos, relative_offset, nbytes);
396+
current_pos += nbytes as usize;
397+
}
398+
399+
// Write data_size
400+
if let Some(data_size) = data_size {
401+
// Write data_size at the end of the offsets
402+
write_offset_at_pos(buf, current_pos, data_size, nbytes);
403+
current_pos += nbytes as usize;
404+
}
405+
406+
current_pos
407+
}
345408
}
346409

347410
/// Builder for constructing metadata for [`Variant`] values.
@@ -506,7 +569,7 @@ enum ParentState<'a> {
506569
metadata_builder: &'a mut MetadataBuilder,
507570
fields: &'a mut IndexMap<u32, usize>,
508571
field_name: &'a str,
509-
object_start_offset: usize,
572+
parent_offset_base: usize,
510573
},
511574
}
512575

@@ -545,7 +608,7 @@ impl ParentState<'_> {
545608
metadata_builder,
546609
fields,
547610
field_name,
548-
object_start_offset,
611+
parent_offset_base: object_start_offset,
549612
..
550613
} => {
551614
let field_id = metadata_builder.upsert_field_name(field_name);
@@ -576,7 +639,7 @@ impl ParentState<'_> {
576639
}
577640
}
578641

579-
// return the offset of the underlying buffer at the time of calling this method.
642+
// Return the offset of the underlying buffer at the time of calling this method.
580643
fn buffer_current_offset(&self) -> usize {
581644
match self {
582645
ParentState::Variant { buffer, .. }
@@ -585,7 +648,7 @@ impl ParentState<'_> {
585648
}
586649
}
587650

588-
// return the current index of the undelying metadata buffer at the time of calling this method.
651+
// Return the current index of the underlying metadata buffer at the time of calling this method.
589652
fn metadata_current_offset(&self) -> usize {
590653
match self {
591654
ParentState::Variant {
@@ -1048,8 +1111,6 @@ impl<'a> ListBuilder<'a> {
10481111

10491112
// Get parent's buffer
10501113
let parent_buffer = self.parent_state.buffer();
1051-
// as object builder has been reused the parent buffer,
1052-
// we need to shift the offset by the starting offset of the parent object
10531114
let starting_offset = parent_buffer.offset();
10541115

10551116
// Write header
@@ -1078,12 +1139,12 @@ impl Drop for ListBuilder<'_> {
10781139
pub struct ObjectBuilder<'a> {
10791140
parent_state: ParentState<'a>,
10801141
fields: IndexMap<u32, usize>, // (field_id, offset)
1081-
/// the starting offset in the parent's buffer where this object starts
1082-
object_start_offset: usize,
1083-
/// the starting offset in the parent's metadata buffer where this object starts
1142+
/// The starting offset in the parent's buffer where this object starts
1143+
parent_offset_base: usize,
1144+
/// The starting offset in the parent's metadata buffer where this object starts
10841145
/// used to truncate the written fields in `drop` if the current object has not been finished
1085-
object_meta_start_offset: usize,
1086-
/// whether the object has been finished, the written content of the current object
1146+
parent_metadata_offset_base: usize,
1147+
/// Whether the object has been finished, the written content of the current object
10871148
/// will be truncated in `drop` if `has_been_finished` is false
10881149
has_been_finished: bool,
10891150
validate_unique_fields: bool,
@@ -1093,14 +1154,14 @@ pub struct ObjectBuilder<'a> {
10931154

10941155
impl<'a> ObjectBuilder<'a> {
10951156
fn new(parent_state: ParentState<'a>, validate_unique_fields: bool) -> Self {
1096-
let start_offset = parent_state.buffer_current_offset();
1097-
let meta_start_offset = parent_state.metadata_current_offset();
1157+
let offset_base = parent_state.buffer_current_offset();
1158+
let meta_offset_base = parent_state.metadata_current_offset();
10981159
Self {
10991160
parent_state,
11001161
fields: IndexMap::new(),
1101-
object_start_offset: start_offset,
1162+
parent_offset_base: offset_base,
11021163
has_been_finished: false,
1103-
object_meta_start_offset: meta_start_offset,
1164+
parent_metadata_offset_base: meta_offset_base,
11041165
validate_unique_fields,
11051166
duplicate_fields: HashSet::new(),
11061167
}
@@ -1128,7 +1189,7 @@ impl<'a> ObjectBuilder<'a> {
11281189
let (buffer, metadata_builder) = self.parent_state.buffer_and_metadata_builder();
11291190

11301191
let field_id = metadata_builder.upsert_field_name(key);
1131-
let field_start = buffer.offset() - self.object_start_offset;
1192+
let field_start = buffer.offset() - self.parent_offset_base;
11321193

11331194
if self.fields.insert(field_id, field_start).is_some() && self.validate_unique_fields {
11341195
self.duplicate_fields.insert(field_id);
@@ -1158,7 +1219,7 @@ impl<'a> ObjectBuilder<'a> {
11581219
metadata_builder,
11591220
fields: &mut self.fields,
11601221
field_name: key,
1161-
object_start_offset: self.object_start_offset,
1222+
parent_offset_base: self.parent_offset_base,
11621223
};
11631224
(state, validate_unique_fields)
11641225
}
@@ -1207,14 +1268,14 @@ impl<'a> ObjectBuilder<'a> {
12071268

12081269
// the length of the metadata's field names is a very cheap-to-compute upper bound.
12091270
// it will almost always be a tight upper bound as well -- it would take a pretty
1210-
// carefully crafted object to use only the early field ids of a large dictionary.
1271+
// carefully crafted object to use only the early field ids of a large dictionary.
12111272
let max_id = metadata_builder.field_names.len();
12121273
let id_size = int_size(max_id);
12131274

12141275
let parent_buffer = self.parent_state.buffer();
12151276
let current_offset = parent_buffer.offset();
1216-
// current object starts from `object_start_offset`
1217-
let data_size = current_offset - self.object_start_offset;
1277+
// Current object starts from `object_start_offset`
1278+
let data_size = current_offset - self.parent_offset_base;
12181279
let offset_size = int_size(data_size);
12191280

12201281
let num_fields = self.fields.len();
@@ -1225,7 +1286,7 @@ impl<'a> ObjectBuilder<'a> {
12251286
(num_fields * id_size as usize) + // field IDs
12261287
((num_fields + 1) * offset_size as usize); // field offsets + data_size
12271288

1228-
let starting_offset = self.object_start_offset;
1289+
let starting_offset = self.parent_offset_base;
12291290

12301291
// Shift existing data to make room for the header
12311292
let buffer = parent_buffer.inner_mut();
@@ -1239,42 +1300,33 @@ impl<'a> ObjectBuilder<'a> {
12391300

12401301
// Write header byte
12411302
let header = object_header(is_large, id_size, offset_size);
1242-
buffer[header_pos] = header;
1243-
header_pos += 1;
1244-
1245-
// Write number of fields
1246-
if is_large {
1247-
buffer[header_pos..header_pos + 4].copy_from_slice(&(num_fields as u32).to_le_bytes());
1248-
header_pos += 4;
1249-
} else {
1250-
buffer[header_pos] = num_fields as u8;
1251-
header_pos += 1;
1252-
}
1253-
1254-
// Write field IDs
1255-
for field_id in self.fields.keys() {
1256-
let id_bytes = field_id.to_le_bytes();
1257-
buffer[header_pos..header_pos + id_size as usize]
1258-
.copy_from_slice(&id_bytes[..id_size as usize]);
1259-
header_pos += id_size as usize;
1260-
}
1261-
1262-
// Write field offsets (adjusted for header)
1263-
for relative_offset in self.fields.values() {
1264-
let offset_bytes = relative_offset.to_le_bytes();
1265-
buffer[header_pos..header_pos + offset_size as usize]
1266-
.copy_from_slice(&offset_bytes[..offset_size as usize]);
1267-
header_pos += offset_size as usize;
1268-
}
1269-
1270-
// Write data_size
1271-
let data_size_bytes = data_size.to_le_bytes();
1272-
buffer[header_pos..header_pos + offset_size as usize]
1273-
.copy_from_slice(&data_size_bytes[..offset_size as usize]);
12741303

1304+
header_pos = self
1305+
.parent_state
1306+
.buffer()
1307+
.append_header_start_from_buf_pos(header_pos, header, is_large, num_fields);
1308+
1309+
header_pos = self
1310+
.parent_state
1311+
.buffer()
1312+
.append_offset_array_start_from_buf_pos(
1313+
header_pos,
1314+
self.fields.keys().copied().map(|id| id as usize),
1315+
None,
1316+
id_size,
1317+
);
1318+
1319+
self.parent_state
1320+
.buffer()
1321+
.append_offset_array_start_from_buf_pos(
1322+
header_pos,
1323+
self.fields.values().copied(),
1324+
Some(data_size),
1325+
offset_size,
1326+
);
12751327
self.parent_state.finish(starting_offset);
12761328

1277-
// mark that this object has been finished
1329+
// Mark that this object has been finished
12781330
self.has_been_finished = true;
12791331

12801332
Ok(())
@@ -1287,17 +1339,17 @@ impl<'a> ObjectBuilder<'a> {
12871339
/// is finalized.
12881340
impl Drop for ObjectBuilder<'_> {
12891341
fn drop(&mut self) {
1290-
// truncate the buffer if the `finish` method has not been called.
1342+
// Truncate the buffer if the `finish` method has not been called.
12911343
if !self.has_been_finished {
12921344
self.parent_state
12931345
.buffer()
12941346
.inner_mut()
1295-
.truncate(self.object_start_offset);
1347+
.truncate(self.parent_offset_base);
12961348

12971349
self.parent_state
12981350
.metadata_builder()
12991351
.field_names
1300-
.truncate(self.object_meta_start_offset);
1352+
.truncate(self.parent_metadata_offset_base);
13011353
}
13021354
}
13031355
}
@@ -2078,7 +2130,7 @@ mod tests {
20782130
assert_eq!(Variant::from(false), second_inner_list_g.get(1).unwrap());
20792131
}
20802132

2081-
// this test wants to cover the logic for reuse parent buffer for list builder
2133+
// This test covers the logic for reusing the parent buffer for the list builder
20822134
// the builder looks like
20832135
// [ "apple", "false", [{"a": "b", "b": "c"}, {"c":"d", "d":"e"}], [[1, true], ["tree", false]], 1]
20842136
#[test]
@@ -2148,12 +2200,12 @@ mod tests {
21482200

21492201
assert_eq!(5, outer_list.len());
21502202

2151-
// primitive value
2203+
// Primitive value
21522204
assert_eq!(Variant::from("apple"), outer_list.get(0).unwrap());
21532205
assert_eq!(Variant::from(false), outer_list.get(1).unwrap());
21542206
assert_eq!(Variant::from(1), outer_list.get(4).unwrap());
21552207

2156-
// the first inner list [{"a": "b", "b": "c"}, {"c":"d", "d":"e"}]
2208+
// The first inner list [{"a": "b", "b": "c"}, {"c":"d", "d":"e"}]
21572209
let list1_variant = outer_list.get(2).unwrap();
21582210
let list1 = list1_variant.as_list().unwrap();
21592211
assert_eq!(2, list1.len());
@@ -2166,19 +2218,19 @@ mod tests {
21662218
assert_eq!("b", list1_obj1.field_name(1).unwrap());
21672219
assert_eq!(Variant::from("c"), list1_obj1.field(1).unwrap());
21682220

2169-
// the second inner list [[1, true], ["tree", false]]
2221+
// The second inner list [[1, true], ["tree", false]]
21702222
let list2_variant = outer_list.get(3).unwrap();
21712223
let list2 = list2_variant.as_list().unwrap();
21722224
assert_eq!(2, list2.len());
21732225

2174-
// the list [1, true]
2226+
// The list [1, true]
21752227
let list2_list1_variant = list2.get(0).unwrap();
21762228
let list2_list1 = list2_list1_variant.as_list().unwrap();
21772229
assert_eq!(2, list2_list1.len());
21782230
assert_eq!(Variant::from(1), list2_list1.get(0).unwrap());
21792231
assert_eq!(Variant::from(true), list2_list1.get(1).unwrap());
21802232

2181-
// the list ["true", false]
2233+
// The list ["true", false]
21822234
let list2_list2_variant = list2.get(1).unwrap();
21832235
let list2_list2 = list2_list2_variant.as_list().unwrap();
21842236
assert_eq!(2, list2_list2.len());
@@ -2673,8 +2725,8 @@ mod tests {
26732725
// Only the second attempt should appear in the final variant
26742726
let (metadata, value) = builder.finish();
26752727
let metadata = VariantMetadata::try_new(&metadata).unwrap();
2676-
assert_eq!(metadata.len(), 1); // rolled back
2677-
assert_eq!(&metadata[0], "name");
2728+
assert_eq!(metadata.len(), 1);
2729+
assert_eq!(&metadata[0], "name"); // not rolled back
26782730

26792731
let variant = Variant::try_new_with_metadata(metadata, &value).unwrap();
26802732
assert_eq!(variant, Variant::Int8(2));

0 commit comments

Comments
 (0)