Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
8b7e043
Return array of results duckdb
diegoimbert Nov 4, 2025
9edadf7
Merge remote-tracking branch 'origin/main' into di/duckdb-return-arra…
diegoimbert Nov 5, 2025
7661799
Migration script to add result_collection=legacy
diegoimbert Nov 5, 2025
b1eab55
migration script fixes
diegoimbert Nov 5, 2025
54a08c0
app_version_lite not necessary
diegoimbert Nov 5, 2025
7484519
New annotations macro that supports custom types
diegoimbert Nov 5, 2025
910e406
pass unit tests
diegoimbert Nov 5, 2025
6cb2d70
nit style
diegoimbert Nov 5, 2025
99ec932
Merge branch 'main' into di/duckdb-return-array-of-results
diegoimbert Nov 6, 2025
e6f5cc0
result_collection almost works for postgres
diegoimbert Nov 6, 2025
b486cd4
fix last_statement
diegoimbert Nov 6, 2025
948c0ae
frontend suggesitons
diegoimbert Nov 6, 2025
41ade5a
Merge remote-tracking branch 'origin/main' into di/duckdb-return-arra…
diegoimbert Nov 6, 2025
85d37c5
fix column_order making columns disappear
diegoimbert Nov 6, 2025
474d9ac
added version check for duckdb FFI lib to avoid crashes when changing…
diegoimbert Nov 7, 2025
3e4a631
result_collection for duckdb
diegoimbert Nov 7, 2025
246ed64
Correct legacy behavior in DuckDB
diegoimbert Nov 7, 2025
0732197
mysql result_collection
diegoimbert Nov 7, 2025
deee662
mssql collection_strategy
diegoimbert Nov 7, 2025
a454903
result_collection for oracle
diegoimbert Nov 7, 2025
460fb18
snowflake result_collection
diegoimbert Nov 7, 2025
d73c43b
fix errors
diegoimbert Nov 7, 2025
943ba9d
Merge branch 'main' into di/duckdb-return-array-of-results
diegoimbert Nov 7, 2025
ae188f5
mistake, .clone() causing deadlock
diegoimbert Nov 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Add down migration script here
89 changes: 89 additions & 0 deletions backend/migrations/20251105100125_legacy_sql_result_flag.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
-- Prepends the "result_collection=legacy" annotation header to a SQL script
-- body so that scripts written before this migration keep their historical
-- result-collection behavior.
CREATE OR REPLACE FUNCTION update_string(s text)
RETURNS text
LANGUAGE plpgsql
AS $$
DECLARE
-- The literal deliberately spans several source lines: the docs URL line,
-- the "-- result_collection=legacy" annotation line, and a trailing blank
-- line are all part of the inserted prefix.
prefix TEXT := '-- https://www.windmill.dev/docs/getting_started/scripts_quickstart/sql#result-collection
-- result_collection=legacy

';
BEGIN
RETURN prefix || s;
END;
$$;

-- Recursively walks a jsonb document (flow/app/draft value) and rewrites the
-- "content" field of every module whose "language" is one of the SQL
-- languages, prefixing it via update_string(). All other values are returned
-- unchanged (recursing through nested objects and arrays).
CREATE OR REPLACE FUNCTION update_all_modules(obj jsonb)
RETURNS jsonb
LANGUAGE plpgsql
AS $$
DECLARE
result jsonb;
k text;
v jsonb;
BEGIN
IF jsonb_typeof(obj) = 'object' THEN
result := '{}'::jsonb;

-- Rebuild the object key by key so only the matching "content" entry is
-- rewritten; the language check looks at the SIBLING "language" key of the
-- same object (obj), not at v itself.
FOR k, v IN SELECT * FROM jsonb_each(obj)
LOOP
IF k = 'content' and jsonb_typeof(v) = 'string' AND obj->>'language' IN ('bigquery', 'postgresql', 'duckdb', 'mssql', 'oracledb', 'snowflake', 'mysql') THEN
result := result || jsonb_build_object('content', update_string(obj->>'content'));
ELSE
result := result || jsonb_build_object(k, update_all_modules(v));
END IF;
END LOOP;

