Skip to content

Commit d185110

Browse files
committed
Make dict ID only an IPC concern
1 parent d534dd0 commit d185110

File tree

21 files changed

+494
-400
lines changed

21 files changed

+494
-400
lines changed

arrow-flight/src/decode.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,7 @@ impl FlightDataDecoder {
276276

277277
self.state = Some(FlightStreamState {
278278
schema: Arc::clone(&schema),
279+
schema_message: data.clone(),
279280
dictionaries_by_field,
280281
});
281282
Ok(Some(DecodedFlightData::new_schema(data, schema)))
@@ -296,10 +297,15 @@ impl FlightDataDecoder {
296297
)
297298
})?;
298299

300+
let ipc_schema = arrow_ipc::root_as_message(&state.schema_message.data_header)
301+
.unwrap()
302+
.header_as_schema()
303+
.unwrap();
304+
299305
arrow_ipc::reader::read_dictionary(
300306
&buffer,
301307
dictionary_batch,
302-
&state.schema,
308+
ipc_schema,
303309
&mut state.dictionaries_by_field,
304310
&message.version(),
305311
)
@@ -319,8 +325,14 @@ impl FlightDataDecoder {
319325
));
320326
};
321327

328+
let ipc_schema = arrow_ipc::root_as_message(&state.schema_message.data_header)
329+
.unwrap()
330+
.header_as_schema()
331+
.unwrap();
332+
322333
let batch = flight_data_to_arrow_batch(
323334
&data,
335+
ipc_schema,
324336
Arc::clone(&state.schema),
325337
&state.dictionaries_by_field,
326338
)
@@ -376,6 +388,7 @@ impl futures::Stream for FlightDataDecoder {
376388
#[derive(Debug)]
377389
struct FlightStreamState {
378390
schema: SchemaRef,
391+
schema_message: FlightData,
379392
dictionaries_by_field: HashMap<i64, ArrayRef>,
380393
}
381394

arrow-flight/src/encode.rs

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -535,15 +535,13 @@ fn prepare_field_for_flight(
535535
)
536536
.with_metadata(field.metadata().clone())
537537
} else {
538-
#[allow(deprecated)]
539-
let dict_id = dictionary_tracker.set_dict_id(field.as_ref());
540-
538+
dictionary_tracker.next_dict_id();
541539
#[allow(deprecated)]
542540
Field::new_dict(
543541
field.name(),
544542
field.data_type().clone(),
545543
field.is_nullable(),
546-
dict_id,
544+
0,
547545
field.dict_is_ordered().unwrap_or_default(),
548546
)
549547
.with_metadata(field.metadata().clone())
@@ -585,14 +583,13 @@ fn prepare_schema_for_flight(
585583
)
586584
.with_metadata(field.metadata().clone())
587585
} else {
588-
#[allow(deprecated)]
589-
let dict_id = dictionary_tracker.set_dict_id(field.as_ref());
586+
dictionary_tracker.next_dict_id();
590587
#[allow(deprecated)]
591588
Field::new_dict(
592589
field.name(),
593590
field.data_type().clone(),
594591
field.is_nullable(),
595-
dict_id,
592+
0,
596593
field.dict_is_ordered().unwrap_or_default(),
597594
)
598595
.with_metadata(field.metadata().clone())
@@ -654,16 +651,10 @@ struct FlightIpcEncoder {
654651

655652
impl FlightIpcEncoder {
656653
fn new(options: IpcWriteOptions, error_on_replacement: bool) -> Self {
657-
#[allow(deprecated)]
658-
let preserve_dict_id = options.preserve_dict_id();
659654
Self {
660655
options,
661656
data_gen: IpcDataGenerator::default(),
662-
#[allow(deprecated)]
663-
dictionary_tracker: DictionaryTracker::new_with_preserve_dict_id(
664-
error_on_replacement,
665-
preserve_dict_id,
666-
),
657+
dictionary_tracker: DictionaryTracker::new(error_on_replacement),
667658
}
668659
}
669660

@@ -1547,9 +1538,8 @@ mod tests {
15471538
async fn verify_flight_round_trip(mut batches: Vec<RecordBatch>) {
15481539
let expected_schema = batches.first().unwrap().schema();
15491540

1550-
#[allow(deprecated)]
15511541
let encoder = FlightDataEncoderBuilder::default()
1552-
.with_options(IpcWriteOptions::default().with_preserve_dict_id(false))
1542+
.with_options(IpcWriteOptions::default())
15531543
.with_dictionary_handling(DictionaryHandling::Resend)
15541544
.build(futures::stream::iter(batches.clone().into_iter().map(Ok)));
15551545

@@ -1575,8 +1565,7 @@ mod tests {
15751565
HashMap::from([("some_key".to_owned(), "some_value".to_owned())]),
15761566
);
15771567

1578-
#[allow(deprecated)]
1579-
let mut dictionary_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
1568+
let mut dictionary_tracker = DictionaryTracker::new(false);
15801569

15811570
let got = prepare_schema_for_flight(&schema, &mut dictionary_tracker, false);
15821571
assert!(got.metadata().contains_key("some_key"));
@@ -1606,9 +1595,7 @@ mod tests {
16061595
options: &IpcWriteOptions,
16071596
) -> (Vec<FlightData>, FlightData) {
16081597
let data_gen = IpcDataGenerator::default();
1609-
#[allow(deprecated)]
1610-
let mut dictionary_tracker =
1611-
DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
1598+
let mut dictionary_tracker = DictionaryTracker::new(false);
16121599

16131600
let (encoded_dictionaries, encoded_batch) = data_gen
16141601
.encoded_batch(batch, &mut dictionary_tracker, options)

arrow-flight/src/lib.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,7 @@ pub struct IpcMessage(pub Bytes);
149149

150150
fn flight_schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOptions) -> EncodedData {
151151
let data_gen = writer::IpcDataGenerator::default();
152-
#[allow(deprecated)]
153-
let mut dict_tracker =
154-
writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
152+
let mut dict_tracker = writer::DictionaryTracker::new(false);
155153
data_gen.schema_to_bytes_with_dictionary_tracker(arrow_schema, &mut dict_tracker, options)
156154
}
157155

arrow-flight/src/sql/client.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -707,6 +707,7 @@ pub enum ArrowFlightData {
707707
pub fn arrow_data_from_flight_data(
708708
flight_data: FlightData,
709709
arrow_schema_ref: &SchemaRef,
710+
ipc_schema: arrow_ipc::Schema,
710711
) -> Result<ArrowFlightData, ArrowError> {
711712
let ipc_message = root_as_message(&flight_data.data_header[..])
712713
.map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))?;
@@ -723,6 +724,7 @@ pub fn arrow_data_from_flight_data(
723724
let record_batch = read_record_batch(
724725
&Buffer::from(flight_data.data_body),
725726
ipc_record_batch,
727+
ipc_schema,
726728
arrow_schema_ref.clone(),
727729
&dictionaries_by_field,
728730
None,

arrow-flight/src/utils.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ pub fn flight_data_to_batches(flight_data: &[FlightData]) -> Result<Vec<RecordBa
4444
let mut batches = vec![];
4545
let dictionaries_by_id = HashMap::new();
4646
for datum in flight_data[1..].iter() {
47-
let batch = flight_data_to_arrow_batch(datum, schema.clone(), &dictionaries_by_id)?;
47+
let batch =
48+
flight_data_to_arrow_batch(datum, ipc_schema, schema.clone(), &dictionaries_by_id)?;
4849
batches.push(batch);
4950
}
5051
Ok(batches)
@@ -53,6 +54,7 @@ pub fn flight_data_to_batches(flight_data: &[FlightData]) -> Result<Vec<RecordBa
5354
/// Convert `FlightData` (with supplied schema and dictionaries) to an arrow `RecordBatch`.
5455
pub fn flight_data_to_arrow_batch(
5556
data: &FlightData,
57+
ipc_schema: arrow_ipc::Schema,
5658
schema: SchemaRef,
5759
dictionaries_by_id: &HashMap<i64, ArrayRef>,
5860
) -> Result<RecordBatch, ArrowError> {
@@ -71,6 +73,7 @@ pub fn flight_data_to_arrow_batch(
7173
reader::read_record_batch(
7274
&Buffer::from(data.data_body.as_ref()),
7375
batch,
76+
ipc_schema,
7477
schema,
7578
dictionaries_by_id,
7679
None,
@@ -90,9 +93,7 @@ pub fn batches_to_flight_data(
9093
let mut flight_data = vec![];
9194

9295
let data_gen = writer::IpcDataGenerator::default();
93-
#[allow(deprecated)]
94-
let mut dictionary_tracker =
95-
writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
96+
let mut dictionary_tracker = writer::DictionaryTracker::new(false);
9697

9798
for batch in batches.iter() {
9899
let (encoded_dictionaries, encoded_batch) =

arrow-integration-test/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ all-features = true
3838
[dependencies]
3939
arrow = { workspace = true }
4040
arrow-buffer = { workspace = true }
41+
arrow-ipc = { workspace = true }
4142
hex = { version = "0.4", default-features = false, features = ["std"] }
4243
serde = { version = "1.0", default-features = false, features = ["rc", "derive"] }
4344
serde_json = { version = "1.0", default-features = false, features = ["std"] }

arrow-integration-test/src/field.rs

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use crate::{data_type_from_json, data_type_to_json};
1919
use arrow::datatypes::{DataType, Field};
2020
use arrow::error::{ArrowError, Result};
21+
use arrow_ipc::writer::DictionaryTracker;
2122
use std::collections::HashMap;
2223
use std::sync::Arc;
2324

@@ -218,7 +219,6 @@ pub fn field_from_json(json: &serde_json::Value) -> Result<Field> {
218219
_ => data_type,
219220
};
220221

221-
let mut dict_id = 0;
222222
let mut dict_is_ordered = false;
223223

224224
let data_type = match map.get("dictionary") {
@@ -231,14 +231,6 @@ pub fn field_from_json(json: &serde_json::Value) -> Result<Field> {
231231
));
232232
}
233233
};
234-
dict_id = match dictionary.get("id") {
235-
Some(Value::Number(n)) => n.as_i64().unwrap(),
236-
_ => {
237-
return Err(ArrowError::ParseError(
238-
"Field missing 'id' attribute".to_string(),
239-
));
240-
}
241-
};
242234
dict_is_ordered = match dictionary.get("isOrdered") {
243235
Some(&Value::Bool(n)) => n,
244236
_ => {
@@ -253,7 +245,7 @@ pub fn field_from_json(json: &serde_json::Value) -> Result<Field> {
253245
};
254246

255247
#[allow(deprecated)]
256-
let mut field = Field::new_dict(name, data_type, nullable, dict_id, dict_is_ordered);
248+
let mut field = Field::new_dict(name, data_type, nullable, 0, dict_is_ordered);
257249
field.set_metadata(metadata);
258250
Ok(field)
259251
}
@@ -264,27 +256,28 @@ pub fn field_from_json(json: &serde_json::Value) -> Result<Field> {
264256
}
265257

266258
/// Generate a JSON representation of the `Field`.
267-
pub fn field_to_json(field: &Field) -> serde_json::Value {
259+
pub fn field_to_json(dict_tracker: &mut DictionaryTracker, field: &Field) -> serde_json::Value {
268260
let children: Vec<serde_json::Value> = match field.data_type() {
269-
DataType::Struct(fields) => fields.iter().map(|x| field_to_json(x.as_ref())).collect(),
261+
DataType::Struct(fields) => fields
262+
.iter()
263+
.map(|x| field_to_json(dict_tracker, x.as_ref()))
264+
.collect(),
270265
DataType::List(field)
271266
| DataType::LargeList(field)
272267
| DataType::FixedSizeList(field, _)
273-
| DataType::Map(field, _) => vec![field_to_json(field)],
268+
| DataType::Map(field, _) => vec![field_to_json(dict_tracker, field)],
274269
_ => vec![],
275270
};
276271

277272
match field.data_type() {
278273
DataType::Dictionary(ref index_type, ref value_type) => {
279-
#[allow(deprecated)]
280-
let dict_id = field.dict_id().unwrap();
281274
serde_json::json!({
282275
"name": field.name(),
283276
"nullable": field.is_nullable(),
284277
"type": data_type_to_json(value_type),
285278
"children": children,
286279
"dictionary": {
287-
"id": dict_id,
280+
"id": dict_tracker.next_dict_id(),
288281
"indexType": data_type_to_json(index_type),
289282
"isOrdered": field.dict_is_ordered().unwrap(),
290283
}
@@ -345,7 +338,8 @@ mod tests {
345338
}"#,
346339
)
347340
.unwrap();
348-
assert_eq!(value, field_to_json(&f));
341+
let mut dictionary_tracker = DictionaryTracker::new(false);
342+
assert_eq!(value, field_to_json(&mut dictionary_tracker, &f));
349343
}
350344

351345
#[test]
@@ -398,7 +392,8 @@ mod tests {
398392
}"#,
399393
)
400394
.unwrap();
401-
assert_eq!(value, field_to_json(&f));
395+
let mut dictionary_tracker = DictionaryTracker::new(false);
396+
assert_eq!(value, field_to_json(&mut dictionary_tracker, &f));
402397
}
403398

404399
#[test]
@@ -415,7 +410,8 @@ mod tests {
415410
}"#,
416411
)
417412
.unwrap();
418-
assert_eq!(value, field_to_json(&f));
413+
let mut dictionary_tracker = DictionaryTracker::new(false);
414+
assert_eq!(value, field_to_json(&mut dictionary_tracker, &f));
419415
}
420416
#[test]
421417
fn parse_struct_from_json() {

0 commit comments

Comments
 (0)