diff --git a/connectorx/src/destinations/arrowstream/arrow_assoc.rs b/connectorx/src/destinations/arrowstream/arrow_assoc.rs index 8a5f7372c..08fe03cfb 100644 --- a/connectorx/src/destinations/arrowstream/arrow_assoc.rs +++ b/connectorx/src/destinations/arrowstream/arrow_assoc.rs @@ -1,11 +1,14 @@ -use super::errors::{ArrowDestinationError, Result}; +use super::{ + errors::{ArrowDestinationError, Result}, + typesystem::{DateTimeWrapperMicro, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro}, +}; use crate::constants::{DEFAULT_ARROW_DECIMAL, DEFAULT_ARROW_DECIMAL_SCALE, SECONDS_IN_DAY}; use crate::utils::decimal_to_i128; use arrow::array::{ - ArrayBuilder, BooleanBuilder, Date32Builder, Date64Builder, Decimal128Builder, Float32Builder, - Float64Builder, Int32Builder, Int64Builder, LargeBinaryBuilder, LargeListBuilder, - StringBuilder, Time64NanosecondBuilder, TimestampNanosecondBuilder, UInt32Builder, - UInt64Builder, + ArrayBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, Float32Builder, Float64Builder, + Int16Builder, Int32Builder, Int64Builder, LargeBinaryBuilder, LargeListBuilder, StringBuilder, + Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder, + TimestampNanosecondBuilder, UInt16Builder, UInt32Builder, UInt64Builder, }; use arrow::datatypes::Field; use arrow::datatypes::{DataType as ArrowDataType, TimeUnit}; @@ -59,8 +62,10 @@ macro_rules! impl_arrow_assoc { }; } +impl_arrow_assoc!(u16, ArrowDataType::UInt16, UInt16Builder); impl_arrow_assoc!(u32, ArrowDataType::UInt32, UInt32Builder); impl_arrow_assoc!(u64, ArrowDataType::UInt64, UInt64Builder); +impl_arrow_assoc!(i16, ArrowDataType::Int16, Int16Builder); impl_arrow_assoc!(i32, ArrowDataType::Int32, Int32Builder); impl_arrow_assoc!(i64, ArrowDataType::Int64, Int64Builder); impl_arrow_assoc!(f32, ArrowDataType::Float32, Float32Builder); @@ -230,6 +235,48 @@ impl ArrowAssoc for Option> { } } +impl ArrowAssoc for DateTimeWrapperMicro { + type Builder = TimestampMicrosecondBuilder; + + fn builder(nrows: usize) -> Self::Builder { + TimestampMicrosecondBuilder::with_capacity(nrows).with_timezone("+00:00") + } + + #[throws(ArrowDestinationError)] + fn append(builder: &mut Self::Builder, value: DateTimeWrapperMicro) { + builder.append_value(value.0.timestamp_micros()); + } + + fn field(header: &str) -> Field { + Field::new( + header, + ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + false, + ) + } +} + +impl ArrowAssoc for Option { + type Builder = TimestampMicrosecondBuilder; + + fn builder(nrows: usize) -> Self::Builder { + TimestampMicrosecondBuilder::with_capacity(nrows).with_timezone("+00:00") + } + + #[throws(ArrowDestinationError)] + fn append(builder: &mut Self::Builder, value: Option) { + builder.append_option(value.map(|x| x.0.timestamp_micros())); + } + + fn field(header: &str) -> Field { + Field::new( + header, + ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + true, + ) + } +} + fn naive_date_to_arrow(nd: NaiveDate) -> i32 { match nd.and_hms_opt(0, 0, 0) { Some(dt) => (dt.and_utc().timestamp() / SECONDS_IN_DAY) as i32, @@ -238,7 +285,9 @@ fn naive_date_to_arrow(nd: NaiveDate) -> i32 { } fn naive_datetime_to_arrow(nd: NaiveDateTime) -> i64 { - nd.and_utc().timestamp_millis() + nd.and_utc() + .timestamp_nanos_opt() + .unwrap_or_else(|| panic!("out of range DateTime")) } impl ArrowAssoc for Option { @@ -276,10 +325,10 @@ impl ArrowAssoc for NaiveDate { } impl ArrowAssoc for Option { - type Builder = Date64Builder; + type Builder = TimestampNanosecondBuilder; fn builder(nrows: usize) -> Self::Builder { - Date64Builder::with_capacity(nrows) + TimestampNanosecondBuilder::with_capacity(nrows) } fn append(builder: &mut Self::Builder, value: Option) -> Result<()> { @@ -288,15 +337,19 @@ impl ArrowAssoc for Option { } fn field(header: &str) -> Field { - Field::new(header, ArrowDataType::Date64, true) + Field::new( + header, + ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), + true, + ) } } impl ArrowAssoc for NaiveDateTime { - type Builder = Date64Builder; + type Builder = TimestampNanosecondBuilder; fn builder(nrows: usize) -> Self::Builder { - Date64Builder::with_capacity(nrows) + TimestampNanosecondBuilder::with_capacity(nrows) } fn append(builder: &mut Self::Builder, value: NaiveDateTime) -> Result<()> { @@ -305,7 +358,56 @@ impl ArrowAssoc for NaiveDateTime { } fn field(header: &str) -> Field { - Field::new(header, ArrowDataType::Date64, false) + Field::new( + header, + ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ) + } +} + +impl ArrowAssoc for Option { + type Builder = TimestampMicrosecondBuilder; + + fn builder(nrows: usize) -> Self::Builder { + TimestampMicrosecondBuilder::with_capacity(nrows) + } + + fn append(builder: &mut Self::Builder, value: Option) -> Result<()> { + builder.append_option(match value { + Some(v) => Some(v.0.and_utc().timestamp_micros()), + None => None, + }); + Ok(()) + } + + fn field(header: &str) -> Field { + Field::new( + header, + ArrowDataType::Timestamp(TimeUnit::Microsecond, None), + true, + ) + } +} + +impl ArrowAssoc for NaiveDateTimeWrapperMicro { + type Builder = TimestampMicrosecondBuilder; + + fn builder(nrows: usize) -> Self::Builder { + TimestampMicrosecondBuilder::with_capacity(nrows) + } + + fn append(builder: &mut Self::Builder, value: NaiveDateTimeWrapperMicro) -> Result<()> { + builder.append_value(value.0.and_utc().timestamp_micros()); + Ok(()) + } + + fn field(header: &str) -> Field { + Field::new( + header, + ArrowDataType::Timestamp(TimeUnit::Microsecond, None), + false, + ) } } @@ -349,6 +451,45 @@ impl ArrowAssoc for NaiveTime { } } +impl ArrowAssoc for Option { + type Builder = Time64MicrosecondBuilder; + + fn builder(nrows: usize) -> Self::Builder { + Time64MicrosecondBuilder::with_capacity(nrows) + } + + fn append(builder: &mut Self::Builder, value: Option) -> Result<()> { + builder.append_option(value.map(|t| { + t.0.num_seconds_from_midnight() as i64 * 1_000_000 + (t.0.nanosecond() as i64) / 1000 + })); + Ok(()) + } + + fn field(header: &str) -> Field { + Field::new(header, ArrowDataType::Time64(TimeUnit::Microsecond), true) + } +} + +impl ArrowAssoc for NaiveTimeWrapperMicro { + type Builder = Time64MicrosecondBuilder; + + fn builder(nrows: usize) -> Self::Builder { + Time64MicrosecondBuilder::with_capacity(nrows) + } + + fn append(builder: &mut Self::Builder, value: NaiveTimeWrapperMicro) -> Result<()> { + builder.append_value( + value.0.num_seconds_from_midnight() as i64 * 1_000_000 + + (value.0.nanosecond() as i64) / 1000, + ); + Ok(()) + } + + fn field(header: &str) -> Field { + Field::new(header, ArrowDataType::Time64(TimeUnit::Microsecond), false) + } +} + impl ArrowAssoc for Option> { type Builder = LargeBinaryBuilder; @@ -386,6 +527,93 @@ impl ArrowAssoc for Vec { } } +impl ArrowAssoc for Option>> { + type Builder = LargeListBuilder; + + fn builder(nrows: usize) -> Self::Builder { + LargeListBuilder::with_capacity( + Decimal128Builder::with_capacity(nrows).with_data_type(DEFAULT_ARROW_DECIMAL), + nrows, + ) + } + + fn append(builder: &mut Self::Builder, value: Self) -> Result<()> { + match value { + Some(vals) => { + let mut list = vec![]; + + for val in vals { + match val { + Some(v) => { + list.push(Some(decimal_to_i128( + v, + DEFAULT_ARROW_DECIMAL_SCALE as u32, + )?)); + } + None => list.push(None), + } + } + + builder.append_value(list); + } + None => builder.append_null(), + }; + Ok(()) + } + + fn field(header: &str) -> Field { + Field::new( + header, + ArrowDataType::LargeList(std::sync::Arc::new(Field::new_list_field( + DEFAULT_ARROW_DECIMAL, + true, + ))), + true, + ) + } +} + +impl ArrowAssoc for Vec> { + type Builder = LargeListBuilder; + + fn builder(nrows: usize) -> Self::Builder { + LargeListBuilder::with_capacity( + Decimal128Builder::with_capacity(nrows).with_data_type(DEFAULT_ARROW_DECIMAL), + nrows, + ) + } + + fn append(builder: &mut Self::Builder, vals: Self) -> Result<()> { + let mut list = vec![]; + + for val in vals { + match val { + Some(v) => { + list.push(Some(decimal_to_i128( + v, + DEFAULT_ARROW_DECIMAL_SCALE as u32, + )?)); + } + None => list.push(None), + } + } + + builder.append_value(list); + Ok(()) + } + + fn field(header: &str) -> Field { + Field::new( + header, + ArrowDataType::LargeList(std::sync::Arc::new(Field::new_list_field( + DEFAULT_ARROW_DECIMAL, + false, + ))), + false, + ) + } +} + macro_rules! impl_arrow_array_assoc { ($T:ty, $AT:expr, $B:ident) => { impl ArrowAssoc for $T { @@ -432,4 +660,13 @@ macro_rules! impl_arrow_array_assoc { }; } +impl_arrow_array_assoc!(Vec>, ArrowDataType::Boolean, BooleanBuilder); +impl_arrow_array_assoc!(Vec>, ArrowDataType::Utf8, StringBuilder); +impl_arrow_array_assoc!(Vec>, ArrowDataType::Int16, Int16Builder); +impl_arrow_array_assoc!(Vec>, ArrowDataType::Int32, Int32Builder); +impl_arrow_array_assoc!(Vec>, ArrowDataType::Int64, Int64Builder); +impl_arrow_array_assoc!(Vec>, ArrowDataType::UInt16, UInt16Builder); +impl_arrow_array_assoc!(Vec>, ArrowDataType::UInt32, UInt32Builder); +impl_arrow_array_assoc!(Vec>, ArrowDataType::UInt64, UInt64Builder); impl_arrow_array_assoc!(Vec>, ArrowDataType::Float32, Float32Builder); +impl_arrow_array_assoc!(Vec>, ArrowDataType::Float64, Float64Builder); diff --git a/connectorx/src/destinations/arrowstream/typesystem.rs b/connectorx/src/destinations/arrowstream/typesystem.rs index 15ae1dd72..2b190c5c6 100644 --- a/connectorx/src/destinations/arrowstream/typesystem.rs +++ b/connectorx/src/destinations/arrowstream/typesystem.rs @@ -2,10 +2,21 @@ use crate::impl_typesystem; use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc}; use rust_decimal::Decimal; +#[derive(Debug, Clone, Copy)] +pub struct DateTimeWrapperMicro(pub DateTime); + +#[derive(Debug, Clone, Copy)] +pub struct NaiveTimeWrapperMicro(pub NaiveTime); + +#[derive(Debug, Clone, Copy)] +pub struct NaiveDateTimeWrapperMicro(pub NaiveDateTime); + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] pub enum ArrowTypeSystem { + Int16(bool), Int32(bool), Int64(bool), + UInt16(bool), UInt32(bool), UInt64(bool), Float32(bool), @@ -16,28 +27,56 @@ pub enum ArrowTypeSystem { LargeBinary(bool), Date32(bool), Date64(bool), + Date64Micro(bool), Time64(bool), + Time64Micro(bool), DateTimeTz(bool), + DateTimeTzMicro(bool), + BoolArray(bool), + Utf8Array(bool), + Int16Array(bool), + Int32Array(bool), + Int64Array(bool), + UInt16Array(bool), + UInt32Array(bool), + UInt64Array(bool), Float32Array(bool), + Float64Array(bool), + DecimalArray(bool), } impl_typesystem! { system = ArrowTypeSystem, mappings = { - { Int32 => i32 } - { Int64 => i64 } - { UInt32 => u32 } - { UInt64 => u64 } - { Float64 => f64 } - { Float32 => f32 } - { Decimal => Decimal } - { Boolean => bool } - { LargeUtf8 => String } - { LargeBinary => Vec } - { Date32 => NaiveDate } - { Date64 => NaiveDateTime } - { Time64 => NaiveTime } - { DateTimeTz => DateTime } - { Float32Array => Vec> } + { Int16 => i16 } + { Int32 => i32 } + { Int64 => i64 } + { UInt16 => u16 } + { UInt32 => u32 } + { UInt64 => u64 } + { Float32 => f32 } + { Float64 => f64 } + { Decimal => Decimal } + { Boolean => bool } + { LargeUtf8 => String } + { LargeBinary => Vec } + { Date32 => NaiveDate } + { Date64 => NaiveDateTime } + { Date64Micro => NaiveDateTimeWrapperMicro } + { Time64 => NaiveTime } + { Time64Micro => NaiveTimeWrapperMicro } + { DateTimeTz => DateTime } + { DateTimeTzMicro => DateTimeWrapperMicro } + { BoolArray => Vec> } + { Utf8Array => Vec> } + { Int16Array => Vec> } + { Int32Array => Vec> } + { Int64Array => Vec> } + { UInt16Array => Vec> } + { UInt32Array => Vec> } + { UInt64Array => Vec> } + { Float32Array => Vec> } + { Float64Array => Vec> } + { DecimalArray => Vec> } } } diff --git a/connectorx/src/transports/postgres_arrowstream.rs b/connectorx/src/transports/postgres_arrowstream.rs index a9cfae3b7..2219465d2 100644 --- a/connectorx/src/transports/postgres_arrowstream.rs +++ b/connectorx/src/transports/postgres_arrowstream.rs @@ -42,31 +42,42 @@ macro_rules! impl_postgres_transport { systems = PostgresTypeSystem => ArrowTypeSystem, route = PostgresSource<$proto, $tls> => ArrowDestination, mappings = { - { Float4[f32] => Float64[f64] | conversion auto } - { Float8[f64] => Float64[f64] | conversion auto } - { Numeric[Decimal] => Decimal[Decimal] | conversion auto } - { Int2[i16] => Int64[i64] | conversion auto } - { Int4[i32] => Int64[i64] | conversion auto } - { Int8[i64] => Int64[i64] | conversion auto } - { Bool[bool] => Boolean[bool] | conversion auto } - { Text[&'r str] => LargeUtf8[String] | conversion owned } - { BpChar[&'r str] => LargeUtf8[String] | conversion none } - { VarChar[&'r str] => LargeUtf8[String] | conversion none } - { Name[&'r str] => LargeUtf8[String] | conversion none } - { Timestamp[NaiveDateTime] => Date64[NaiveDateTime] | conversion auto } - { Date[NaiveDate] => Date32[NaiveDate] | conversion auto } - { Time[NaiveTime] => Time64[NaiveTime] | conversion auto } - { TimestampTz[DateTime] => DateTimeTz[DateTime] | conversion auto } - { UUID[Uuid] => LargeUtf8[String] | conversion option } - { Char[&'r str] => LargeUtf8[String] | conversion none } - { ByteA[Vec] => LargeBinary[Vec] | conversion auto } - { JSON[Value] => LargeUtf8[String] | conversion option } - { JSONB[Value] => LargeUtf8[String] | conversion none } - { Inet[IpInet] => LargeUtf8[String] | conversion none } - { Vector[Vector] => Float32Array[Vec>] | conversion option } - { HalfVec[HalfVector] => Float32Array[Vec>] | conversion option } - { Bit[Bit] => LargeBinary[Vec] | conversion option } - { SparseVec[SparseVector] => Float32Array[Vec>] | conversion option } + { Float4[f32] => Float64[f64] | conversion auto } + { Float8[f64] => Float64[f64] | conversion auto } + { Numeric[Decimal] => Decimal[Decimal] | conversion auto } + { Int2[i16] => Int64[i64] | conversion auto } + { Int4[i32] => Int64[i64] | conversion auto } + { Int8[i64] => Int64[i64] | conversion auto } + { UInt4[u32] => UInt64[u64] | conversion auto } + { Bool[bool] => Boolean[bool] | conversion auto } + { Text[&'r str] => LargeUtf8[String] | conversion owned } + { BpChar[&'r str] => LargeUtf8[String] | conversion none } + { VarChar[&'r str] => LargeUtf8[String] | conversion none } + { Name[&'r str] => LargeUtf8[String] | conversion none } + { Enum[&'r str] => LargeUtf8[String] | conversion none } + { Timestamp[NaiveDateTime] => Date64[NaiveDateTime] | conversion auto } + { Date[NaiveDate] => Date32[NaiveDate] | conversion auto } + { Time[NaiveTime] => Time64[NaiveTime] | conversion auto } + { TimestampTz[DateTime] => DateTimeTz[DateTime] | conversion auto } + { UUID[Uuid] => LargeUtf8[String] | conversion option } + { Char[&'r str] => LargeUtf8[String] | conversion none } + { ByteA[Vec] => LargeBinary[Vec] | conversion auto } + { JSON[Value] => LargeUtf8[String] | conversion option } + { JSONB[Value] => LargeUtf8[String] | conversion none } + { Inet[IpInet] => LargeUtf8[String] | conversion none } + { BoolArray[Vec>] => BoolArray[Vec>] | conversion auto } + { VarcharArray[Vec>] => Utf8Array[Vec>] | conversion auto } + { TextArray[Vec>] => Utf8Array[Vec>] | conversion none } + { Int2Array[Vec>] => Int16Array[Vec>] | conversion auto } + { Int4Array[Vec>] => Int32Array[Vec>] | conversion auto } + { Int8Array[Vec>] => Int64Array[Vec>] | conversion auto } + { Float4Array[Vec>] => Float32Array[Vec>] | conversion auto } + { Float8Array[Vec>] => Float64Array[Vec>] | conversion auto } + { NumericArray[Vec>] => DecimalArray[Vec>] | conversion auto } + { Vector[Vector] => Float32Array[Vec>] | conversion option } + { HalfVec[HalfVector] => Float32Array[Vec>] | conversion option } + { Bit[Bit] => LargeBinary[Vec] | conversion option } + { SparseVec[SparseVector] => Float32Array[Vec>] | conversion option } } ); }