RETURN result;
ELSIF jsonb_typeof(obj) = 'array' AND jsonb_array_length(obj) > 0 THEN
-- Map the recursion over every array element.
SELECT jsonb_agg(update_all_modules(elem))
INTO result
FROM jsonb_array_elements(obj) elem;

RETURN result;
ELSE
-- Scalars, null, and empty arrays pass through untouched.
RETURN obj;
END IF;
END;
$$;

-- Run on a flow_version_lite jsonb value. Returns an array of flow_node ids whose languages are SQL.
-- Recursively scans the document: any object carrying a SQL "language" and a
-- numeric "id" contributes that id; objects and arrays are walked in full.
CREATE OR REPLACE FUNCTION find_sql_flow_nodes_ids(obj jsonb)
RETURNS BIGINT[]
LANGUAGE plpgsql
AS $$
DECLARE
result BIGINT[] := '{}';
k text;
v jsonb;
BEGIN
IF jsonb_typeof(obj) = 'object' THEN
IF obj->>'language' IN ('bigquery', 'postgresql', 'duckdb', 'mssql', 'oracledb', 'snowflake', 'mysql') AND jsonb_typeof(obj->'id') = 'number' THEN
result := result || (obj->>'id')::BIGINT;
END IF;

-- Recurse into every value of the object to pick up nested nodes.
FOR k, v IN SELECT * FROM jsonb_each(obj)
LOOP
result := result || find_sql_flow_nodes_ids(v);
END LOOP;
ELSIF jsonb_typeof(obj) = 'array' AND jsonb_array_length(obj) > 0 THEN
-- COALESCE fix: array_agg over zero rows (no SQL nodes anywhere in the
-- array) yields NULL, which would make this function return NULL instead
-- of the documented (possibly empty) array.
SELECT COALESCE(array_agg(result_ids), '{}')
INTO result
FROM jsonb_array_elements(obj) elem, unnest(find_sql_flow_nodes_ids(elem)) as result_ids;
END IF;
RETURN result;
END;
$$;


-- Apply the legacy annotation everywhere SQL script bodies are stored.
-- app_version/draft store json, so round-trip through jsonb.
UPDATE app_version SET value = update_all_modules(value::jsonb)::json;
UPDATE draft SET value = update_all_modules(value::jsonb)::json;
UPDATE flow SET value = update_all_modules(value);
UPDATE flow_version SET value = update_all_modules(value);
-- flow_node stores code as plain text; locate the SQL nodes by scanning the
-- flow_version_lite documents for their ids, then prefix the code directly.
UPDATE flow_node SET code = update_string(code) WHERE id IN (
SELECT v FROM flow_version_lite, unnest(find_sql_flow_nodes_ids(value)) as v
);
UPDATE script SET content = update_string(content) WHERE language IN ('bigquery', 'postgresql', 'duckdb', 'mssql', 'oracledb', 'snowflake', 'mysql');

-- The helper functions are migration-only; drop them when done.
DROP FUNCTION IF EXISTS update_all_modules(jsonb);
DROP FUNCTION IF EXISTS update_string(text);
DROP FUNCTION IF EXISTS find_sql_flow_nodes_ids(jsonb);
166 changes: 162 additions & 4 deletions backend/windmill-common/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -663,13 +663,11 @@ fn parse_file<T: FromStr>(path: &str) -> Option<T> {
.flatten()
}

#[derive(Copy, Clone)]
#[annotations("#")]
pub struct RubyAnnotations {
pub verbose: bool,
}

