diff --git a/Cargo.lock b/Cargo.lock
index f8a4741..5383608 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -192,7 +192,7 @@ name = "api-snowflake-rest"
 version = "0.1.0"
 dependencies = [
  "api-snowflake-rest-sessions",
- "arrow 56.2.0",
+ "arrow 57.0.0",
  "axum 0.8.6",
  "base64 0.22.1",
  "catalog-metastore",
@@ -311,6 +311,27 @@ dependencies = [
  "arrow-string 56.2.0",
 ]
 
+[[package]]
+name = "arrow"
+version = "57.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4df8bb5b0bd64c0b9bc61317fcc480bad0f00e56d3bc32c69a4c8dada4786bae"
+dependencies = [
+ "arrow-arith 57.0.0",
+ "arrow-array 57.0.0",
+ "arrow-buffer 57.0.0",
+ "arrow-cast 57.0.0",
+ "arrow-csv 57.0.0",
+ "arrow-data 57.0.0",
+ "arrow-ipc 57.0.0",
+ "arrow-json 57.0.0",
+ "arrow-ord 57.0.0",
+ "arrow-row 57.0.0",
+ "arrow-schema 57.0.0",
+ "arrow-select 57.0.0",
+ "arrow-string 57.0.0",
+]
+
 [[package]]
 name = "arrow-arith"
 version = "55.2.0"
@@ -339,6 +360,20 @@ dependencies = [
  "num",
 ]
 
+[[package]]
+name = "arrow-arith"
+version = "57.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a1a640186d3bd30a24cb42264c2dafb30e236a6f50d510e56d40b708c9582491"
+dependencies = [
+ "arrow-array 57.0.0",
+ "arrow-buffer 57.0.0",
+ "arrow-data 57.0.0",
+ "arrow-schema 57.0.0",
+ "chrono",
+ "num-traits",
+]
+
 [[package]]
 name = "arrow-array"
 version = "55.2.0"
@@ -373,6 +408,24 @@ dependencies = [
  "num",
 ]
 
+[[package]]
+name = "arrow-array"
+version = "57.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "219fe420e6800979744c8393b687afb0252b3f8a89b91027d27887b72aa36d31"
+dependencies = [
+ "ahash 0.8.12",
+ "arrow-buffer 57.0.0",
+ "arrow-data 57.0.0",
+ "arrow-schema 57.0.0",
+ "chrono",
+ "half",
+ "hashbrown 0.16.0",
+ "num-complex",
+ "num-integer",
+ "num-traits",
+]
+
 [[package]]
 name = "arrow-buffer"
 version = "55.2.0"
@@ -395,6 +448,18 @@ dependencies = [
  "num",
 ]
 
+[[package]]
+name = "arrow-buffer"
+version = "57.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "76885a2697a7edf6b59577f568b456afc94ce0e2edc15b784ce3685b6c3c5c27"
+dependencies = [
+ "bytes",
+ "half",
+ "num-bigint",
+ "num-traits",
+]
+
 [[package]]
 name = "arrow-cast"
 version = "55.2.0"
@@ -436,6 +501,26 @@ dependencies = [
  "ryu",
 ]
 
+[[package]]
+name = "arrow-cast"
+version = "57.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9c9ebb4c987e6b3b236fb4a14b20b34835abfdd80acead3ccf1f9bf399e1f168"
+dependencies = [
+ "arrow-array 57.0.0",
+ "arrow-buffer 57.0.0",
+ "arrow-data 57.0.0",
+ "arrow-schema 57.0.0",
+ "arrow-select 57.0.0",
+ "atoi",
+ "base64 0.22.1",
+ "chrono",
+ "half",
+ "lexical-core 1.0.6",
+ "num-traits",
+ "ryu",
+]
+
 [[package]]
 name = "arrow-csv"
 version = "55.2.0"
@@ -466,6 +551,21 @@ dependencies = [
  "regex",
 ]
 
+[[package]]
+name = "arrow-csv"
+version = "57.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "92386159c8d4bce96f8bd396b0642a0d544d471bdc2ef34d631aec80db40a09c"
+dependencies = [
+ "arrow-array 57.0.0",
+ "arrow-cast 57.0.0",
+ "arrow-schema 57.0.0",
+ "chrono",
+ "csv",
+ "csv-core",
+ "regex",
+]
+
 [[package]]
 name = "arrow-data"
 version = "55.2.0"
@@ -490,6 +590,19 @@ dependencies = [
  "num",
 ]
 
+[[package]]
+name = "arrow-data"
+version = "57.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "727681b95de313b600eddc2a37e736dcb21980a40f640314dcf360e2f36bc89b"
+dependencies = [
+ "arrow-buffer 57.0.0",
+ "arrow-schema 57.0.0",
+ "half",
+ "num-integer",
+ "num-traits",
+]
+
 [[package]]
 name = "arrow-ipc"
 version = "55.2.0"
@@ -519,6 +632,20 @@ dependencies = [
  "zstd",
 ]
 
+[[package]]
+name = "arrow-ipc"
+version = "57.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "da9ba92e3de170295c98a84e5af22e2b037f0c7b32449445e6c493b5fca27f27"
+dependencies = [
+ "arrow-array 57.0.0",
+ "arrow-buffer 57.0.0",
+ "arrow-data 57.0.0",
+ "arrow-schema 57.0.0",
+ "arrow-select 57.0.0",
+ "flatbuffers",
+]
+
 [[package]]
 name = "arrow-json"
 version = "55.2.0"
@@ -563,6 +690,30 @@ dependencies = [
  "simdutf8",
 ]
 
+[[package]]
+name = "arrow-json"
+version = "57.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b969b4a421ae83828591c6bf5450bd52e6d489584142845ad6a861f42fe35df8"
+dependencies = [
+ "arrow-array 57.0.0",
+ "arrow-buffer 57.0.0",
+ "arrow-cast 57.0.0",
+ "arrow-data 57.0.0",
+ "arrow-schema 57.0.0",
+ "chrono",
+ "half",
+ "indexmap 2.12.0",
+ "itoa",
+ "lexical-core 1.0.6",
+ "memchr",
+ "num-traits",
+ "ryu",
+ "serde_core",
+ "serde_json",
+ "simdutf8",
+]
+
 [[package]]
 name = "arrow-ord"
 version = "55.2.0"
@@ -589,6 +740,19 @@ dependencies = [
  "arrow-select 56.2.0",
 ]
 
+[[package]]
+name = "arrow-ord"
+version = "57.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "141c05298b21d03e88062317a1f1a73f5ba7b6eb041b350015b1cd6aabc0519b"
+dependencies = [
+ "arrow-array 57.0.0",
+ "arrow-buffer 57.0.0",
+ "arrow-data 57.0.0",
+ "arrow-schema 57.0.0",
+ "arrow-select 57.0.0",
+]
+
 [[package]]
 name = "arrow-row"
 version = "55.2.0"
@@ -615,6 +779,19 @@ dependencies = [
  "half",
 ]
 
+[[package]]
+name = "arrow-row"
+version = "57.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c5f3c06a6abad6164508ed283c7a02151515cef3de4b4ff2cebbcaeb85533db2"
+dependencies = [
+ "arrow-array 57.0.0",
+ "arrow-buffer 57.0.0",
+ "arrow-data 57.0.0",
+ "arrow-schema 57.0.0",
+ "half",
+]
+
 [[package]]
 name = "arrow-schema"
 version = "55.2.0"
@@ -636,6 +813,12 @@ dependencies = [
  "serde_json",
 ]
 
+[[package]]
+name = "arrow-schema"
+version = "57.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9cfa7a03d1eee2a4d061476e1840ad5c9867a544ca6c4c59256496af5d0a8be5"
+
 [[package]]
 name = "arrow-select"
 version = "55.2.0"
@@ -664,6 +847,20 @@ dependencies = [
  "num",
 ]
 
+[[package]]
+name = "arrow-select"
+version = "57.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bafa595babaad59f2455f4957d0f26448fb472722c186739f4fac0823a1bdb47"
+dependencies = [
+ "ahash 0.8.12",
+ "arrow-array 57.0.0",
+ "arrow-buffer 57.0.0",
+ "arrow-data 57.0.0",
+ "arrow-schema 57.0.0",
+ "num-traits",
+]
+
 [[package]]
 name = "arrow-string"
 version = "55.2.0"
@@ -698,6 +895,23 @@ dependencies = [
  "regex-syntax",
 ]
 
+[[package]]
+name = "arrow-string"
+version = "57.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "32f46457dbbb99f2650ff3ac23e46a929e0ab81db809b02aa5511c258348bef2"
+dependencies = [
+ "arrow-array 57.0.0",
+ "arrow-buffer 57.0.0",
+ "arrow-data 57.0.0",
+ "arrow-schema 57.0.0",
+ "arrow-select 57.0.0",
+ "memchr",
+ "num-traits",
+ "regex",
+ "regex-syntax",
+]
+
 [[package]]
 name = "async-compression"
 version = "0.4.19"
diff --git a/crates/api-snowflake-rest/Cargo.toml b/crates/api-snowflake-rest/Cargo.toml
index 785102e..17e6055 100644
--- a/crates/api-snowflake-rest/Cargo.toml
+++ b/crates/api-snowflake-rest/Cargo.toml
@@ -56,7 +56,7 @@ cfg-if = { workspace = true }
 insta = { workspace = true }
 reqwest = { workspace = true }
 http = { workspace = true }
-arrow = "56.2.0"
+arrow = "57.0.0"
 
 [lints]
 workspace = true
diff --git a/crates/api-snowflake-rest/src/models.rs b/crates/api-snowflake-rest/src/models.rs
index 2b0e0e6..4ab83f8 100644
--- a/crates/api-snowflake-rest/src/models.rs
+++ b/crates/api-snowflake-rest/src/models.rs
@@ -1,15 +1,11 @@
 cfg_if::cfg_if! {
     if #[cfg(feature = "default-server")] {
         use executor::models::ColumnInfo as ColumnInfoModel;
-        use executor::Row;
-    } else {
-        // Define simple representation for Row
-        // As with external-server we won't have any dependences (especially trivial)
-        type Row = Vec<serde_json::Value>;
     }
 }
 
 use serde::{Deserialize, Serialize};
