Skip to content

Commit 49524d5

Browse files
chore(aiagent): store memory in db (#7053)
* store to db * no warning in context * warning in frontend * remove log * 100kb + alert * update sqlx * update eeref * Update ee-repo-ref.txt * cleaning --------- Co-authored-by: windmill-internal-app[bot] <windmill-internal-app[bot]@users.noreply.github.com>
1 parent ad43680 commit 49524d5

17 files changed

+239
-65
lines changed

backend/.sqlx/query-38baa6cf7d1c2532d38486a01e6d27ac1a58540a51e97dec5613e9bc0791e890.json

Lines changed: 17 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/.sqlx/query-6a1005b8ae5326c5d5955534e9228b6bd524253971ae09cbe100c0d6d42e6cdc.json

Lines changed: 24 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/.sqlx/query-d1d9cb0a7043760564171f481dd73f04eaa2c7d5c1ddc0501bc70c3fb34a07be.json

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/ee-repo-ref.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
20f881d0c328cdb7ce888db8faf8989a4d8e5bad
1+
5b7afe50da442441747e7a8f6ef461c96faa9dc2
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
-- Add down migration script here

-- Drop the AI agent memory table created by the matching up migration.
-- IF EXISTS keeps the rollback idempotent if the table was already removed.
DROP TABLE IF EXISTS ai_agent_memory;
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
-- Add up migration script here

-- Create ai_agent_memory table for storing AI agent step memory when S3 is unavailable.
-- One row per (workspace, conversation, step); the full message history for that step
-- is stored as a single JSONB document in `messages`.
CREATE TABLE ai_agent_memory (
    workspace_id VARCHAR(50) NOT NULL,
    conversation_id UUID NOT NULL,
    step_id VARCHAR(255) NOT NULL,
    messages JSONB NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    -- Composite key: lookups and upserts address a single step's memory.
    PRIMARY KEY (workspace_id, conversation_id, step_id)
);

-- Grant permissions to both application roles so reads and writes work
-- regardless of which role the connection uses.
GRANT ALL ON ai_agent_memory TO windmill_admin;
GRANT ALL ON ai_agent_memory TO windmill_user;

backend/windmill-api/src/flow_conversations.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use uuid::Uuid;
1111

1212
use crate::db::ApiAuthed;
1313
use windmill_common::{
14-
db::UserDB,
14+
db::{UserDB, DB},
1515
error::{JsonResult, Result},
1616
flow_conversations::MessageType,
1717
utils::{not_found_if_none, paginate, Pagination},
@@ -154,6 +154,7 @@ pub async fn get_or_create_conversation_with_id(
154154
async fn delete_conversation(
155155
authed: ApiAuthed,
156156
Extension(user_db): Extension<UserDB>,
157+
Extension(db): Extension<DB>,
157158
Path((w_id, conversation_id)): Path<(String, Uuid)>,
158159
) -> Result<String> {
159160
let mut tx = user_db.clone().begin(&authed).await?;
@@ -185,10 +186,14 @@ async fn delete_conversation(
185186

186187
// Delete associated memory in background (non-blocking cleanup)
187188
let w_id_clone = w_id.clone();
189+
let db_clone = db.clone();
188190
tokio::spawn(async move {
189-
if let Err(e) =
190-
windmill_worker::memory_oss::delete_conversation_memory(&w_id_clone, conversation_id)
191-
.await
191+
if let Err(e) = windmill_worker::memory_oss::delete_conversation_memory(
192+
&db_clone,
193+
&w_id_clone,
194+
conversation_id,
195+
)
196+
.await
192197
{
193198
tracing::error!(
194199
"Failed to delete memory for conversation {} in workspace {}: {:?}",

backend/windmill-common/src/worker.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -498,7 +498,6 @@ pub async fn store_pull_query(wc: &WorkerConfig) {
498498

499499
pub const TMP_DIR: &str = "/tmp/windmill";
500500
pub const TMP_LOGS_DIR: &str = concatcp!(TMP_DIR, "/logs");
501-
pub const TMP_MEMORY_DIR: &str = concatcp!(TMP_DIR, "/memory");
502501

503502
pub const HUB_CACHE_DIR: &str = concatcp!(ROOT_CACHE_DIR, "hub");
504503

backend/windmill-worker/src/ai_executor.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@ pub async fn run_agent(
405405
.and_then(|fs| fs.memory_id)
406406
{
407407
// Read messages from memory
408-
match read_from_memory(&job.workspace_id, memory_id, step_id).await {
408+
match read_from_memory(db, &job.workspace_id, memory_id, step_id).await {
409409
Ok(Some(loaded_messages)) => {
410410
// Take the last n messages
411411
let start_idx = loaded_messages.len().saturating_sub(context_length);
@@ -856,6 +856,7 @@ pub async fn run_agent(
856856

857857
if let Some(memory_id) = flow_context.flow_status.and_then(|fs| fs.memory_id) {
858858
if let Err(e) = write_to_memory(
859+
db,
859860
&job.workspace_id,
860861
memory_id,
861862
step_id,
Lines changed: 97 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,72 +1,123 @@
11
use crate::ai::types::OpenAIMessage;
2-
use std::path::PathBuf;
3-
use tokio::{fs, io::AsyncWriteExt};
42
use uuid::Uuid;
5-
use windmill_common::worker::TMP_MEMORY_DIR;
6-
7-
/// Get the file path for storing memory for a specific AI agent step
8-
pub fn path_for(workspace_id: &str, conversation_id: Uuid, step_id: &str) -> PathBuf {
9-
PathBuf::from(TMP_MEMORY_DIR)
10-
.join(workspace_id)
11-
.join(conversation_id.to_string())
12-
.join(format!("{step_id}.json"))
13-
}
3+
use windmill_common::{db::DB, error::Error};
4+
5+
pub const MAX_MEMORY_SIZE_BYTES: usize = 100_000; // Max serialized JSON size (~100KB) per memory entry in the database; larger payloads are truncated on write
146

15-
/// Read messages from disk storage
16-
pub async fn read_from_disk(
7+
/// Read AI agent memory from database
8+
pub async fn read_from_db(
9+
db: &DB,
1710
workspace_id: &str,
1811
conversation_id: Uuid,
1912
step_id: &str,
20-
) -> anyhow::Result<Option<Vec<OpenAIMessage>>> {
21-
let path = path_for(workspace_id, conversation_id, step_id);
22-
if !fs::try_exists(&path).await? {
23-
return Ok(None);
24-
}
13+
) -> Result<Option<Vec<OpenAIMessage>>, Error> {
14+
let result = sqlx::query!(
15+
"SELECT messages FROM ai_agent_memory
16+
WHERE workspace_id = $1 AND conversation_id = $2 AND step_id = $3",
17+
workspace_id,
18+
conversation_id,
19+
step_id
20+
)
21+
.fetch_optional(db)
22+
.await?;
2523

26-
let bytes = fs::read(&path).await?;
27-
let messages: Vec<OpenAIMessage> = serde_json::from_slice(&bytes)?;
28-
Ok(Some(messages))
24+
match result {
25+
Some(row) => {
26+
let messages: Vec<OpenAIMessage> = serde_json::from_value(row.messages)?;
27+
Ok(Some(messages))
28+
}
29+
None => Ok(None),
30+
}
2931
}
3032

31-
/// Write messages to disk storage
32-
pub async fn write_to_disk(
33+
/// Write AI agent memory to database with size checking and truncation
34+
pub async fn write_to_db(
35+
db: &DB,
3336
workspace_id: &str,
3437
conversation_id: Uuid,
3538
step_id: &str,
3639
messages: &[OpenAIMessage],
37-
) -> anyhow::Result<()> {
38-
let path = path_for(workspace_id, conversation_id, step_id);
39-
40-
// Ensure parent directories exist
41-
if let Some(dir) = path.parent() {
42-
fs::create_dir_all(dir).await?;
40+
) -> Result<(), Error> {
41+
if messages.is_empty() {
42+
return Ok(());
4343
}
4444

45-
// Write atomically using a temporary file
46-
let tmp = path.with_extension("json.tmp");
47-
let mut f = fs::File::create(&tmp).await?;
48-
f.write_all(&serde_json::to_vec(messages)?).await?;
49-
f.flush().await?;
50-
drop(f);
45+
// Serialize messages and check size
46+
let mut messages_to_store = messages.to_vec();
47+
let mut json_value = serde_json::to_value(&messages_to_store)?;
48+
let size_bytes = json_value.to_string().len();
49+
50+
// Truncate if necessary
51+
if size_bytes > MAX_MEMORY_SIZE_BYTES {
52+
tracing::warn!(
53+
"Memory size ({} bytes) exceeds limit ({} bytes) for workspace={} conversation={} step={}. Truncating messages. Use S3 storage in workspace settings to store full conversation history.",
54+
size_bytes,
55+
MAX_MEMORY_SIZE_BYTES,
56+
workspace_id,
57+
conversation_id,
58+
step_id
59+
);
60+
61+
messages_to_store = truncate_messages(messages, MAX_MEMORY_SIZE_BYTES)?;
62+
json_value = serde_json::to_value(&messages_to_store)?;
63+
}
5164

52-
// Atomic rename
53-
fs::rename(tmp, &path).await?;
65+
// Insert or update using UPSERT
66+
sqlx::query!(
67+
"INSERT INTO ai_agent_memory (workspace_id, conversation_id, step_id, messages, created_at, updated_at)
68+
VALUES ($1, $2, $3, $4, NOW(), NOW())
69+
ON CONFLICT (workspace_id, conversation_id, step_id)
70+
DO UPDATE SET
71+
messages = EXCLUDED.messages,
72+
updated_at = NOW()",
73+
workspace_id,
74+
conversation_id,
75+
step_id,
76+
json_value
77+
)
78+
.execute(db)
79+
.await?;
5480

5581
Ok(())
5682
}
5783

58-
/// Delete all memory for a conversation from disk storage
59-
pub async fn delete_conversation_from_disk(
84+
/// Delete all memory for a conversation from database
85+
pub async fn delete_conversation_from_db(
86+
db: &DB,
6087
workspace_id: &str,
6188
conversation_id: Uuid,
62-
) -> anyhow::Result<()> {
63-
let conversation_path = PathBuf::from(TMP_MEMORY_DIR)
64-
.join(workspace_id)
65-
.join(conversation_id.to_string());
89+
) -> Result<(), Error> {
90+
sqlx::query!(
91+
"DELETE FROM ai_agent_memory
92+
WHERE workspace_id = $1 AND conversation_id = $2",
93+
workspace_id,
94+
conversation_id
95+
)
96+
.execute(db)
97+
.await?;
6698

67-
if fs::try_exists(&conversation_path).await? {
68-
fs::remove_dir_all(&conversation_path).await?;
99+
Ok(())
100+
}
101+
102+
/// Truncate messages to fit within the size limit
103+
fn truncate_messages(
104+
messages: &[OpenAIMessage],
105+
max_size_bytes: usize,
106+
) -> Result<Vec<OpenAIMessage>, Error> {
107+
let mut result = messages.to_vec();
108+
109+
// Keep removing oldest messages until we're under the threshold
110+
while !result.is_empty() {
111+
let test_json = serde_json::to_value(&result)?;
112+
let test_size = test_json.to_string().len();
113+
114+
if test_size <= max_size_bytes {
115+
break;
116+
}
117+
118+
// Remove the first (oldest) message
119+
result.remove(0);
69120
}
70121

71-
Ok(())
122+
Ok(result)
72123
}

0 commit comments

Comments
 (0)