18
18
//! Module for transforming a batch of JSON strings into a batch of Variants represented as
19
19
//! STRUCT<metadata: BINARY, value: BINARY>
20
20
21
- use std:: sync:: Arc ;
22
-
23
- use arrow:: array:: { Array , ArrayRef , BinaryArray , BooleanBufferBuilder , StringArray , StructArray } ;
24
- use arrow:: buffer:: { Buffer , NullBuffer , OffsetBuffer , ScalarBuffer } ;
25
- use arrow:: datatypes:: { DataType , Field } ;
21
+ use crate :: { VariantArray , VariantArrayBuilder } ;
22
+ use arrow:: array:: { Array , ArrayRef , StringArray } ;
26
23
use arrow_schema:: ArrowError ;
27
24
use parquet_variant:: VariantBuilder ;
28
25
use parquet_variant_json:: json_to_variant;
29
26
30
- fn variant_arrow_repr ( ) -> DataType {
31
- // The subfields are expected to be non-nullable according to the parquet variant spec.
32
- let metadata_field = Field :: new ( "metadata" , DataType :: Binary , false ) ;
33
- let value_field = Field :: new ( "value" , DataType :: Binary , false ) ;
34
- let fields = vec ! [ metadata_field, value_field] ;
35
- DataType :: Struct ( fields. into ( ) )
36
- }
37
-
38
27
/// Parse a batch of JSON strings into a batch of Variants represented as
39
28
/// STRUCT<metadata: BINARY, value: BINARY> where nulls are preserved. The JSON strings in the input
40
29
/// must be valid.
41
- pub fn batch_json_string_to_variant ( input : & ArrayRef ) -> Result < StructArray , ArrowError > {
30
+ pub fn batch_json_string_to_variant ( input : & ArrayRef ) -> Result < VariantArray , ArrowError > {
42
31
let input_string_array = match input. as_any ( ) . downcast_ref :: < StringArray > ( ) {
43
32
Some ( string_array) => Ok ( string_array) ,
44
33
None => Err ( ArrowError :: CastError (
45
34
"Expected reference to StringArray as input" . into ( ) ,
46
35
) ) ,
47
36
} ?;
48
37
49
- // Zero-copy builders
50
- let mut metadata_buffer: Vec < u8 > = Vec :: with_capacity ( input. len ( ) * 128 ) ;
51
- let mut metadata_offsets: Vec < i32 > = Vec :: with_capacity ( input. len ( ) + 1 ) ;
52
- let mut metadata_validity = BooleanBufferBuilder :: new ( input. len ( ) ) ;
53
- let mut metadata_current_offset: i32 = 0 ;
54
- metadata_offsets. push ( metadata_current_offset) ;
55
-
56
- let mut value_buffer: Vec < u8 > = Vec :: with_capacity ( input. len ( ) * 128 ) ;
57
- let mut value_offsets: Vec < i32 > = Vec :: with_capacity ( input. len ( ) + 1 ) ;
58
- let mut value_validity = BooleanBufferBuilder :: new ( input. len ( ) ) ;
59
- let mut value_current_offset: i32 = 0 ;
60
- value_offsets. push ( value_current_offset) ;
61
-
62
- let mut validity = BooleanBufferBuilder :: new ( input. len ( ) ) ;
38
+ let mut variant_array_builder = VariantArrayBuilder :: new ( input_string_array. len ( ) ) ;
63
39
for i in 0 ..input. len ( ) {
64
40
if input. is_null ( i) {
65
41
// The subfields are expected to be non-nullable according to the parquet variant spec.
66
- metadata_validity. append ( true ) ;
67
- value_validity. append ( true ) ;
68
- metadata_offsets. push ( metadata_current_offset) ;
69
- value_offsets. push ( value_current_offset) ;
70
- validity. append ( false ) ;
42
+ variant_array_builder. append_null ( ) ;
71
43
} else {
72
44
let mut vb = VariantBuilder :: new ( ) ;
73
45
json_to_variant ( input_string_array. value ( i) , & mut vb) ?;
74
46
let ( metadata, value) = vb. finish ( ) ;
75
- validity. append ( true ) ;
76
-
77
- metadata_current_offset += metadata. len ( ) as i32 ;
78
- metadata_buffer. extend ( metadata) ;
79
- metadata_offsets. push ( metadata_current_offset) ;
80
- metadata_validity. append ( true ) ;
81
-
82
- value_current_offset += value. len ( ) as i32 ;
83
- value_buffer. extend ( value) ;
84
- value_offsets. push ( value_current_offset) ;
85
- value_validity. append ( true ) ;
47
+ variant_array_builder. append_variant_buffers ( & metadata, & value) ;
86
48
}
87
49
}
88
- let metadata_offsets_buffer = OffsetBuffer :: new ( ScalarBuffer :: from ( metadata_offsets) ) ;
89
- let metadata_data_buffer = Buffer :: from_vec ( metadata_buffer) ;
90
- let metadata_null_buffer = NullBuffer :: new ( metadata_validity. finish ( ) ) ;
91
-
92
- let value_offsets_buffer = OffsetBuffer :: new ( ScalarBuffer :: from ( value_offsets) ) ;
93
- let value_data_buffer = Buffer :: from_vec ( value_buffer) ;
94
- let value_null_buffer = NullBuffer :: new ( value_validity. finish ( ) ) ;
95
-
96
- let metadata_array = BinaryArray :: new (
97
- metadata_offsets_buffer,
98
- metadata_data_buffer,
99
- Some ( metadata_null_buffer) ,
100
- ) ;
101
- let value_array = BinaryArray :: new (
102
- value_offsets_buffer,
103
- value_data_buffer,
104
- Some ( value_null_buffer) ,
105
- ) ;
106
-
107
- let struct_fields: Vec < ArrayRef > = vec ! [ Arc :: new( metadata_array) , Arc :: new( value_array) ] ;
108
- let variant_fields = match variant_arrow_repr ( ) {
109
- DataType :: Struct ( fields) => fields,
110
- _ => unreachable ! ( "variant_arrow_repr is hard-coded and must match the expected schema" ) ,
111
- } ;
112
- let null_buffer = NullBuffer :: new ( validity. finish ( ) ) ;
113
- Ok ( StructArray :: new (
114
- variant_fields,
115
- struct_fields,
116
- Some ( null_buffer) ,
117
- ) )
50
+ Ok ( variant_array_builder. build ( ) )
118
51
}
119
52
120
53
#[ cfg( test) ]
121
54
mod test {
122
55
use crate :: batch_json_string_to_variant;
123
- use arrow:: array:: { Array , ArrayRef , BinaryArray , StringArray } ;
56
+ use arrow:: array:: { Array , ArrayRef , AsArray , StringArray } ;
124
57
use arrow_schema:: ArrowError ;
125
58
use parquet_variant:: { Variant , VariantBuilder } ;
126
59
use std:: sync:: Arc ;
@@ -135,43 +68,38 @@ mod test {
135
68
None ,
136
69
] ) ;
137
70
let array_ref: ArrayRef = Arc :: new ( input) ;
138
- let output = batch_json_string_to_variant ( & array_ref) . unwrap ( ) ;
71
+ let variant_array = batch_json_string_to_variant ( & array_ref) . unwrap ( ) ;
139
72
140
- let struct_array = & output;
141
- let metadata_array = struct_array
142
- . column ( 0 )
143
- . as_any ( )
144
- . downcast_ref :: < BinaryArray > ( )
145
- . unwrap ( ) ;
146
- let value_array = struct_array
147
- . column ( 1 )
148
- . as_any ( )
149
- . downcast_ref :: < BinaryArray > ( )
150
- . unwrap ( ) ;
73
+ let metadata_array = variant_array. metadata_field ( ) . as_binary_view ( ) ;
74
+ let value_array = variant_array. value_field ( ) . as_binary_view ( ) ;
151
75
152
- assert ! ( !struct_array. is_null( 0 ) ) ;
153
- assert ! ( struct_array. is_null( 1 ) ) ;
154
- assert ! ( !struct_array. is_null( 2 ) ) ;
155
- assert ! ( !struct_array. is_null( 3 ) ) ;
156
- assert ! ( struct_array. is_null( 4 ) ) ;
76
+ // Compare row 0
77
+ assert ! ( !variant_array. is_null( 0 ) ) ;
78
+ assert_eq ! ( variant_array. value( 0 ) , Variant :: Int8 ( 1 ) ) ;
157
79
158
- assert_eq ! ( metadata_array . value ( 0 ) , & [ 1 , 0 , 0 ] ) ;
159
- assert_eq ! ( value_array . value ( 0 ) , & [ 12 , 1 ] ) ;
80
+ // Compare row 1
81
+ assert ! ( variant_array . is_null ( 1 ) ) ;
160
82
83
+ // Compare row 2
84
+ assert ! ( !variant_array. is_null( 2 ) ) ;
161
85
{
162
86
let mut vb = VariantBuilder :: new ( ) ;
163
87
let mut ob = vb. new_object ( ) ;
164
88
ob. insert ( "a" , Variant :: Int8 ( 32 ) ) ;
165
89
ob. finish ( ) ?;
166
90
let ( object_metadata, object_value) = vb. finish ( ) ;
167
- assert_eq ! ( metadata_array . value ( 2 ) , & object_metadata ) ;
168
- assert_eq ! ( value_array . value( 2 ) , & object_value ) ;
91
+ let expected = Variant :: new ( & object_metadata , & object_value ) ;
92
+ assert_eq ! ( variant_array . value( 2 ) , expected ) ;
169
93
}
170
94
171
- assert_eq ! ( metadata_array. value( 3 ) , & [ 1 , 0 , 0 ] ) ;
172
- assert_eq ! ( value_array. value( 3 ) , & [ 0 ] ) ;
95
+ // Compare row 3 (Note this is a variant NULL, not a null row)
96
+ assert ! ( !variant_array. is_null( 3 ) ) ;
97
+ assert_eq ! ( variant_array. value( 3 ) , Variant :: Null ) ;
98
+
99
+ // Compare row 4
100
+ assert ! ( variant_array. is_null( 4 ) ) ;
173
101
174
- // Ensure that the subfields are not actually nullable
102
+ // Ensure that the subfields are not nullable
175
103
assert ! ( !metadata_array. is_null( 1 ) ) ;
176
104
assert ! ( !value_array. is_null( 1 ) ) ;
177
105
assert ! ( !metadata_array. is_null( 4 ) ) ;
0 commit comments