+use serde_json::value::RawValue;
 use std::collections::HashMap;
 use uuid::Uuid;
 
@@ -98,7 +94,7 @@ pub struct ResponseData {
     #[serde(rename = "rowsetBase64")]
     pub row_set_base_64: Option<String>,
     #[serde(rename = "rowset")]
-    pub row_set: Option<Vec<Row>>,
+    pub row_set: Option<Box<RawValue>>,
     pub total: Option,
     #[serde(rename = "queryResultFormat")]
     pub query_result_format: Option<String>,
diff --git a/crates/api-snowflake-rest/src/server/error.rs b/crates/api-snowflake-rest/src/server/error.rs
index ea823a8..9941141 100644
--- a/crates/api-snowflake-rest/src/server/error.rs
+++ b/crates/api-snowflake-rest/src/server/error.rs
@@ -60,6 +60,14 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display("SerdeJson error: {error}"))]
+    SerdeJson {
+        #[snafu(source)]
+        error: serde_json::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(transparent)]
     Execution { source: executor::Error },
 }
@@ -172,7 +180,10 @@ impl Error {
                 SqlState::Success,
                 ErrorCode::Other,
             ),
-            Self::Utf8 { .. } | Self::Arrow { .. } | Self::NotImplemented { .. } => {
+            Self::Utf8 { .. }
+            | Self::Arrow { .. }
+            | Self::SerdeJson { .. }
+            | Self::NotImplemented { .. } => {
                 (http::StatusCode::OK, SqlState::Success, ErrorCode::Other)
             }
         };
