Skip to content

Commit 9a0590c

Browse files
committed
feat: parity in arrowstream destination types
1 parent 2209f7e commit 9a0590c

File tree

2 files changed

+303
-27
lines changed

2 files changed

+303
-27
lines changed

connectorx/src/destinations/arrowstream/arrow_assoc.rs

Lines changed: 249 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1-
use super::errors::{ArrowDestinationError, Result};
1+
use super::{
2+
errors::{ArrowDestinationError, Result},
3+
typesystem::{DateTimeWrapperMicro, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro},
4+
};
25
use crate::constants::{DEFAULT_ARROW_DECIMAL, DEFAULT_ARROW_DECIMAL_SCALE, SECONDS_IN_DAY};
36
use crate::utils::decimal_to_i128;
47
use arrow::array::{
5-
ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, Decimal128Builder, Float32Builder,
6-
Float64Builder, Int32Builder, Int64Builder, LargeBinaryBuilder, LargeListBuilder,
7-
StringBuilder, Time64NanosecondBuilder, TimestampNanosecondBuilder, UInt32Builder,
8-
UInt64Builder,
8+
ArrayBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, Float32Builder, Float64Builder,
9+
Int16Builder, Int32Builder, Int64Builder, LargeBinaryBuilder, LargeListBuilder, StringBuilder,
10+
Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
11+
TimestampNanosecondBuilder, UInt16Builder, UInt32Builder, UInt64Builder,
912
};
1013
use arrow::datatypes::Field;
1114
use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
@@ -59,8 +62,10 @@ macro_rules! impl_arrow_assoc {
5962
};
6063
}
6164

65+
impl_arrow_assoc!(u16, ArrowDataType::UInt16, UInt16Builder);
6266
impl_arrow_assoc!(u32, ArrowDataType::UInt32, UInt32Builder);
6367
impl_arrow_assoc!(u64, ArrowDataType::UInt64, UInt64Builder);
68+
impl_arrow_assoc!(i16, ArrowDataType::Int16, Int16Builder);
6469
impl_arrow_assoc!(i32, ArrowDataType::Int32, Int32Builder);
6570
impl_arrow_assoc!(i64, ArrowDataType::Int64, Int64Builder);
6671
impl_arrow_assoc!(f32, ArrowDataType::Float32, Float32Builder);
@@ -230,6 +235,48 @@ impl ArrowAssoc for Option<DateTime<Utc>> {
230235
}
231236
}
232237

