101 changes: 63 additions & 38 deletions backend/windmill-worker/src/worker_flow.rs
@@ -20,6 +20,7 @@ use crate::{
};

use anyhow::Context;
use backon::{BackoffBuilder, ConstantBuilder, Retryable};
use futures::TryFutureExt;
use mappable_rc::Marc;
use serde::{Deserialize, Serialize};
@@ -68,6 +69,26 @@ use windmill_queue::{

type DB = sqlx::Pool<sqlx::Postgres>;

/// Retry sqlx operations with constant backoff (up to 3 retries, 200ms apart) to handle pool timeouts
macro_rules! retry_sqlx {
    ($operation:expr) => {
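        // The closure rebuilds the future on each attempt, so the wrapped
        // expression is re-evaluated for every retry.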
        (|| async { $operation })
            .retry(
                ConstantBuilder::default()
                    .with_delay(std::time::Duration::from_millis(200))
                    .with_max_times(3)
                    .build(),
            )
            .notify(|err, dur| {
                tracing::warn!(
                    "Database operation failed, retrying in {dur:#?}, err: {err:#?}"
                );
            })
            .sleep(tokio::time::sleep)
            .await
    };
}

use windmill_audit::audit_oss::{audit_log, AuditAuthor};
use windmill_audit::ActionKind;
use windmill_queue::{canceled_job_to_result, push};
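
A minimal usage sketch of the new macro (hypothetical call site, not part of this diff; `db` is assumed to be the `DB` pool aliased above). The wrapped block must evaluate to a `Result`, is re-run on every attempt, and the final error is propagated with `?`, exactly as in the two hunks below:

    // Hypothetical example: count completed jobs, retrying transient
    // pool-timeout failures up to 3 times with a constant 200ms delay.
    let completed_count = retry_sqlx!({
        sqlx::query_scalar!("SELECT COUNT(*) FROM v2_job_completed")
            .fetch_one(db)
            .await
    })?;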
@@ -257,35 +278,37 @@ pub async fn update_flow_status_after_job_completion_internal(
) = {
// tracing::debug!("UPDATE FLOW STATUS: {flow:?} {success} {result:?} {w_id} {depth}");

let (job_kind, script_hash, old_status, raw_flow) = sqlx::query!(
    "SELECT
        kind AS \"job_kind!: JobKind\",
        runnable_id AS \"script_hash: ScriptHash\",
        flow_status AS \"flow_status!: Json<Box<RawValue>>\",
        raw_flow AS \"raw_flow: Json<Box<RawValue>>\"
    FROM v2_job INNER JOIN v2_job_status ON v2_job.id = v2_job_status.id WHERE v2_job.id = $1 AND v2_job.workspace_id = $2 LIMIT 1",
    flow,
    w_id
)
.fetch_one(db)
.await
.map_err(|e| {
    Error::internal_err(format!(
        "fetching flow status {flow} while reporting {success} {result:?}: {e:#}"
    ))
})
.and_then(|record| {
    Ok((
        record.job_kind,
        record.script_hash,
        serde_json::from_str::<FlowStatus>(record.flow_status.0.get()).map_err(|e| {
            Error::internal_err(format!(
                "requiring current module to be parsable as FlowStatus: {e:?}"
            ))
        })?,
        record.raw_flow,
    ))
})?;
let (job_kind, script_hash, old_status, raw_flow) = retry_sqlx!({
    sqlx::query!(
        "SELECT
            kind AS \"job_kind!: JobKind\",
            runnable_id AS \"script_hash: ScriptHash\",
            flow_status AS \"flow_status!: Json<Box<RawValue>>\",
            raw_flow AS \"raw_flow: Json<Box<RawValue>>\"
        FROM v2_job INNER JOIN v2_job_status ON v2_job.id = v2_job_status.id WHERE v2_job.id = $1 AND v2_job.workspace_id = $2 LIMIT 1",
        flow,
        w_id
    )
    .fetch_one(db)
    .await
    .map_err(|e| {
        Error::internal_err(format!(
            "fetching flow status {flow} while reporting {success} {result:?}: {e:#}"
        ))
    })
    .and_then(|record| {
        Ok((
            record.job_kind,
            record.script_hash,
            serde_json::from_str::<FlowStatus>(record.flow_status.0.get()).map_err(|e| {
                Error::internal_err(format!(
                    "requiring current module to be parsable as FlowStatus: {e:?}"
                ))
            })?,
            record.raw_flow,
        ))
    })
})?;

let flow_data = cache::job::fetch_flow(db, &job_kind, script_hash)
.or_else(|_| cache::job::fetch_preview_flow(db, &flow, raw_flow))
@@ -1442,15 +1465,17 @@ async fn retrieve_flow_jobs_results(
w_id: &str,
job_uuids: &Vec<Uuid>,
) -> error::Result<Box<RawValue>> {
let results = sqlx::query!(
    "SELECT result, id
    FROM v2_job_completed
    WHERE id = ANY($1) AND workspace_id = $2",
    job_uuids.as_slice(),
    w_id
)
.fetch_all(db)
.await?
let results = retry_sqlx!({
    sqlx::query!(
        "SELECT result, id
        FROM v2_job_completed
        WHERE id = ANY($1) AND workspace_id = $2",
        job_uuids.as_slice(),
        w_id
    )
    .fetch_all(db)
    .await
})?
.into_iter()
.map(|br| (br.id, br.result))
.collect::<HashMap<_, _>>();