diff --git a/crates/api-snowflake-rest/src/server/handlers.rs b/crates/api-snowflake-rest/src/server/handlers.rs
index 406a59b..f8dc3ba 100644
--- a/crates/api-snowflake-rest/src/server/handlers.rs
+++ b/crates/api-snowflake-rest/src/server/handlers.rs
@@ -62,18 +62,18 @@ pub async fn query(
     let serialization_format = state.config.dbt_serialization_format;
     let query_context = QueryContext::default()
         .with_ip_address(addr.ip().to_string())
-        .with_async_query(async_exec)
         .with_request_id(query.request_id);
 
     if async_exec {
         return api_snowflake_rest_error::NotImplementedSnafu.fail();
    }
 
+    let query_uuid = query_context.query_id.as_uuid();
     let result = state
         .execution_svc
         .query(&session_id, &sql_text, query_context)
         .await?;
-    handle_query_ok_result(&sql_text, result, serialization_format)
+    handle_query_ok_result(&sql_text, query_uuid, result, serialization_format)
 }
 
 #[tracing::instrument(name = "api_snowflake_rest::abort", level = "debug", skip(state), err, ret(level = tracing::Level::TRACE))]
diff --git a/crates/api-snowflake-rest/src/server/helpers.rs b/crates/api-snowflake-rest/src/server/helpers.rs
index 5e60952..6c42974 100644
--- a/crates/api-snowflake-rest/src/server/helpers.rs
+++ b/crates/api-snowflake-rest/src/server/helpers.rs
@@ -7,9 +7,13 @@
 use base64::engine::general_purpose::STANDARD as engine_base64;
 use base64::prelude::*;
 use datafusion::arrow::ipc::MetadataVersion;
 use datafusion::arrow::ipc::writer::{IpcWriteOptions, StreamWriter};