238+
impl ArrowAssoc for DateTimeWrapperMicro {
239+
type Builder = TimestampMicrosecondBuilder;
240+
241+
fn builder(nrows: usize) -> Self::Builder {
242+
TimestampMicrosecondBuilder::with_capacity(nrows).with_timezone("+00:00")
243+
}
244+
245+
#[throws(ArrowDestinationError)]
246+
fn append(builder: &mut Self::Builder, value: DateTimeWrapperMicro) {
247+
builder.append_value(value.0.timestamp_micros());
248+
}
249+
250+
fn field(header: &str) -> Field {
251+
Field::new(
252+
header,
253+
ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
254+
false,
255+
)
256+
}
257+
}
258+
259+
impl ArrowAssoc for Option<DateTimeWrapperMicro> {
260+
type Builder = TimestampMicrosecondBuilder;
261+
262+
fn builder(nrows: usize) -> Self::Builder {
263+
TimestampMicrosecondBuilder::with_capacity(nrows).with_timezone("+00:00")
264+
}
265+
266+
#[throws(ArrowDestinationError)]
267+
fn append(builder: &mut Self::Builder, value: Option<DateTimeWrapperMicro>) {
268+
builder.append_option(value.map(|x| x.0.timestamp_micros()));
269+
}
270+
271+
fn field(header: &str) -> Field {
272+
Field::new(
273+
header,
274+
ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
275+
true,
276+
)
277+
}
278+
}
279+
233280
fn naive_date_to_arrow(nd: NaiveDate) -> i32 {
234281
match nd.and_hms_opt(0, 0, 0) {
235282
Some(dt) => (dt.and_utc().timestamp() / SECONDS_IN_DAY) as i32,
@@ -238,7 +285,9 @@ fn naive_date_to_arrow(nd: NaiveDate) -> i32 {
238285
}
239286

240287
fn naive_datetime_to_arrow(nd: NaiveDateTime) -> i64 {
241-
nd.and_utc().timestamp_millis()
288+
nd.and_utc()
289+
.timestamp_nanos_opt()
290+
.unwrap_or_else(|| panic!("out of range DateTime"))
242291
}
243292

244293
impl ArrowAssoc for Option<NaiveDate> {
@@ -276,10 +325,10 @@ impl ArrowAssoc for NaiveDate {
276325
}
277326

278327
impl ArrowAssoc for Option<NaiveDateTime> {
279-
type Builder = Date64Builder;
328+
type Builder = TimestampNanosecondBuilder;
280329

281330
fn builder(nrows: usize) -> Self::Builder {
282-
Date64Builder::with_capacity(nrows)
331+
TimestampNanosecondBuilder::with_capacity(nrows)
283332
}
284333

285334
fn append(builder: &mut Self::Builder, value: Option<NaiveDateTime>) -> Result<()> {
@@ -288,15 +337,19 @@ impl ArrowAssoc for Option<NaiveDateTime> {
288337
}
289338

290339
fn field(header: &str) -> Field {
291-
Field::new(header, ArrowDataType::Date64, true)
340+
Field::new(
341+
header,
342+
ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
343+
true,
344+
)
292345
}
293346
}
294347

295348
impl ArrowAssoc for NaiveDateTime {
296-
type Builder = Date64Builder;
349+
type Builder = TimestampNanosecondBuilder;
297350

298351
fn builder(nrows: usize) -> Self::Builder {
299-
Date64Builder::with_capacity(nrows)
352+
TimestampNanosecondBuilder::with_capacity(nrows)
300353
}
301354

302355
fn append(builder: &mut Self::Builder, value: NaiveDateTime) -> Result<()> {
@@ -305,7 +358,56 @@ impl ArrowAssoc for NaiveDateTime {
305358
}
306359

307360
fn field(header: &str) -> Field {
308-
Field::new(header, ArrowDataType::Date64, false)
361+
Field::new(
362+
header,
363+
ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
364+
false,
365+
)
366+
}
367+
}
368+
369+
impl ArrowAssoc for Option<NaiveDateTimeWrapperMicro> {
370+
type Builder = TimestampMicrosecondBuilder;
371+
372+
fn builder(nrows: usize) -> Self::Builder {
373+
TimestampMicrosecondBuilder::with_capacity(nrows)
374+
}
375+
376+
fn append(builder: &mut Self::Builder, value: Option<NaiveDateTimeWrapperMicro>) -> Result<()> {
377+
builder.append_option(match value {
378+
Some(v) => Some(v.0.and_utc().timestamp_micros()),
379+
None => None,
380+
});
381+
Ok(())
382+
}
383+
384+
fn field(header: &str) -> Field {
385+
Field::new(
386+
header,
387+
ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
388+
true,
389+
)
390+
}
391+
}
392+
393+
impl ArrowAssoc for NaiveDateTimeWrapperMicro {
394+
type Builder = TimestampMicrosecondBuilder;
395+
396+
fn builder(nrows: usize) -> Self::Builder {
397+
TimestampMicrosecondBuilder::with_capacity(nrows)
398+
}
399+
400+
fn append(builder: &mut Self::Builder, value: NaiveDateTimeWrapperMicro) -> Result<()> {
401+
builder.append_value(value.0.and_utc().timestamp_micros());
402+
Ok(())
403+
}
404+
405+
fn field(header: &str) -> Field {
406+
Field::new(
407+
header,
408+
ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
409+
false,
410+
)
309411
}
310412
}
311413

@@ -349,6 +451,45 @@ impl ArrowAssoc for NaiveTime {
349451
}
350452
}
351453

454+
impl ArrowAssoc for Option<NaiveTimeWrapperMicro> {
455+
type Builder = Time64MicrosecondBuilder;
456+
457+
fn builder(nrows: usize) -> Self::Builder {
458+
Time64MicrosecondBuilder::with_capacity(nrows)
459+
}
460+
461+
fn append(builder: &mut Self::Builder, value: Option<NaiveTimeWrapperMicro>) -> Result<()> {
462+
builder.append_option(value.map(|t| {
463+
t.0.num_seconds_from_midnight() as i64 * 1_000_000 + (t.0.nanosecond() as i64) / 1000
464+
}));
465+
Ok(())
466+
}
467+
468+
fn field(header: &str) -> Field {
469+
Field::new(header, ArrowDataType::Time64(TimeUnit::Microsecond), true)
470+
}
471+
}
472+
473+
impl ArrowAssoc for NaiveTimeWrapperMicro {
474+
type Builder = Time64MicrosecondBuilder;
475+
476+
fn builder(nrows: usize) -> Self::Builder {
477+
Time64MicrosecondBuilder::with_capacity(nrows)
478+
}
479+
480+
fn append(builder: &mut Self::Builder, value: NaiveTimeWrapperMicro) -> Result<()> {
481+
builder.append_value(
482+
value.0.num_seconds_from_midnight() as i64 * 1_000_000
483+
+ (value.0.nanosecond() as i64) / 1000,
484+
);
485+
Ok(())
486+
}
487+
488+
fn field(header: &str) -> Field {
489+
Field::new(header, ArrowDataType::Time64(TimeUnit::Microsecond), false)
490+
}
491+
}
492+
352493
impl ArrowAssoc for Option<Vec<u8>> {
353494
type Builder = LargeBinaryBuilder;
354495

@@ -386,6 +527,93 @@ impl ArrowAssoc for Vec<u8> {
386527
}
387528
}
388529

530+
impl ArrowAssoc for Option<Vec<Option<Decimal>>> {
531+
type Builder = LargeListBuilder<Decimal128Builder>;
532+
533+
fn builder(nrows: usize) -> Self::Builder {
534+
LargeListBuilder::with_capacity(
535+
Decimal128Builder::with_capacity(nrows).with_data_type(DEFAULT_ARROW_DECIMAL),
536+
nrows,
537+
)
538+
}
539+
540+
fn append(builder: &mut Self::Builder, value: Self) -> Result<()> {
541+
match value {
542+
Some(vals) => {
543+
let mut list = vec![];
544+
545+
for val in vals {
546+
match val {
547+
Some(v) => {
548+
list.push(Some(decimal_to_i128(
549+
v,
550+
DEFAULT_ARROW_DECIMAL_SCALE as u32,
551+
)?));
552+
}
553+
None => list.push(None),
554+
}
555+
}
556+
557+
builder.append_value(list);
558+
}
559+
None => builder.append_null(),
560+
};
561+
Ok(())
562+
}
563+
564+
fn field(header: &str) -> Field {
565+
Field::new(
566+
header,
567+
ArrowDataType::LargeList(std::sync::Arc::new(Field::new_list_field(
568+
DEFAULT_ARROW_DECIMAL,
569+
true,
570+
))),
571+
true,
572+
)
573+
}
574+
}
575+
576+
impl ArrowAssoc for Vec<Option<Decimal>> {
577+
type Builder = LargeListBuilder<Decimal128Builder>;
578+
579+
fn builder(nrows: usize) -> Self::Builder {
580+
LargeListBuilder::with_capacity(
581+
Decimal128Builder::with_capacity(nrows).with_data_type(DEFAULT_ARROW_DECIMAL),
582+
nrows,
583+
)
584+
}
585+
586+
fn append(builder: &mut Self::Builder, vals: Self) -> Result<()> {
587+
let mut list = vec![];
588+
589+
for val in vals {
590+
match val {
591+
Some(v) => {
592+
list.push(Some(decimal_to_i128(
593+
v,
594+
DEFAULT_ARROW_DECIMAL_SCALE as u32,
595+
)?));
596+
}
597+
None => list.push(None),
598+
}
599+
}
600+
601+
builder.append_value(list);
602+
Ok(())
603+
}
604+
605+
fn field(header: &str) -> Field {
606+
Field::new(
607+
header,
608+
ArrowDataType::LargeList(std::sync::Arc::new(Field::new_list_field(
609+
DEFAULT_ARROW_DECIMAL,
610+
false,
611+
))),
612+
false,
613+
)
614+
}
615+
}
616+
389617
macro_rules! impl_arrow_array_assoc {
390618
($T:ty, $AT:expr, $B:ident) => {
391619
impl ArrowAssoc for $T {
@@ -432,4 +660,13 @@ macro_rules! impl_arrow_array_assoc {
432660
};
433661
}
434662

663+
impl_arrow_array_assoc!(Vec<Option<bool>>, ArrowDataType::Boolean, BooleanBuilder);
664+
impl_arrow_array_assoc!(Vec<Option<String>>, ArrowDataType::Utf8, StringBuilder);
665+
impl_arrow_array_assoc!(Vec<Option<i16>>, ArrowDataType::Int16, Int16Builder);
666+
impl_arrow_array_assoc!(Vec<Option<i32>>, ArrowDataType::Int32, Int32Builder);
667+
impl_arrow_array_assoc!(Vec<Option<i64>>, ArrowDataType::Int64, Int64Builder);
668+
impl_arrow_array_assoc!(Vec<Option<u16>>, ArrowDataType::UInt16, UInt16Builder);
669+
impl_arrow_array_assoc!(Vec<Option<u32>>, ArrowDataType::UInt32, UInt32Builder);
670+
impl_arrow_array_assoc!(Vec<Option<u64>>, ArrowDataType::UInt64, UInt64Builder);
435671
impl_arrow_array_assoc!(Vec<Option<f32>>, ArrowDataType::Float32, Float32Builder);
672+
impl_arrow_array_assoc!(Vec<Option<f64>>, ArrowDataType::Float64, Float64Builder);

0 commit comments

Comments
 (0)