#[derive(Copy, Clone)]
#[annotations("#")]
pub struct PythonAnnotations {
pub no_cache: bool,
Expand All @@ -682,7 +680,6 @@ pub struct PythonAnnotations {
pub py313: bool,
}

#[derive(Copy, Clone)]
#[annotations("//")]
pub struct GoAnnotations {
pub go1_22_compat: bool,
Expand All @@ -698,13 +695,174 @@ pub struct TypeScriptAnnotations {

#[annotations("--")]
pub struct SqlAnnotations {
pub return_last_result: bool,
pub return_last_result: bool, // deprecated, use result_collection instead
pub result_collection: SqlResultCollectionStrategy,
}

// Annotations parsed from `#`-prefixed comment headers in bash scripts
// (the prefix argument matches the language's comment syntax, as with the
// other *Annotations structs in this file).
#[annotations("#")]
pub struct BashAnnotations {
    // NOTE(review): presumably toggles docker-based execution — confirm
    // against the worker that consumes this annotation.
    pub docker: bool,
}

/// How the result sets produced by a SQL script's statements are collected
/// into the job's final result.
///
/// Naming scheme: `<which statements><which rows>[Scalar]`:
/// * `LastStatement*` keeps only the final statement's result set;
///   `AllStatements*` keeps one result set per statement.
/// * `*AllRows` keeps every row; `*FirstRow` keeps only the first row.
/// * `*Scalar` unwraps each row object to its first column value.
/// * `Legacy` preserves historical behavior: last-statement-only when the
///   script contains a single statement, all statements otherwise (see
///   `collect_last_statement_only`).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum SqlResultCollectionStrategy {
    LastStatementAllRows,
    LastStatementFirstRow,
    LastStatementAllRowsScalar,
    LastStatementFirstRowScalar,
    AllStatementsAllRows,
    AllStatementsFirstRow,
    AllStatementsAllRowsScalar,
    AllStatementsFirstRowScalar,
    Legacy,
}

impl SqlResultCollectionStrategy {
    /// Parses an annotation value (e.g. `result_collection=last_statement_first_row`)
    /// into a strategy. Unknown values silently fall back to the default
    /// (`LastStatementAllRows`) rather than erroring.
    pub fn parse(s: &str) -> Self {
        use SqlResultCollectionStrategy::*;
        match s {
            "last_statement_all_rows" => LastStatementAllRows,
            "last_statement_first_row" => LastStatementFirstRow,
            "last_statement_all_rows_scalar" => LastStatementAllRowsScalar,
            "last_statement_first_row_scalar" => LastStatementFirstRowScalar,
            "all_statements_all_rows" => AllStatementsAllRows,
            "all_statements_first_row" => AllStatementsFirstRow,
            "all_statements_all_rows_scalar" => AllStatementsAllRowsScalar,
            "all_statements_first_row_scalar" => AllStatementsFirstRowScalar,
            "legacy" => Legacy,
            _ => Self::default(),
        }
    }

    /// Whether only the last statement's result set should be kept.
    /// For `Legacy`, this is true only when the script has a single
    /// statement (historical behavior).
    pub fn collect_last_statement_only(&self, query_count: usize) -> bool {
        use SqlResultCollectionStrategy::*;
        match self {
            LastStatementAllRows
            | LastStatementFirstRow
            | LastStatementFirstRowScalar
            | LastStatementAllRowsScalar => true,
            Legacy => query_count == 1,
            _ => false,
        }
    }

    /// Whether only the first row of each kept result set should be returned.
    pub fn collect_first_row_only(&self) -> bool {
        use SqlResultCollectionStrategy::*;
        match self {
            LastStatementFirstRow
            | LastStatementFirstRowScalar
            | AllStatementsFirstRow
            | AllStatementsFirstRowScalar => true,
            _ => false,
        }
    }

    /// Whether each row object should be unwrapped to its first column value.
    pub fn collect_scalar(&self) -> bool {
        use SqlResultCollectionStrategy::*;
        match self {
            LastStatementFirstRowScalar
            | AllStatementsFirstRowScalar
            | LastStatementAllRowsScalar
            | AllStatementsAllRowsScalar => true,
            _ => false,
        }
    }