+use datafusion::arrow::json::{StructMode, WriterBuilder, writer::JsonArray};
 use datafusion::arrow::record_batch::RecordBatch;
 use executor::models::QueryResult;
-use executor::utils::{DataSerializationFormat, convert_record_batches};
+use executor::utils::{
+    DataSerializationFormat, convert_record_batches, convert_struct_to_timestamp,
+};
+use serde_json::value::RawValue;
 use snafu::ResultExt;
 use uuid::Uuid;
@@ -21,7 +25,7 @@ use uuid::Uuid;
 // For more info see issue #115
 const ARROW_IPC_ALIGNMENT: usize = 8;
 
-fn records_to_arrow_string(recs: &Vec<RecordBatch>) -> std::result::Result<String, Error> {
+fn records_to_arrow_string(recs: &[RecordBatch]) -> std::result::Result<String, Error> {
     let mut buf = Vec::new();
     let options = IpcWriteOptions::try_new(ARROW_IPC_ALIGNMENT, false, MetadataVersion::V5)
         .context(api_snowflake_rest_error::ArrowSnafu)?;
@@ -42,13 +46,36 @@ fn records_to_arrow_string(recs: &Vec<RecordBatch>) -> std::result::Result<String, Error> {
 }
 
+fn records_to_json_string(recs: &[RecordBatch]) -> std::result::Result<String, Error> {
+    let recs = recs.iter().collect::<Vec<_>>();
+
+    let buf = Vec::new();
+    let mut writer = WriterBuilder::new()
+        .with_struct_mode(StructMode::ListOnly)
+        .with_explicit_nulls(true)
+        .build::<_, JsonArray>(buf);
+
+    writer
+        .write_batches(&recs)
+        .context(api_snowflake_rest_error::ArrowSnafu)?;
+    writer
+        .finish()
+        .context(api_snowflake_rest_error::ArrowSnafu)?;
+    let buf = writer.into_inner();
+    // it is expected to be cheap, as no allocations just reuses underlying buffer
+    String::from_utf8(buf).context(api_snowflake_rest_error::Utf8Snafu)
+}
+
 #[tracing::instrument(name = "handle_query_ok_result", level = "debug", err, ret(level = tracing::Level::TRACE))]
 pub fn handle_query_ok_result(
     sql_text: &str,
+    query_uuid: Uuid,
     query_result: QueryResult,
     ser_fmt: DataSerializationFormat,
 ) -> Result<Json<JsonResponse>> {
-    let query_uuid: Uuid = query_result.query_id.as_uuid();
+    // Convert the QueryResult to RecordBatches using the specified serialization format
+    // Add columns dbt metadata to each field
+    let records = convert_record_batches(&query_result, ser_fmt)?;
 
     let json_resp = Json(JsonResponse {
         data: Option::from(ResponseData {
@@ -59,13 +86,17 @@ pub fn handle_query_ok_result(
                 .collect(),
             query_result_format: Some(ser_fmt.to_string().to_lowercase()),
             row_set: if ser_fmt == DataSerializationFormat::Json {
-                Option::from(query_result.as_row_set(ser_fmt)?)
+                // Convert struct timestamp columns to string representation
+                let records = convert_struct_to_timestamp(&records)?;
+                let serialized_rowset = records_to_json_string(&records)?;
+                let raw_value = RawValue::from_string(serialized_rowset)
+                    .context(api_snowflake_rest_error::SerdeJsonSnafu)?;
+                Option::from(raw_value)
             } else {
                 None
             },
             row_set_base_64: if ser_fmt == DataSerializationFormat::Arrow {
-                let records = &convert_record_batches(&query_result, ser_fmt)?;
-                Option::from(records_to_arrow_string(records)?)
+                Option::from(records_to_arrow_string(&records)?)
             } else {
                 None
             },
diff --git a/crates/executor/src/lib.rs b/crates/executor/src/lib.rs
index e50cf14..624bb10 100644
--- a/crates/executor/src/lib.rs
+++ b/crates/executor/src/lib.rs
@@ -6,7 +6,6 @@ pub mod error_code;
 pub mod models;
 pub mod query;
 pub mod query_types;
-pub mod result_set;
 pub mod running_queries;
 pub mod service;
 pub mod session;
@@ -16,7 +15,6 @@ pub mod utils;
 
 pub use error::{Error, Result};
 pub use query_types::{QueryRecordId, QueryStatus};
-pub use result_set::{Column, ResultSet, Row};
 pub use running_queries::RunningQueryId;
 pub use snowflake_error::SnowflakeError;
 
diff --git a/crates/executor/src/models.rs b/crates/executor/src/models.rs
index 9b52929..e8fd7d3 100644
--- a/crates/executor/src/models.rs
+++ b/crates/executor/src/models.rs
@@ -1,21 +1,10 @@
 use crate::query_types::{QueryRecordId, QueryStatus};
-use crate::result_set::{Column, ResultSet, Row};
-use crate::utils::{DataSerializationFormat, convert_record_batches, convert_struct_to_timestamp};
-use crate::{Result, error as ex_error};
-use arrow_schema::SchemaRef;
-use datafusion::arrow;
 use datafusion::arrow::array::RecordBatch;
 use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
-use datafusion::arrow::json::StructMode;
-use datafusion::arrow::json::WriterBuilder;
-use datafusion::arrow::json::reader::ReaderBuilder;
-use datafusion::arrow::json::writer::JsonArray;
 use datafusion_common::arrow::datatypes::Schema;
 use functions::to_snowflake_datatype;
 use serde::{Deserialize, Serialize};
-use snafu::ResultExt;
 use std::collections::HashMap;
-use std::io::Cursor;
 use std::sync::Arc;
 use tokio::sync::oneshot;
 use uuid::Uuid;
@@ -33,9 +22,6 @@ pub struct QueryContext {
     pub query_id: QueryRecordId,
     pub request_id: Option,
     pub ip_address: Option<String>,
-    // async_query flag is not used
-    // TODO: remove or use it
-    pub async_query: bool,
 }
 
 impl QueryContext {
@@ -52,7 +38,6 @@
             query_id: QueryRecordId::default(),
             request_id: None,
             ip_address: None,
-            async_query: false,
         }
     }
 
@@ -73,12 +58,6 @@ impl QueryContext {
         self.ip_address = Some(ip_address);
         self
     }
-
-    #[must_use]
-    pub const fn with_async_query(mut self, async_query: bool) -> Self {
-        self.async_query = async_query;
-        self
-    }
 }
 
 #[derive(Debug, Clone, PartialEq)]
@@ -87,134 +66,12 @@ pub struct QueryResult {
     /// The schema associated with the result.
     /// This is required to construct a valid response even when `records` are empty
     pub schema: Arc<Schema>,
-    pub query_id: QueryRecordId,
 }
 
-impl QueryResult {
-    pub fn as_row_set(&self, data_format: DataSerializationFormat) -> Result<Vec<Row>> {
-        // Do conversions every time, as currently all the history records had conversions
-        // for arrow format though were saved as json
-
-        // Convert the QueryResult to RecordBatches using the specified serialization format
-        // Add columns dbt metadata to each field
-        // Since we have to store already converted data to history
-        let record_batches = convert_record_batches(self, data_format)?;
-        // Convert struct timestamp columns to string representation
-        let record_batches = &convert_struct_to_timestamp(&record_batches)?;
-
-        let record_batches = record_batches.iter().collect::<Vec<_>>();
-
-        // Serialize the RecordBatches into a JSON string using Arrow's Writer
-        let buffer = Vec::new();
-        let mut writer = WriterBuilder::new()
-            .with_explicit_nulls(true)
-            .build::<_, JsonArray>(buffer);
-
-        writer
-            .write_batches(&record_batches)
-            .context(ex_error::ArrowSnafu)?;
-        writer.finish().context(ex_error::ArrowSnafu)?;
-
-        let json_bytes = writer.into_inner();
-        let json_str = String::from_utf8(json_bytes).context(ex_error::Utf8Snafu)?;
-
-        // Deserialize the JSON string into rows of values
-        let rows =
-            serde_json::from_str::<Vec<Row>>(&json_str).context(ex_error::SerdeParseSnafu)?;
-
-        Ok(rows)
-    }
-
-    pub fn as_result_set(&self, query_history_rows_limit: Option<usize>) -> Result<ResultSet> {
-        // Extract column metadata from the original QueryResult
-        let columns = self
-            .column_info()
-            .iter()
-            .map(|ci| Column {
-                name: ci.name.clone(),
-                r#type: ci.r#type.clone(),
-            })
-            .collect();
-
-        // Serialize original Schema into a JSON string
-        let schema = serde_json::to_string(&self.schema).context(ex_error::SerdeParseSnafu)?;
-        let data_format = DataSerializationFormat::Json;
-        Ok(ResultSet {
-            // just for refrence
-            id: self.query_id,
-            columns,
-            rows: self.as_row_set(data_format)?,
-            batch_size_bytes: self
-                .records
-                .iter()
-                .map(RecordBatch::get_array_memory_size)
-                .sum(),
-            // move here value of data_format we hardcoded earlier
-            data_format: data_format.to_string(),
-            schema,
-            configured_rows_limit: query_history_rows_limit,
-        })
-    }
-}
-
-fn convert_resultset_to_arrow_json_lines(
-    result_set: &ResultSet,
-) -> std::result::Result<String, serde_json::Error> {
-    let mut lines = String::new();
-    for row in &result_set.rows {
-        let json_value = serde_json::Value::Array(row.0.clone());
-        lines.push_str(&serde_json::to_string(&json_value)?);
-        lines.push('\n');
-    }
-    Ok(lines)
-}
-
-/// Convert historical query record to `QueryResult`
-impl TryFrom<ResultSet> for QueryResult {
-    type Error = crate::Error;
-    fn try_from(result_set: ResultSet) -> std::result::Result<Self, Self::Error> {
-        let arrow_json = convert_resultset_to_arrow_json_lines(&result_set)
-            .context(ex_error::SerdeParseSnafu)?;
-
-        // Parse schema from serialized JSON
-        let schema_value =
-            serde_json::from_str(&result_set.schema).context(ex_error::SerdeParseSnafu)?;
-
-        let schema_ref: SchemaRef = schema_value;
-        let json_reader = ReaderBuilder::new(schema_ref.clone())
-            .with_struct_mode(StructMode::ListOnly)
-            .build(Cursor::new(&arrow_json))
-            .context(ex_error::ArrowSnafu)?;
-
-        let batches = json_reader
-            .collect::<arrow::error::Result<Vec<RecordBatch>>>()
-            .context(ex_error::ArrowSnafu)?;
-
-        Ok(Self {
-            records: batches,
-            schema: schema_ref,
-            query_id: result_set.id,
-        })
-    }
-}
 
 impl QueryResult {
     #[must_use]
-    pub const fn new(
-        records: Vec<RecordBatch>,
-        schema: Arc<Schema>,
-        query_id: QueryRecordId,
-    ) -> Self {
-        Self {
-            records,
-            schema,
-            query_id,
-        }
-    }
-
-    #[must_use]
-    pub const fn with_query_id(mut self, new_id: QueryRecordId) -> Self {
-        self.query_id = new_id;
-        self
+    pub const fn new(records: Vec<RecordBatch>, schema: Arc<Schema>) -> Self {
+        Self { records, schema }
     }
 
     #[must_use]
diff --git a/crates/executor/src/query.rs b/crates/executor/src/query.rs
index 3d78a75..a45e3b7 100644
--- a/crates/executor/src/query.rs
+++ b/crates/executor/src/query.rs
@@ -2225,7 +2225,6 @@ impl UserQuery {
 
     async fn execute_sql(&self, query: &str) -> Result<QueryResult> {
         let session = self.session.clone();
-        let query_id = self.query_context.query_id;
         let query = query.to_string();
         let stream = self
             .session
@@ -2241,7 +2240,7 @@
             if !records.is_empty() {
                 schema = records[0].schema().as_ref().clone();
             }
-            Ok::(QueryResult::new(records, Arc::new(schema), query_id))
+            Ok::(QueryResult::new(records, Arc::new(schema)))
         })
         .await
         .context(ex_error::JobSnafu)??;
@@ -2250,7 +2249,6 @@
 
     async fn execute_logical_plan(&self, plan: LogicalPlan) -> Result<QueryResult> {
         let session = self.session.clone();
-        let query_id = self.query_context.query_id;
 
         let span = tracing::debug_span!("UserQuery::execute_logical_plan");
 
@@ -2271,7 +2269,7 @@
             if !records.is_empty() {
                 schema = records[0].schema().as_ref().clone();
             }
-            Ok::(QueryResult::new(records, Arc::new(schema), query_id))
+            Ok::(QueryResult::new(records, Arc::new(schema)))
         })
         .await
         .context(ex_error::JobSnafu)??;
@@ -2284,7 +2282,6 @@
         rules: Vec>,
     ) -> Result<QueryResult> {
         let session = self.session.clone();
-        let query_id = self.query_context.query_id;
         let stream = self
             .session
             .executor
@@ -2311,7 +2308,7 @@
             if !records.is_empty() {
                 schema = records[0].schema().as_ref().clone();
             }
-            Ok::(QueryResult::new(records, Arc::new(schema), query_id))
+            Ok::(QueryResult::new(records, Arc::new(schema)))
         })
         .await
         .context(ex_error::JobSnafu)??;
@@ -2755,7 +2752,6 @@
                 .context(ex_error::ArrowSnafu)?,
             ],
             schema,
-            self.query_context.query_id,
         ))
     }
 
@@ -2776,7 +2772,6 @@
                 .context(ex_error::ArrowSnafu)?,
             ],
             schema,
