Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .jules/bolt.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,7 @@
## 2026-04-08 - [Performance: Defer Allocation during Traversal]
**Learning:** During DAG traversals, creating owned variants of identifiers (like `file.to_path_buf()`) *before* checking `visited` HashSets results in a heap allocation for every edge (O(E)) instead of for every visited node (O(V)). By moving the owned `PathBuf` allocation strictly *after* all HashSet `contains` checks, which are performed with the borrowed reference (`&Path`), we drastically reduce memory churn.
**Action:** Always check `HashSet::contains` with a borrowed reference *before* creating the owned version required by `HashSet::insert`, especially in performance-critical graph traversal paths.

## 2026-04-09 - [Performance: Pre-allocate and use write! for dynamic SQL]
**Learning:** Constructing dynamic SQL queries in hot loops (like D1 target `upsert` and `delete` batching) using `format!` and `Vec::join` creates excessive intermediate string allocations and memory copies.
**Action:** Use `String::with_capacity` with a conservative, pre-calculated size estimate, combined with the `write!` macro, to build dynamic SQL statements efficiently without intermediate vectors or per-fragment heap allocations.
73 changes: 45 additions & 28 deletions crates/flow/src/targets/d1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,40 +300,55 @@ impl D1ExportContext {
key: &KeyValue,
values: &FieldValues,
) -> Result<(String, Vec<serde_json::Value>), RecocoError> {
let mut columns = vec![];
let mut placeholders = vec![];
let mut params = vec![];
let mut update_clauses = vec![];
use std::fmt::Write;
let mut params =
Vec::with_capacity(self.key_fields_schema.len() + self.value_fields_schema.len());
// ⚡ Bolt Optimization: Use String::with_capacity and write! to avoid intermediate Vec<String> and format! allocations
let mut sql = String::with_capacity(128 + params.capacity() * 32);

write!(sql, "INSERT INTO {} (", self.table_name).unwrap();
let mut first = true;

// Extract key parts - KeyValue is a wrapper around Box<[KeyPart]>
for (idx, _key_field) in self.key_fields_schema.iter().enumerate() {
if let Some(key_part) = key.0.get(idx) {
columns.push(self.key_fields_schema[idx].name.clone());
placeholders.push("?".to_string());
if !first {
sql.push_str(", ");
}
sql.push_str(&self.key_fields_schema[idx].name);
params.push(key_part_to_json(key_part)?);
Comment on lines 313 to 319
first = false;
}
Comment on lines 312 to 321
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These builders currently tolerate missing key parts by skipping absent entries (key.0.get(idx)), which can lead to partial key predicates (or even none) and therefore wrong-row updates/inserts or overly-broad deletes for composite keys. Since key/value arrays are positional, consider validating that key.0.len() matches key_fields_schema.len() (and similarly for values) and returning a RecocoError instead of silently generating a different statement.

}

// Add value fields
for (idx, value) in values.fields.iter().enumerate() {
if let Some(value_field) = self.value_fields_schema.get(idx) {
columns.push(value_field.name.clone());
placeholders.push("?".to_string());
if !first {
sql.push_str(", ");
}
sql.push_str(&value_field.name);
params.push(value_to_json(value)?);
update_clauses.push(format!(
"{} = excluded.{}",
value_field.name, value_field.name
));
first = false;
}
}

let sql = format!(
"INSERT INTO {} ({}) VALUES ({}) ON CONFLICT DO UPDATE SET {}",
self.table_name,
columns.join(", "),
placeholders.join(", "),
update_clauses.join(", ")
);
sql.push_str(") VALUES (");
for i in 0..params.len() {
sql.push_str(if i > 0 { ", ?" } else { "?" });
}

sql.push_str(") ON CONFLICT DO UPDATE SET ");
first = true;
for (idx, _value) in values.fields.iter().enumerate() {
if let Some(value_field) = self.value_fields_schema.get(idx) {
if !first {
sql.push_str(", ");
}
write!(sql, "{0} = excluded.{0}", value_field.name).unwrap();
first = false;
}
Comment on lines +341 to +350
}

Ok((sql, params))
}
Expand All @@ -342,22 +357,24 @@ impl D1ExportContext {
&self,
key: &KeyValue,
) -> Result<(String, Vec<serde_json::Value>), RecocoError> {
let mut where_clauses = vec![];
let mut params = vec![];
use std::fmt::Write;
let mut params = Vec::with_capacity(self.key_fields_schema.len());
// ⚡ Bolt Optimization: Use String::with_capacity and write! to avoid intermediate Vec<String> and format! allocations
let mut sql = String::with_capacity(32 + self.table_name.len() + params.capacity() * 32);
write!(sql, "DELETE FROM {} WHERE ", self.table_name).unwrap();

let mut first = true;
for (idx, _key_field) in self.key_fields_schema.iter().enumerate() {
if let Some(key_part) = key.0.get(idx) {
where_clauses.push(format!("{} = ?", self.key_fields_schema[idx].name));
if !first {
sql.push_str(" AND ");
}
write!(sql, "{} = ?", self.key_fields_schema[idx].name).unwrap();
params.push(key_part_to_json(key_part)?);
first = false;
}
}
Comment on lines +363 to 376
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

build_delete_stmt starts the SQL with ... WHERE even if no key parts are appended (e.g., empty key schema or a shorter-than-expected KeyValue). That yields a syntactically invalid statement and obscures the real problem. Consider early-returning an error when zero predicates are emitted (and/or when the key length doesn't match the schema) before returning the SQL.


let sql = format!(
"DELETE FROM {} WHERE {}",
self.table_name,
where_clauses.join(" AND ")
);

Ok((sql, params))
}

Expand Down
Loading