    // This function transforms the shape (e.g Row[][] -> Row)
    // It is the responsibility of the executor to avoid fetching unnecessary statements/rows
    //
    // `values` is one Vec of JSON rows per executed statement, in statement
    // order. Errors if a scalar strategy meets a non-object or empty row.
    pub fn collect(
        &self,
        values: Vec<Vec<Box<serde_json::value::RawValue>>>,
    ) -> error::Result<Box<serde_json::value::RawValue>> {
        let null = || serde_json::value::RawValue::from_string("null".to_string()).unwrap();

        // Keep only the last statement's rows when the strategy asks for it.
        // NOTE: for Legacy this depends on the ORIGINAL statement count, so it
        // must run before any other shape transformation.
        let values = if self.collect_last_statement_only(values.len()) {
            values.into_iter().rev().take(1).collect()
        } else {
            values
        };

        // Truncate each kept result set to its first row if requested.
        let values = if self.collect_first_row_only() {
            values
                .into_iter()
                .map(|rows| rows.into_iter().take(1).collect())
                .collect()
        } else {
            values
        };

        // For scalar strategies, replace each row object by its first value.
        let values = if self.collect_scalar() {
            values
                .into_iter()
                .map(|rows| {
                    rows.into_iter()
                        .map(|row| {
                            // Take the first value in the object
                            let record =
                                match serde_json::from_str(row.get()) {
                                    Ok(serde_json::Value::Object(record)) => record,
                                    Ok(_) => return Err(error::Error::ExecutionErr(
                                        "Could not collect sql scalar value from non-object row"
                                            .to_string(),
                                    )),
                                    Err(e) => {
                                        return Err(error::Error::ExecutionErr(format!(
                                            "Could not collect sql scalar value (failed to parse row): {}",
                                            e
                                        )))
                                    }
                                };
                            let Some((_, value)) = record.iter().next() else {
                                return Err(error::Error::ExecutionErr(
                                    "Could not collect sql scalar value from empty row".to_string(),
                                ));
                            };
                            Ok(serde_json::value::RawValue::from_string(
                                serde_json::to_string(value).map_err(to_anyhow)?,
                            )
                            .map_err(to_anyhow)?)
                        })
                        .collect::<error::Result<Vec<_>>>()
                })
                .collect::<error::Result<Vec<_>>>()?
        } else {
            values
        };

        // Finally, unwrap the outer Vec layers that the strategy collapses:
        // (last-only, first-only) decides between Row, Row[], and Row[][].
        match (
            self.collect_last_statement_only(values.len()),
            self.collect_first_row_only(),
        ) {
            (true, true) => {
                match values
                    .into_iter()
                    .last()
                    .map(|rows| rows.into_iter().next())
                {
                    // Fix: `row` is already owned here, the previous
                    // `row.clone()` was a redundant allocation.
                    Some(Some(row)) => Ok(row),
                    _ => Ok(null()),
                }
            }
            (true, false) => match values.into_iter().last() {
                Some(rows) => Ok(to_raw_value(&rows)),
                None => Ok(null()),
            },
            (false, true) => {
                // One row per statement; statements that yielded no rows
                // contribute a JSON null.
                let values = values
                    .into_iter()
                    .map(|rows| rows.into_iter().next().unwrap_or_else(null))
                    .collect::<Vec<_>>();
                Ok(to_raw_value(&values))
            }
            (false, false) => Ok(to_raw_value(&values)),
        }
    }
}

impl Default for SqlResultCollectionStrategy {
fn default() -> Self {
SqlResultCollectionStrategy::LastStatementAllRows
}
}

/// length = 5
/// value = "foo"
/// output = "foo "
Expand Down
2 changes: 1 addition & 1 deletion backend/windmill-duckdb-ffi-internal/README_DEV.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ INSERT INTO t VALUES (NULL);

causes `Constraint Error: NOT NULL constraint failed: t.x` normally, but here we see `Unknown exception in ExecutorTask::Execute`. This opaque error comes directly from the C++ DuckDB library: https://github.com/duckdb/duckdb/blob/f99fed1e0b16a842573f9dad529f6c170a004f6e/src/parallel/executor_task.cpp#L58

To solve this, we compile duckdb separately from the main backend crate and call it with FFI
To solve this, we compile duckdb separately from the main backend crate and call it with FFI. It has to be loaded dynamically; if it were loaded statically it would still share libc++ with deno_core and cause issues.
Loading