-            self.query_context.query_id,
         ))
     }
 
diff --git a/crates/executor/src/result_set.rs b/crates/executor/src/result_set.rs
deleted file mode 100644
index 8f7a305..0000000
--- a/crates/executor/src/result_set.rs
+++ /dev/null
@@ -1,150 +0,0 @@
-use bytes::Bytes;
-use serde::de::{Deserializer, MapAccess, SeqAccess, Visitor};
-use serde::{Deserialize, Serialize};
-use serde_json::Value;
-use snafu::ResultExt;
-use std::fmt;
-
-use crate::error as ex_error;
-use crate::query_types::QueryRecordId;
-
-pub const QUERY_HISTORY_HARD_LIMIT_BYTES: usize = 4 * 1024 * 1024 * 1024 - 512 * 1024 * 1024;
-
-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
-pub struct Column {
-    pub name: String,
-    pub r#type: String,
-}
-
-#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
-pub struct Row(pub Vec<Value>);
-
-impl Row {
-    #[must_use]
-    pub const fn new(values: Vec<Value>) -> Self {
-        Self(values)
-    }
-}
-
-impl<'de> Deserialize<'de> for Row {
-    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
-    where
-        D: Deserializer<'de>,
-    {
-        struct RowVisitor;
-
-        impl<'de> Visitor<'de> for RowVisitor {
-            type Value = Row;
-
-            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
-                f.write_str("A serialized JsonArray or JSON object is expected")
-            }
-
-            fn visit_map<A>(self, mut map: A) -> std::result::Result<Self::Value, A::Error>
-            where
-                A: MapAccess<'de>,
-            {
-                let mut values = Vec::new();
-                while let Some((_, v)) = map.next_entry::<String, Value>()? {
{ - values.push(v); - } - Ok(Row(values)) - } - - fn visit_seq(self, mut seq: A) -> std::result::Result - where - A: SeqAccess<'de>, - { - let mut values = Vec::new(); - while let Some(v) = seq.next_element::()? { - values.push(v); - } - Ok(Row(values)) - } - } - - deserializer.deserialize_any(RowVisitor) - } -} - -#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct ResultSet { - pub columns: Vec, - pub rows: Vec, - pub data_format: String, - pub schema: String, - #[serde(skip)] - pub id: QueryRecordId, - #[serde(skip)] - pub batch_size_bytes: usize, - #[serde(skip)] - pub configured_rows_limit: Option, -} - -impl ResultSet { - #[must_use] - #[allow(clippy::as_conversions)] // Safe: usize to i128 conversion is always safe as i128 can hold any usize value - pub const fn calc_hard_rows_limit(&self) -> Option { - if self.batch_size_bytes > QUERY_HISTORY_HARD_LIMIT_BYTES { - let batch_size_bytes: i128 = self.batch_size_bytes as i128; - let bytes_limit: i128 = QUERY_HISTORY_HARD_LIMIT_BYTES as i128; - let limit_exceeded_bytes = batch_size_bytes - bytes_limit; - let exceeded_in_percents = limit_exceeded_bytes / bytes_limit * 100; - let shrink_on_count = if exceeded_in_percents > 50 { - self.rows.len() * 90 / 100 - } else { - self.rows.len() * 50 / 100 - }; - let hard_rows_limit: usize = if self.rows.len() > shrink_on_count { - self.rows.len() - shrink_on_count - } else { - self.rows.len() - }; - Some(hard_rows_limit) - } else { - None - } - } - - pub fn serialize_with_limit(&self) -> (std::result::Result, usize) { - let max_rows_limit = self.configured_rows_limit.unwrap_or(usize::MAX); - let hard_rows_limit = self.calc_hard_rows_limit(); - let rows_limit = max_rows_limit.min(hard_rows_limit.unwrap_or(max_rows_limit)); - let serialize_rows_count = rows_limit.min(self.rows.len()); - ( - self.serialize_with_soft_limit(serialize_rows_count), - serialize_rows_count, - ) - } - - pub fn serialize_with_soft_limit( - &self, - n_rows: usize, - ) -> std::result::Result { - let result_set_with_limit = LimitedResultSet { - columns: &self.columns, - rows: &self.rows[..self.rows.len().min(n_rows)], - data_format: &self.data_format, - schema: &self.schema, - }; - serde_json::to_string(&result_set_with_limit) - } -} - -#[derive(Debug, Clone, Serialize)] -struct LimitedResultSet<'a> { - pub columns: &'a [Column], - pub rows: &'a [Row], - pub data_format: &'a str, - pub schema: &'a str, -} - -impl TryFrom for ResultSet { - type Error = ex_error::Error; - fn try_from(value: Bytes) -> Result { - let result_str = String::from_utf8(value.to_vec()).context(ex_error::Utf8Snafu)?; - let result_set: Self = - serde_json::from_str(&result_str).context(ex_error::SerdeParseSnafu)?; - Ok(result_set) - } -} diff --git a/crates/executor/src/utils.rs b/crates/executor/src/utils.rs index 20acbf9..c25e952 100644 --- a/crates/executor/src/utils.rs +++ b/crates/executor/src/utils.rs @@ -864,7 +864,6 @@ impl std::fmt::Display for NormalizedIdent { mod tests { use super::*; use crate::models::ColumnInfo; - use crate::query_types::QueryRecordId; use datafusion::arrow::array::{ ArrayRef, BooleanArray, Float64Array, Int32Array, TimestampSecondArray, UInt64Array, UnionArray, @@ -1120,7 +1119,7 @@ mod tests { ], ) .unwrap(); - let result = QueryResult::new(vec![batch], schema, QueryRecordId(0)); + let result = QueryResult::new(vec![batch], schema); let column_infos = result.column_info(); // === JSON conversion === @@ -1303,7 +1302,7 @@ mod tests { ) .unwrap(), ]; - let query_result = 
-        let query_result = QueryResult::new(record_batches.clone(), schema, QueryRecordId(0));
+        let query_result = QueryResult::new(record_batches.clone(), schema);
         let column_infos = query_result.column_info();
         let converted_batches =
             convert_record_batches(&query_result, DataSerializationFormat::Arrow).unwrap();
@@ -1328,7 +1327,7 @@
 
         let record_batch =
             RecordBatch::try_new(schema.clone(), vec![date32_array, date64_array]).unwrap();
-        let query_result = QueryResult::new(vec![record_batch], schema, QueryRecordId(0));
+        let query_result = QueryResult::new(vec![record_batch], schema);
         let converted_batches =
             convert_record_batches(&query_result, DataSerializationFormat::Json).unwrap();
         let converted_batch = &converted_batches[0];
@@ -1356,56 +1355,4 @@
             }
         }
     }
-
-    #[test]
-    fn test_bug_1662_duplicate_columns_names() {
-        // Check if following result is converted to ResultSet correctly:
-        // +-----+-----+
-        // | COL | COL |
-        // +-----+-----+
-        // |  1  |  2  |
-        // +-----+-----+
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("col", DataType::Int64, true),
-            Field::new("col", DataType::Int64, true),
-        ]));
-
-        let record_batch = RecordBatch::try_new(
-            schema.clone(),
-            vec![
-                Arc::new(Int64Array::from(vec![Some(1)])),
-                Arc::new(Int64Array::from(vec![Some(2)])),
-            ],
-        )
-        .unwrap();
-
-        // Create QueryResult
-        let query_result = QueryResult::new(vec![record_batch], schema, QueryRecordId(0));
-
-        // Create ResultSet from QueryResult
-        let result_set = query_result
-            .as_result_set(Some(1000))
-            .expect("Failed to convert query result to result set");
-
-        eprintln!("Result set: {result_set:?}");
-        // check if ResultSet is correct
-        assert_eq!(result_set.columns.len(), 2);
-        let columns_names = result_set
-            .columns
-            .iter()
-            .map(|col| col.name.clone())
-            .collect::<Vec<_>>();
-        assert_eq!(columns_names, ["col", "col"]);
-        assert_eq!(result_set.rows.len(), 1);
-        let rows = result_set
-            .rows
-            .iter()
-            .map(|row| row.0.clone())
-            .collect::<Vec<_>>();
-        let row = rows[0]
-            .iter()
-            .map(|col| col.as_i64().unwrap())
-            .collect::<Vec<_>>();
-        assert_eq!(row, [1, 2]);
-    }
 }