diff --git a/database/schema.md b/database/schema.md index 8fb9d8657..6c8194d7d 100644 --- a/database/schema.md +++ b/database/schema.md @@ -258,3 +258,41 @@ aid benchmark error ---------- --- ----- 1 syn-1.0.89 Failed to compile... ``` + + +## New benchmarking design +We are currently implementing a new design for dispatching benchmarks to collector(s) and storing +them in the database. It will support new use-cases, like backfilling of new benchmarks into a parent +commit and primarily benchmarking with multiple collectors (and multiple hardware architectures) in +parallel. + +The tables below are a part of the new scheme. + +### benchmark_request + +Represents a single request for performing a benchmark collection. Each request can be one of three types: + +* Master: benchmark a merged master commit +* Release: benchmark a published stable or beta compiler toolchain +* Try: benchmark a try build on a PR + +Columns: + +* **id** (`int`): Unique ID. +* **tag** (`text`): Identifier of the compiler toolchain that should be benchmarked. + * Commit SHA for master/try requests or release name (e.g. `1.80.0`) for release requests. + * Can be `NULL` for try requests that were queued for a perf. run, but their compiler artifacts haven't been built yet. +* **parent_sha** (`text`): Parent SHA of the benchmarked commit. + * Can be `NULL` for try requests without compiler artifacts. +* **commit_type** (`text NOT NULL`): One of `master`, `try` or `release`. +* **pr** (`int`): Pull request number associated with the master/try commit. + * `NULL` for release requests. +* **created_at** (`timestamptz NOT NULL`): Datetime when the request was created. +* **completed_at** (`timestamptz`): Datetime when the request was completed. +* **status** (`text NOT NULL`): Current status of the benchmark request. + * `waiting_for_artifacts`: A try request waiting for compiler artifacts. + * `artifacts_ready`: Request that has compiler artifacts ready and can be benchmarked. 
+ * `in_progress`: Request that is currently being benchmarked. + * `completed`: Completed request. +* **backends** (`text NOT NULL`): Comma-separated list of codegen backends to benchmark. If empty, the default set of codegen backends will be benchmarked. +* **profiles** (`text NOT NULL`): Comma-separated list of profiles to benchmark. If empty, the default set of profiles will be benchmarked. diff --git a/database/src/lib.rs b/database/src/lib.rs index 1fa9b2350..b171f47ee 100644 --- a/database/src/lib.rs +++ b/database/src/lib.rs @@ -1,6 +1,6 @@ use chrono::offset::TimeZone; use chrono::{DateTime, Utc}; -use hashbrown::HashMap; +use hashbrown::{HashMap, HashSet}; use intern::intern; use serde::{Deserialize, Serialize}; use std::fmt; @@ -309,6 +309,7 @@ impl Scenario { } } +use anyhow::anyhow; use std::cmp::Ordering; use std::str::FromStr; @@ -797,53 +798,64 @@ pub struct ArtifactCollection { pub end_time: DateTime, } -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Copy, Clone, PartialEq)] pub enum BenchmarkRequestStatus { WaitingForArtifacts, ArtifactsReady, InProgress, - Completed, + Completed { completed_at: DateTime }, } +const BENCHMARK_REQUEST_STATUS_WAITING_FOR_ARTIFACTS_STR: &str = "waiting_for_artifacts"; +const BENCHMARK_REQUEST_STATUS_ARTIFACTS_READY_STR: &str = "artifacts_ready"; +const BENCHMARK_REQUEST_STATUS_IN_PROGRESS_STR: &str = "in_progress"; +const BENCHMARK_REQUEST_STATUS_COMPLETED_STR: &str = "completed"; + impl BenchmarkRequestStatus { - pub fn as_str(&self) -> &str { + pub(crate) fn as_str(&self) -> &str { match self { - BenchmarkRequestStatus::WaitingForArtifacts => "waiting_for_artifacts", - BenchmarkRequestStatus::ArtifactsReady => "artifacts_ready", - BenchmarkRequestStatus::InProgress => "in_progress", - BenchmarkRequestStatus::Completed => "completed", + Self::WaitingForArtifacts => BENCHMARK_REQUEST_STATUS_WAITING_FOR_ARTIFACTS_STR, + Self::ArtifactsReady => BENCHMARK_REQUEST_STATUS_ARTIFACTS_READY_STR, + Self::InProgress => 
BENCHMARK_REQUEST_STATUS_IN_PROGRESS_STR, + Self::Completed { .. } => BENCHMARK_REQUEST_STATUS_COMPLETED_STR, } } -} -impl fmt::Display for BenchmarkRequestStatus { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str(self.as_str()) + pub(crate) fn from_str_and_completion_date( + text: &str, + completion_date: Option>, + ) -> anyhow::Result { + match text { + BENCHMARK_REQUEST_STATUS_WAITING_FOR_ARTIFACTS_STR => Ok(Self::WaitingForArtifacts), + BENCHMARK_REQUEST_STATUS_ARTIFACTS_READY_STR => Ok(Self::ArtifactsReady), + BENCHMARK_REQUEST_STATUS_IN_PROGRESS_STR => Ok(Self::InProgress), + BENCHMARK_REQUEST_STATUS_COMPLETED_STR => Ok(Self::Completed { + completed_at: completion_date.ok_or_else(|| { + anyhow!("No completion date for a completed BenchmarkRequestStatus") + })?, + }), + _ => Err(anyhow!("Unknown BenchmarkRequestStatus `{text}`")), + } } -} -impl<'a> tokio_postgres::types::FromSql<'a> for BenchmarkRequestStatus { - fn from_sql( - ty: &tokio_postgres::types::Type, - raw: &'a [u8], - ) -> Result> { - // Decode raw bytes into &str with Postgres' own text codec - let s: &str = <&str as tokio_postgres::types::FromSql>::from_sql(ty, raw)?; - - match s { - x if x == Self::WaitingForArtifacts.as_str() => Ok(Self::WaitingForArtifacts), - x if x == Self::ArtifactsReady.as_str() => Ok(Self::ArtifactsReady), - x if x == Self::InProgress.as_str() => Ok(Self::InProgress), - x if x == Self::Completed.as_str() => Ok(Self::Completed), - other => Err(format!("unknown benchmark_request_status '{other}'").into()), + pub(crate) fn completed_at(&self) -> Option> { + match self { + Self::Completed { completed_at } => Some(*completed_at), + _ => None, } } +} - fn accepts(ty: &tokio_postgres::types::Type) -> bool { - <&str as tokio_postgres::types::FromSql>::accepts(ty) +impl fmt::Display for BenchmarkRequestStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(self.as_str()) } } +const BENCHMARK_REQUEST_TRY_STR: &str = "try"; 
+const BENCHMARK_REQUEST_MASTER_STR: &str = "master"; +const BENCHMARK_REQUEST_RELEASE_STR: &str = "release"; + #[derive(Debug, Clone, PartialEq)] pub enum BenchmarkRequestType { /// A Try commit @@ -865,75 +877,58 @@ pub enum BenchmarkRequestType { impl fmt::Display for BenchmarkRequestType { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - BenchmarkRequestType::Try { .. } => write!(f, "try"), - BenchmarkRequestType::Master { .. } => write!(f, "master"), - BenchmarkRequestType::Release { .. } => write!(f, "release"), + BenchmarkRequestType::Try { .. } => write!(f, "{BENCHMARK_REQUEST_TRY_STR}"), + BenchmarkRequestType::Master { .. } => write!(f, "{BENCHMARK_REQUEST_MASTER_STR}"), + BenchmarkRequestType::Release { .. } => write!(f, "{BENCHMARK_REQUEST_RELEASE_STR}"), } } } #[derive(Debug, Clone, PartialEq)] pub struct BenchmarkRequest { - pub commit_type: BenchmarkRequestType, - pub created_at: DateTime, - pub completed_at: Option>, - pub status: BenchmarkRequestStatus, - pub backends: String, - pub profiles: String, + commit_type: BenchmarkRequestType, + created_at: DateTime, + status: BenchmarkRequestStatus, + backends: String, + profiles: String, } impl BenchmarkRequest { - pub fn create_release( - tag: &str, - created_at: DateTime, - status: BenchmarkRequestStatus, - backends: &str, - profiles: &str, - ) -> Self { + /// Create a release benchmark request that is in the `ArtifactsReady` status. + pub fn create_release(tag: &str, created_at: DateTime) -> Self { Self { commit_type: BenchmarkRequestType::Release { tag: tag.to_string(), }, created_at, - completed_at: None, - status, - backends: backends.to_string(), - profiles: profiles.to_string(), + status: BenchmarkRequestStatus::ArtifactsReady, + backends: String::new(), + profiles: String::new(), } } - pub fn create_try( - sha: Option<&str>, - parent_sha: Option<&str>, + /// Create a try request that is in the `WaitingForArtifacts` status. 
+ pub fn create_try_without_artifacts( pr: u32, created_at: DateTime, - status: BenchmarkRequestStatus, backends: &str, profiles: &str, ) -> Self { Self { commit_type: BenchmarkRequestType::Try { pr, - sha: sha.map(|it| it.to_string()), - parent_sha: parent_sha.map(|it| it.to_string()), + sha: None, + parent_sha: None, }, created_at, - completed_at: None, - status, + status: BenchmarkRequestStatus::WaitingForArtifacts, backends: backends.to_string(), profiles: profiles.to_string(), } } - pub fn create_master( - sha: &str, - parent_sha: &str, - pr: u32, - created_at: DateTime, - status: BenchmarkRequestStatus, - backends: &str, - profiles: &str, - ) -> Self { + /// Create a master benchmark request that is in the `ArtifactsReady` status. + pub fn create_master(sha: &str, parent_sha: &str, pr: u32, created_at: DateTime) -> Self { Self { commit_type: BenchmarkRequestType::Master { pr, @@ -941,10 +936,9 @@ impl BenchmarkRequest { parent_sha: parent_sha.to_string(), }, created_at, - completed_at: None, - status, - backends: backends.to_string(), - profiles: profiles.to_string(), + status: BenchmarkRequestStatus::ArtifactsReady, + backends: String::new(), + profiles: String::new(), } } @@ -974,4 +968,45 @@ impl BenchmarkRequest { BenchmarkRequestType::Release { .. } => None, } } + + pub fn status(&self) -> BenchmarkRequestStatus { + self.status + } + + pub fn created_at(&self) -> DateTime { + self.created_at + } + + pub fn is_master(&self) -> bool { + matches!(self.commit_type, BenchmarkRequestType::Master { .. }) + } + + pub fn is_try(&self) -> bool { + matches!(self.commit_type, BenchmarkRequestType::Try { .. }) + } + + pub fn is_release(&self) -> bool { + matches!(self.commit_type, BenchmarkRequestType::Release { .. 
}) + } +} + +/// Cached information about benchmark requests in the DB +/// FIXME: only store non-try requests here +pub struct BenchmarkRequestIndex { + /// Tags (SHA or release name) of all known benchmark requests + all: HashSet, + /// Tags (SHA or release name) of all benchmark requests in the completed status + completed: HashSet, +} + +impl BenchmarkRequestIndex { + /// Do we already have a benchmark request for the passed `tag`? + pub fn contains_tag(&self, tag: &str) -> bool { + self.all.contains(tag) + } + + /// Return tags of already completed benchmark requests. + pub fn completed_requests(&self) -> &HashSet { + &self.completed + } } diff --git a/database/src/pool.rs b/database/src/pool.rs index 458f08af7..ac2fe31ac 100644 --- a/database/src/pool.rs +++ b/database/src/pool.rs @@ -1,6 +1,6 @@ use crate::{ - ArtifactCollection, ArtifactId, ArtifactIdNumber, BenchmarkRequest, BenchmarkRequestStatus, - CodegenBackend, CompileBenchmark, Target, + ArtifactCollection, ArtifactId, ArtifactIdNumber, BenchmarkRequest, BenchmarkRequestIndex, + BenchmarkRequestStatus, CodegenBackend, CompileBenchmark, Target, }; use crate::{CollectionId, Index, Profile, QueuedCommit, Scenario, Step}; use chrono::{DateTime, Utc}; @@ -187,18 +187,19 @@ pub trait Connection: Send + Sync { benchmark_request: &BenchmarkRequest, ) -> anyhow::Result<()>; - /// Gets the benchmark requests matching the status. Optionally provide the - /// number of days from whence to search from - async fn get_benchmark_requests_by_status( - &self, - statuses: &[BenchmarkRequestStatus], - ) -> anyhow::Result>; + /// Load all known benchmark request SHAs and all completed benchmark requests. + async fn load_benchmark_request_index(&self) -> anyhow::Result; + + /// Load all pending benchmark requests, i.e. those that have artifacts ready, but haven't + /// been completed yet. Pending statuses are `ArtifactsReady` and `InProgress`. 
+ async fn load_pending_benchmark_requests(&self) -> anyhow::Result>; - /// Update the status of a `benchmark_request` + /// Update the status of a `benchmark_request` with the given `tag`. + /// If no such request exists in the DB, returns an error. async fn update_benchmark_request_status( - &mut self, - benchmark_request: &BenchmarkRequest, - benchmark_request_status: BenchmarkRequestStatus, + &self, + tag: &str, + status: BenchmarkRequestStatus, ) -> anyhow::Result<()>; /// Update a Try commit to have a `sha` and `parent_sha`. Will update the @@ -334,7 +335,7 @@ mod tests { use super::*; use crate::{ tests::{run_db_test, run_postgres_test}, - BenchmarkRequestStatus, Commit, CommitType, Date, + BenchmarkRequestStatus, BenchmarkRequestType, Commit, CommitType, Date, }; /// Create a Commit @@ -405,157 +406,93 @@ mod tests { .await; } + // Check that we can't have multiple requests with the same SHA #[tokio::test] - async fn insert_benchmark_requests() { + async fn multiple_requests_same_sha() { run_postgres_test(|ctx| async { let db = ctx.db_client(); - let time = chrono::DateTime::from_str("2021-09-01T00:00:00.000Z").unwrap(); - let master_benchmark_request = BenchmarkRequest::create_master( + let db = db.connection().await; + db.insert_benchmark_request(&BenchmarkRequest::create_master( "a-sha-1", "parent-sha-1", 42, - time, - BenchmarkRequestStatus::ArtifactsReady, - "llvm", - "", - ); - - let try_benchmark_request = BenchmarkRequest::create_try( - Some("b-sha-2"), - Some("parent-sha-2"), - 32, - time, - BenchmarkRequestStatus::ArtifactsReady, - "cranelift", - "", - ); - - let release_benchmark_request = BenchmarkRequest::create_release( - "1.8.0", - time, - BenchmarkRequestStatus::ArtifactsReady, - "cranelift,llvm", - "", - ); + Utc::now(), + )) + .await + .unwrap(); - let db = db.connection().await; - db.insert_benchmark_request(&master_benchmark_request) - .await - .unwrap(); - db.insert_benchmark_request(&try_benchmark_request) + 
db.insert_benchmark_request(&BenchmarkRequest::create_release("a-sha-1", Utc::now())) .await - .unwrap(); - db.insert_benchmark_request(&release_benchmark_request) - .await - .unwrap(); - // duplicate insert - assert!(db - .insert_benchmark_request(&master_benchmark_request) - .await - .is_err()); + .expect_err("it was possible to insert a second commit with the same SHA"); Ok(ctx) }) .await; } + // Check that we can't have multiple non-completed try requests on the same PR #[tokio::test] - async fn get_benchmark_requests_by_status() { - // Ensure we get back the requests matching the status with no date - // limit + async fn multiple_non_completed_try_requests() { run_postgres_test(|ctx| async { let db = ctx.db_client(); - let time = chrono::DateTime::from_str("2021-09-01T00:00:00.000Z").unwrap(); - let master_benchmark_request = BenchmarkRequest::create_master( - "a-sha-1", - "parent-sha-1", - 42, - time, - BenchmarkRequestStatus::ArtifactsReady, - "llvm", - "", - ); - - let try_benchmark_request = BenchmarkRequest::create_try( - Some("b-sha-2"), - Some("parent-sha-2"), - 32, - time, - BenchmarkRequestStatus::Completed, - "cranelift", - "", - ); + let db = db.connection().await; - let release_benchmark_request = BenchmarkRequest::create_release( - "1.8.0", - time, - BenchmarkRequestStatus::ArtifactsReady, - "cranelift,llvm", - "", - ); + // Completed + let req_a = BenchmarkRequest::create_try_without_artifacts(42, Utc::now(), "", ""); + // WaitingForArtifacts + let req_b = BenchmarkRequest::create_try_without_artifacts(42, Utc::now(), "", ""); + let req_c = BenchmarkRequest::create_try_without_artifacts(42, Utc::now(), "", ""); - let db = db.connection().await; - db.insert_benchmark_request(&master_benchmark_request) - .await - .unwrap(); - db.insert_benchmark_request(&try_benchmark_request) - .await - .unwrap(); - db.insert_benchmark_request(&release_benchmark_request) + db.insert_benchmark_request(&req_a).await.unwrap(); + 
db.attach_shas_to_try_benchmark_request(42, "sha1", "sha-parent-1") .await .unwrap(); - let requests = db - .get_benchmark_requests_by_status(&[BenchmarkRequestStatus::ArtifactsReady]) - .await - .unwrap(); + db.update_benchmark_request_status( + "sha1", + BenchmarkRequestStatus::Completed { + completed_at: Utc::now(), + }, + ) + .await + .unwrap(); - assert_eq!(requests.len(), 2); - assert_eq!(requests[0].status, BenchmarkRequestStatus::ArtifactsReady); - assert_eq!(requests[1].status, BenchmarkRequestStatus::ArtifactsReady); + // This should be fine, req_a was completed + db.insert_benchmark_request(&req_b).await.unwrap(); + // This should fail, we can't have two queued requests at once + db.insert_benchmark_request(&req_c).await.expect_err( + "It was possible to record two try benchmark requests without artifacts", + ); Ok(ctx) }) .await; } + // Check that we can't have multiple master requests on the same PR #[tokio::test] - async fn update_benchmark_request_status() { - // Insert one item into the database, change the status and then - // get the item back out again to ensure it has changed status + async fn multiple_master_requests_same_pr() { run_postgres_test(|ctx| async { let db = ctx.db_client(); - let time = chrono::DateTime::from_str("2021-09-01T00:00:00.000Z").unwrap(); - let master_benchmark_request = BenchmarkRequest::create_master( + let db = db.connection().await; + + db.insert_benchmark_request(&BenchmarkRequest::create_master( "a-sha-1", "parent-sha-1", 42, - time, - BenchmarkRequestStatus::ArtifactsReady, - "llvm", - "", - ); - - let mut db = db.connection().await; - db.insert_benchmark_request(&master_benchmark_request) - .await - .unwrap(); - - db.update_benchmark_request_status( - &master_benchmark_request, - BenchmarkRequestStatus::InProgress, - ) + Utc::now(), + )) .await .unwrap(); - let requests = db - .get_benchmark_requests_by_status(&[BenchmarkRequestStatus::InProgress]) - .await - .unwrap(); - - assert_eq!(requests.len(), 1); - 
assert_eq!(requests[0].tag(), master_benchmark_request.tag()); - assert_eq!(requests[0].status, BenchmarkRequestStatus::InProgress); + db.insert_benchmark_request(&BenchmarkRequest::create_master( + "a-sha-2", + "parent-sha-2", + 42, + Utc::now(), + )) + .await + .expect_err("it was possible to insert a second master commit on the same PR"); Ok(ctx) }) @@ -563,37 +500,45 @@ mod tests { } #[tokio::test] - async fn updating_try_commits() { + async fn load_pending_benchmark_requests() { run_postgres_test(|ctx| async { let db = ctx.db_client(); - let db = db.connection().await; let time = chrono::DateTime::from_str("2021-09-01T00:00:00.000Z").unwrap(); - let pr = 42; - - let try_benchmark_request = BenchmarkRequest::create_try( - None, - None, - pr, - time, - BenchmarkRequestStatus::WaitingForArtifacts, - "cranelift", - "", - ); - db.insert_benchmark_request(&try_benchmark_request) - .await - .unwrap(); - db.attach_shas_to_try_benchmark_request(pr, "foo", "bar") - .await - .unwrap(); - let requests = db - .get_benchmark_requests_by_status(&[BenchmarkRequestStatus::ArtifactsReady]) + + // ArtifactsReady + let req_a = BenchmarkRequest::create_master("sha-1", "parent-sha-1", 42, time); + // ArtifactsReady + let req_b = BenchmarkRequest::create_release("1.80.0", time); + // WaitingForArtifacts + let req_c = BenchmarkRequest::create_try_without_artifacts(50, time, "", ""); + // InProgress + let req_d = BenchmarkRequest::create_master("sha-2", "parent-sha-2", 51, time); + // Completed + let req_e = BenchmarkRequest::create_master("sha-3", "parent-sha-3", 52, time); + + let db = db.connection().await; + for &req in &[&req_a, &req_b, &req_c, &req_d, &req_e] { + db.insert_benchmark_request(req).await.unwrap(); + } + + db.update_benchmark_request_status("sha-2", BenchmarkRequestStatus::InProgress) .await .unwrap(); + db.update_benchmark_request_status( + "sha-3", + BenchmarkRequestStatus::Completed { + completed_at: Utc::now(), + }, + ) + .await + .unwrap(); + + let requests = 
db.load_pending_benchmark_requests().await.unwrap(); - assert_eq!(requests.len(), 1); - assert_eq!(requests[0].tag(), Some("foo")); - assert_eq!(requests[0].parent_sha(), Some("bar")); - assert_eq!(requests[0].status, BenchmarkRequestStatus::ArtifactsReady); + assert_eq!(requests.len(), 3); + for req in &[req_a, req_b, req_d] { + assert!(requests.iter().any(|r| r.tag() == req.tag())); + } Ok(ctx) }) @@ -601,72 +546,39 @@ mod tests { } #[tokio::test] - async fn adding_try_commit_to_completed_request() { + async fn attach_shas_to_try_benchmark_request() { run_postgres_test(|ctx| async { let db = ctx.db_client(); let db = db.connection().await; - let time = chrono::DateTime::from_str("2021-09-01T00:00:00.000Z").unwrap(); - let pr = 42; - - let completed_try = BenchmarkRequest::create_try( - Some("sha-2"), - Some("p-sha-1"), - pr, - time, - BenchmarkRequestStatus::Completed, - "cranelift", - "", - ); - db.insert_benchmark_request(&completed_try).await.unwrap(); - - let try_benchmark_request = BenchmarkRequest::create_try( - None, - None, - pr, - time, - BenchmarkRequestStatus::WaitingForArtifacts, - "cranelift", - "", - ); - // deliberately insert twice - db.insert_benchmark_request(&try_benchmark_request) - .await - .unwrap(); - // this one should fail - assert!(db - .insert_benchmark_request(&try_benchmark_request) - .await - .is_err()); - db.attach_shas_to_try_benchmark_request(pr, "foo", "bar") + + let req = BenchmarkRequest::create_try_without_artifacts(42, Utc::now(), "", ""); + + db.insert_benchmark_request(&req).await.unwrap(); + db.attach_shas_to_try_benchmark_request(42, "sha1", "sha-parent-1") .await .unwrap(); - let requests = db - .get_benchmark_requests_by_status(&[ - BenchmarkRequestStatus::WaitingForArtifacts, - BenchmarkRequestStatus::ArtifactsReady, - BenchmarkRequestStatus::InProgress, - BenchmarkRequestStatus::Completed, - ]) + let req_db = db + .load_pending_benchmark_requests() .await + .unwrap() + .into_iter() + .next() .unwrap(); - - 
assert_eq!(requests.len(), 2); - let completed_try = requests - .iter() - .find(|req| req.status == BenchmarkRequestStatus::Completed); - assert!(completed_try.is_some()); - assert_eq!(completed_try.unwrap().pr(), Some(&pr)); - assert_eq!(completed_try.unwrap().tag(), Some("sha-2")); - assert_eq!(completed_try.unwrap().parent_sha(), Some("p-sha-1")); - - let artifacts_ready_try = requests - .iter() - .find(|req| req.status == BenchmarkRequestStatus::ArtifactsReady); - assert!(artifacts_ready_try.is_some()); - assert_eq!(artifacts_ready_try.unwrap().pr(), Some(&pr)); - assert_eq!(artifacts_ready_try.unwrap().tag(), Some("foo")); - assert_eq!(artifacts_ready_try.unwrap().parent_sha(), Some("bar")); + assert_eq!(req.backends, req_db.backends); + assert_eq!(req.profiles, req_db.profiles); + assert!(matches!( + req_db.status, + BenchmarkRequestStatus::ArtifactsReady + )); + assert!(matches!( + req_db.commit_type, + BenchmarkRequestType::Try { .. } + )); + + assert_eq!(req_db.tag().as_deref(), Some("sha1")); + assert_eq!(req_db.parent_sha().as_deref(), Some("sha-parent-1")); + assert_eq!(req_db.pr(), Some(&42)); Ok(ctx) }) diff --git a/database/src/pool/postgres.rs b/database/src/pool/postgres.rs index 7db2291b0..b8079f5a7 100644 --- a/database/src/pool/postgres.rs +++ b/database/src/pool/postgres.rs @@ -1,12 +1,15 @@ use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction}; use crate::{ ArtifactCollection, ArtifactId, ArtifactIdNumber, Benchmark, BenchmarkRequest, - BenchmarkRequestStatus, BenchmarkRequestType, CodegenBackend, CollectionId, Commit, CommitType, - CompileBenchmark, Date, Index, Profile, QueuedCommit, Scenario, Target, + BenchmarkRequestIndex, BenchmarkRequestStatus, BenchmarkRequestType, CodegenBackend, + CollectionId, Commit, CommitType, CompileBenchmark, Date, Index, Profile, QueuedCommit, + Scenario, Target, BENCHMARK_REQUEST_MASTER_STR, BENCHMARK_REQUEST_RELEASE_STR, + BENCHMARK_REQUEST_STATUS_ARTIFACTS_READY_STR, 
BENCHMARK_REQUEST_STATUS_COMPLETED_STR, + BENCHMARK_REQUEST_STATUS_IN_PROGRESS_STR, BENCHMARK_REQUEST_TRY_STR, }; use anyhow::Context as _; use chrono::{DateTime, TimeZone, Utc}; -use hashbrown::HashMap; +use hashbrown::{HashMap, HashSet}; use native_tls::{Certificate, TlsConnector}; use postgres_native_tls::MakeTlsConnector; use std::str::FromStr; @@ -387,6 +390,7 @@ pub struct CachedStatements { get_runtime_pstat: Statement, record_artifact_size: Statement, get_artifact_size: Statement, + load_benchmark_request_index: Statement, } pub struct PostgresTransaction<'a> { @@ -568,7 +572,12 @@ impl PostgresConnection { get_artifact_size: conn.prepare(" select component, size from artifact_size where aid = $1 - ").await.unwrap() + ").await.unwrap(), + load_benchmark_request_index: conn.prepare(" + SELECT tag, status + FROM benchmark_request + WHERE tag IS NOT NULL + ").await.unwrap(), }), conn, } @@ -1422,111 +1431,52 @@ where Ok(()) } - async fn get_benchmark_requests_by_status( - &self, - statuses: &[BenchmarkRequestStatus], - ) -> anyhow::Result> { - // There is a small period of time where a try commit's parent_sha - // could be NULL, this query will filter that out. 
- let query = " - SELECT - tag, - parent_sha, - pr, - commit_type, - status, - created_at, - completed_at, - backends, - profiles - FROM benchmark_request - WHERE status = ANY($1)" - .to_string(); - - let rows = self + async fn load_benchmark_request_index(&self) -> anyhow::Result { + let requests = self .conn() - .query(&query, &[&statuses]) + .query(&self.statements().load_benchmark_request_index, &[]) .await - .context("Failed to get benchmark requests")?; + .context("Cannot load benchmark request index")?; - let benchmark_requests = rows - .iter() - .map(|row| { - let tag = row.get::<_, &str>(0); - let parent_sha = row.get::<_, Option<&str>>(1); - let pr = row.get::<_, Option>(2); - let commit_type = row.get::<_, &str>(3); - let status = row.get::<_, BenchmarkRequestStatus>(4); - let created_at = row.get::<_, DateTime>(5); - let completed_at = row.get::<_, Option>>(6); - let backends = row.get::<_, &str>(7); - let profiles = row.get::<_, &str>(8); + let mut all = HashSet::with_capacity(requests.len()); + let mut completed = HashSet::with_capacity(requests.len()); + for request in requests { + let tag = request.get::<_, String>(0); + let status = request.get::<_, &str>(1); - match commit_type { - "try" => { - let mut try_benchmark = BenchmarkRequest::create_try( - Some(tag), - parent_sha, - pr.unwrap() as u32, - created_at, - status, - backends, - profiles, - ); - try_benchmark.completed_at = completed_at; - try_benchmark - } - "master" => { - let mut master_benchmark = BenchmarkRequest::create_master( - tag, - parent_sha.unwrap(), - pr.unwrap() as u32, - created_at, - status, - backends, - profiles, - ); - master_benchmark.completed_at = completed_at; - master_benchmark - } - "release" => { - let mut release_benchmark = BenchmarkRequest::create_release( - tag, created_at, status, backends, profiles, - ); - release_benchmark.completed_at = completed_at; - release_benchmark - } - _ => panic!( - "Invalid `commit_type` for `BenchmarkRequest` {}", - commit_type - ), 
- } - }) - .collect(); - Ok(benchmark_requests) + if status == BENCHMARK_REQUEST_STATUS_COMPLETED_STR { + completed.insert(tag.clone()); + } + all.insert(tag); + } + Ok(BenchmarkRequestIndex { all, completed }) } async fn update_benchmark_request_status( - &mut self, - benchmark_request: &BenchmarkRequest, - benchmark_request_status: BenchmarkRequestStatus, + &self, + tag: &str, + status: BenchmarkRequestStatus, ) -> anyhow::Result<()> { - let tx = self - .conn_mut() - .transaction() + let status_str = status.as_str(); + let completed_at = status.completed_at(); + let modified_rows = self + .conn() + .execute( + r#" + UPDATE benchmark_request + SET status = $1, completed_at = $2 + WHERE tag = $3;"#, + &[&status_str, &completed_at, &tag], + ) .await - .context("failed to start transaction")?; - - tx.execute( - "UPDATE benchmark_request SET status = $1 WHERE tag = $2;", - &[&benchmark_request_status, &benchmark_request.tag()], - ) - .await - .context("failed to execute UPDATE benchmark_request")?; - - tx.commit().await.context("failed to commit transaction")?; - - Ok(()) + .context("failed to update benchmark request status")?; + if modified_rows == 0 { + Err(anyhow::anyhow!( + "Could not update status of benchmark request with tag `{tag}`, it was not found." 
+ )) + } else { + Ok(()) + } } async fn attach_shas_to_try_benchmark_request( @@ -1557,10 +1507,92 @@ where ], ) .await - .context("failed to execute UPDATE benchmark_request")?; + .context("failed to attach SHAs to try benchmark request")?; Ok(()) } + + async fn load_pending_benchmark_requests(&self) -> anyhow::Result> { + let query = format!( + r#" + SELECT + tag, + parent_sha, + pr, + commit_type, + status, + created_at, + completed_at, + backends, + profiles + FROM benchmark_request + WHERE status IN('{BENCHMARK_REQUEST_STATUS_ARTIFACTS_READY_STR}', '{BENCHMARK_REQUEST_STATUS_IN_PROGRESS_STR}')"# + ); + + let rows = self + .conn() + .query(&query, &[]) + .await + .context("Failed to get pending benchmark requests")?; + + let requests = rows + .into_iter() + .map(|row| { + let tag = row.get::<_, Option>(0); + let parent_sha = row.get::<_, Option>(1); + let pr = row.get::<_, Option>(2); + let commit_type = row.get::<_, &str>(3); + let status = row.get::<_, &str>(4); + let created_at = row.get::<_, DateTime>(5); + let completed_at = row.get::<_, Option>>(6); + let backends = row.get::<_, String>(7); + let profiles = row.get::<_, String>(8); + + let pr = pr.map(|v| v as u32); + + let status = + BenchmarkRequestStatus::from_str_and_completion_date(status, completed_at) + .expect("Invalid BenchmarkRequestStatus data in the database"); + + match commit_type { + BENCHMARK_REQUEST_TRY_STR => BenchmarkRequest { + commit_type: BenchmarkRequestType::Try { + sha: tag, + parent_sha, + pr: pr.expect("Try commit in the DB without a PR"), + }, + created_at, + status, + backends, + profiles, + }, + BENCHMARK_REQUEST_MASTER_STR => BenchmarkRequest { + commit_type: BenchmarkRequestType::Master { + sha: tag.expect("Master commit in the DB without a SHA"), + parent_sha: parent_sha + .expect("Master commit in the DB without a parent SHA"), + pr: pr.expect("Master commit in the DB without a PR"), + }, + created_at, + status, + backends, + profiles, + }, + BENCHMARK_REQUEST_RELEASE_STR 
=> BenchmarkRequest { + commit_type: BenchmarkRequestType::Release { + tag: tag.expect("Release commit in the DB without a SHA"), + }, + created_at, + status, + backends, + profiles, + }, + _ => panic!("Invalid `commit_type` for `BenchmarkRequest` {commit_type}",), + } + }) + .collect(); + Ok(requests) + } } fn parse_artifact_id(ty: &str, sha: &str, date: Option>) -> ArtifactId { diff --git a/database/src/pool/sqlite.rs b/database/src/pool/sqlite.rs index 6dcc9a608..ecc28484f 100644 --- a/database/src/pool/sqlite.rs +++ b/database/src/pool/sqlite.rs @@ -1,7 +1,8 @@ use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction}; use crate::{ - ArtifactCollection, ArtifactId, Benchmark, BenchmarkRequest, BenchmarkRequestStatus, - CodegenBackend, CollectionId, Commit, CommitType, CompileBenchmark, Date, Profile, Target, + ArtifactCollection, ArtifactId, Benchmark, BenchmarkRequest, BenchmarkRequestIndex, + BenchmarkRequestStatus, CodegenBackend, CollectionId, Commit, CommitType, CompileBenchmark, + Date, Profile, Target, }; use crate::{ArtifactIdNumber, Index, QueuedCommit}; use chrono::{DateTime, TimeZone, Utc}; @@ -1271,17 +1272,18 @@ impl Connection for SqliteConnection { no_queue_implementation_abort!() } - async fn get_benchmark_requests_by_status( - &self, - _statuses: &[BenchmarkRequestStatus], - ) -> anyhow::Result> { + async fn load_benchmark_request_index(&self) -> anyhow::Result { + no_queue_implementation_abort!() + } + + async fn load_pending_benchmark_requests(&self) -> anyhow::Result> { no_queue_implementation_abort!() } async fn update_benchmark_request_status( - &mut self, - _benchmark_request: &BenchmarkRequest, - _benchmark_request_status: BenchmarkRequestStatus, + &self, + _tag: &str, + _status: BenchmarkRequestStatus, ) -> anyhow::Result<()> { no_queue_implementation_abort!() } diff --git a/site/src/job_queue.rs b/site/src/job_queue.rs deleted file mode 100644 index 1bde1ae06..000000000 --- a/site/src/job_queue.rs +++ /dev/null 
@@ -1,573 +0,0 @@ -use std::{ - path::Path, - str::FromStr, - sync::{Arc, LazyLock}, -}; - -use crate::load::{partition_in_place, SiteCtxt}; -use chrono::{DateTime, NaiveDate, Utc}; -use database::{BenchmarkRequest, BenchmarkRequestStatus, BenchmarkRequestType}; -use hashbrown::HashSet; -use parking_lot::RwLock; -use regex::Regex; -use tokio::time::{self, Duration}; - -pub fn run_new_queue() -> bool { - std::env::var("RUN_CRON") - .ok() - .and_then(|x| x.parse().ok()) - .unwrap_or(false) -} - -/// Store the latest master commits or do nothing if all of them are -/// already in the database -async fn create_benchmark_request_master_commits( - ctxt: &Arc, - conn: &dyn database::pool::Connection, -) -> anyhow::Result<()> { - let master_commits = &ctxt.get_master_commits().commits; - // TODO; delete at some point in the future - let cutoff: chrono::DateTime = chrono::DateTime::from_str("2025-06-01T00:00:00.000Z")?; - - for master_commit in master_commits { - // We don't want to add masses of obsolete data - if master_commit.time >= cutoff { - let pr = master_commit.pr.unwrap_or(0); - let benchmark = BenchmarkRequest::create_master( - &master_commit.sha, - &master_commit.parent_sha, - pr, - master_commit.time, - BenchmarkRequestStatus::ArtifactsReady, - "", - "", - ); - if let Err(e) = conn.insert_benchmark_request(&benchmark).await { - log::error!("Failed to insert master benchmark request: {}", e); - } - } - } - Ok(()) -} - -/// Parses strings in the following formats extracting the Date & release tag -/// `static.rust-lang.org/dist/2016-05-24/channel-rust-1.9.0.toml` -/// `static.rust-lang.org/dist/2016-05-31/channel-rust-nightly.toml` -/// `static.rust-lang.org/dist/2016-06-01/channel-rust-beta.toml` -/// `static.rust-lang.org/dist/2025-06-26/channel-rust-1.89-beta.toml` -/// `static.rust-lang.org/dist/2025-06-26/channel-rust-1.89.0-beta.toml` -/// `static.rust-lang.org/dist/2025-06-26/channel-rust-1.89.0-beta.2.toml` -fn parse_release_string(url: &str) -> 
Option<(String, DateTime)> { - static VERSION_RE: LazyLock = LazyLock::new(|| Regex::new(r"(\d+\.\d+\.\d+)").unwrap()); - - // Grab ".../YYYY-MM-DD/FILE.toml" components with Path helpers. - let file = Path::new(url).file_name().and_then(|n| n.to_str())?; - - let date_str = Path::new(url) - .parent() - .and_then(Path::file_name) - .and_then(|n| n.to_str())?; - - // No other beta releases are recognized as toolchains. - // - // We also have names like this: - // - // * channel-rust-1.75-beta.toml - // * channel-rust-1.75.0-beta.toml - // * channel-rust-1.75.0-beta.1.toml - // - // Which should get ignored for now, they're not consumable via rustup yet. - if file.contains("beta") && file != "channel-rust-beta.toml" { - return None; - } - - // Parse the YYYY-MM-DD segment and stamp it with *current* UTC time. - if let Ok(naive) = NaiveDate::parse_from_str(date_str, "%Y-%m-%d") { - let published = naive - .and_hms_opt(0, 0, 0) - .expect("valid HMS") - .and_local_timezone(Utc) - .single() - .unwrap(); - - // Special-case the rolling beta channel. - if file == "channel-rust-beta.toml" { - return Some((format!("beta-{date_str}"), published)); - } - - // Otherwise pull out a semver like "1.70.0" and return it. - if let Some(cap) = VERSION_RE.captures(file).and_then(|m| m.get(1)) { - return Some((cap.as_str().to_owned(), published)); - } - } - - None -} - -/// Store the latest release commits or do nothing if all of them are -/// already in the database -async fn create_benchmark_request_releases( - conn: &dyn database::pool::Connection, -) -> anyhow::Result<()> { - let releases: String = reqwest::get("https://static.rust-lang.org/manifests.txt") - .await? 
- .text() - .await?; - // TODO; delete at some point in the future - let cutoff: chrono::DateTime = chrono::DateTime::from_str("2025-06-01T00:00:00.000Z")?; - - let releases: Vec<_> = releases - .lines() - .rev() - .filter_map(parse_release_string) - .take(20) - .collect(); - - for (name, date_time) in releases { - if date_time >= cutoff { - let release_request = BenchmarkRequest::create_release( - &name, - date_time, - BenchmarkRequestStatus::ArtifactsReady, - "", - "", - ); - if let Err(e) = conn.insert_benchmark_request(&release_request).await { - log::error!("Failed to insert release benchmark request: {}", e); - } - } - } - Ok(()) -} - -/// Sorts try and master requests that are in the `ArtifactsReady` status. -/// Doesn't consider in-progress requests or release artifacts. -fn sort_benchmark_requests(done: &HashSet, request_queue: &mut [BenchmarkRequest]) { - let mut done: HashSet = done.iter().cloned().collect(); - - // Ensure all the items are ready to be sorted, if they are not this is - // undefined behaviour - assert!(request_queue.iter().all(|bmr| { - bmr.status == BenchmarkRequestStatus::ArtifactsReady - && matches!( - bmr.commit_type, - BenchmarkRequestType::Master { .. } | BenchmarkRequestType::Try { .. } - ) - })); - - let mut finished = 0; - while finished < request_queue.len() { - // The next level is those elements in the unordered queue which - // are ready to be benchmarked (i.e., those with parent in done or no - // parent). - let level_len = partition_in_place(request_queue[finished..].iter_mut(), |bmr| { - bmr.parent_sha().is_none_or(|parent| done.contains(parent)) - }); - - // No commit is ready for benchmarking. This can happen e.g. when a try parent commit - // was forcefully removed from the master branch of rust-lang/rust. In this case, just - // let the commits be benchmarked in the current order that we have, these benchmark runs - // just won't have a parent result available. 
- if level_len == 0 { - if cfg!(test) { - panic!("No commit is ready for benchmarking"); - } else { - log::warn!("No commit is ready for benchmarking"); - return; - } - } - - // Everything in level has the same topological order, then we sort based on heuristics - let level = &mut request_queue[finished..][..level_len]; - level.sort_unstable_by_key(|bmr| { - ( - // Pr number takes priority - *bmr.pr().unwrap_or(&0), - // Order master commits before try commits - match bmr.commit_type { - BenchmarkRequestType::Try { .. } => 1, - BenchmarkRequestType::Master { .. } => 0, - BenchmarkRequestType::Release { .. } => unreachable!(), - }, - bmr.created_at, - ) - }); - for c in level { - // As the only `commit_type` that will not have a `tag` is a `Try` - // with the status of `AwaitingArtifacts` and we have asserted above - // that all of the statuses of the benchmark requests are - // `ArtifactsReady` it is implausable for this `expect(...)` to be - // hit. - done.insert( - c.tag() - .expect("Tag should exist on a benchmark request being sorted") - .to_string(), - ); - } - finished += level_len; - } -} - -pub trait ExtractIf { - fn extract_if_stable(&mut self, predicate: F) -> Vec - where - F: FnMut(&T) -> bool; -} - -/// Vec method `extract_if` is unstable, this very simple implementation -/// can be deleted once it is stable -impl ExtractIf for Vec { - fn extract_if_stable(&mut self, mut predicate: F) -> Vec - where - F: FnMut(&T) -> bool, - { - let mut extracted = Vec::new(); - let mut i = 0; - - while i < self.len() { - if predicate(&self[i]) { - extracted.push(self.remove(i)); - } else { - i += 1; - } - } - extracted - } -} - -/// Assumes that master/release artifacts have been put into the DB. 
-pub async fn build_queue( - conn: &mut dyn database::pool::Connection, - completed_set: &HashSet, -) -> anyhow::Result> { - let mut pending = conn - .get_benchmark_requests_by_status(&[ - BenchmarkRequestStatus::InProgress, - BenchmarkRequestStatus::ArtifactsReady, - ]) - .await?; - - // The queue starts with in progress - let mut queue: Vec = pending - .extract_if_stable(|request| matches!(request.status, BenchmarkRequestStatus::InProgress)); - - // We sort the in-progress ones based on the started date - queue.sort_unstable_by(|a, b| a.created_at.cmp(&b.created_at)); - - // Add release artifacts ordered by the release tag (1.87.0 before 1.88.0) and `created_at`. - let mut release_artifacts: Vec = pending.extract_if_stable(|request| { - matches!(request.commit_type, BenchmarkRequestType::Release { .. }) - }); - - release_artifacts.sort_unstable_by(|a, b| { - a.tag() - .cmp(&b.tag()) - .then_with(|| a.created_at.cmp(&b.created_at)) - }); - - queue.append(&mut release_artifacts); - sort_benchmark_requests(completed_set, &mut pending); - queue.append(&mut pending); - Ok(queue) -} - -/// Enqueue the job into the job_queue -async fn enqueue_next_job(conn: &mut dyn database::pool::Connection) -> anyhow::Result<()> { - // We draw back all completed requests - let completed: HashSet = conn - .get_benchmark_requests_by_status(&[BenchmarkRequestStatus::Completed]) - .await? 
- .into_iter() - .filter_map(|request| request.tag().map(|tag| tag.to_string())) - .collect(); - - let queue = build_queue(conn, &completed).await?; - - if let Some(request) = queue.into_iter().next() { - if request.status != BenchmarkRequestStatus::InProgress { - // TODO: - // - Uncomment - // - Actually enqueue the jobs - // conn.update_benchmark_request_status(&request, BenchmarkRequestStatus::InProgress) - // .await?; - } - } - - Ok(()) -} - -/// For queueing jobs, add the jobs you want to queue to this function -async fn cron_enqueue_jobs(site_ctxt: &Arc) -> anyhow::Result<()> { - let mut conn = site_ctxt.conn().await; - // Put the master commits into the `benchmark_requests` queue - create_benchmark_request_master_commits(site_ctxt, &*conn).await?; - // Put the releases into the `benchmark_requests` queue - create_benchmark_request_releases(&*conn).await?; - enqueue_next_job(&mut *conn).await?; - Ok(()) -} - -/// Entry point for the cron -pub async fn cron_main(site_ctxt: Arc>>>, seconds: u64) { - let mut interval = time::interval(Duration::from_secs(seconds)); - let ctxt = site_ctxt.clone(); - - loop { - interval.tick().await; - - if let Some(ctxt_clone) = { - let guard = ctxt.read(); - guard.as_ref().cloned() - } { - match cron_enqueue_jobs(&ctxt_clone).await { - Ok(_) => log::info!("Cron job executed at: {:?}", std::time::SystemTime::now()), - Err(e) => log::error!("Cron job failed to execute {}", e), - } - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use chrono::{Datelike, Duration, NaiveDate, TimeZone, Utc}; - use database::tests::run_postgres_test; - - /// Helper: unwrap the Option, panic otherwise. 
- fn tag(url: &str) -> String { - parse_release_string(url) - .expect("Some") // Option<_> - .0 // take the tag - } - - /// Helper: unwrap the DateTime and keep only the YYYY-MM-DD part - fn day(url: &str) -> NaiveDate { - parse_release_string(url).expect("Some").1.date_naive() - } - - fn days_ago(day_str: &str) -> chrono::DateTime { - // Walk backwards until the first non-digit, then slice - let days = day_str - .strip_prefix("days") - .unwrap() - .parse::() - .unwrap(); - - let timestamp = Utc::now() - Duration::days(days); - // zero out the seconds - Utc.with_ymd_and_hms( - timestamp.year(), - timestamp.month(), - timestamp.day(), - 0, - 0, - 0, - ) - .unwrap() - } - - fn create_master(sha: &str, parent: &str, pr: u32, age_days: &str) -> BenchmarkRequest { - BenchmarkRequest::create_master( - sha, - parent, - pr, - days_ago(age_days), - BenchmarkRequestStatus::ArtifactsReady, - "", - "", - ) - } - - fn create_try(sha: &str, parent: &str, pr: u32, age_days: &str) -> BenchmarkRequest { - BenchmarkRequest::create_try( - Some(sha), - Some(parent), - pr, - days_ago(age_days), - BenchmarkRequestStatus::ArtifactsReady, - "", - "", - ) - } - - fn create_release(tag: &str, age_days: &str) -> BenchmarkRequest { - BenchmarkRequest::create_release( - tag, - days_ago(age_days), - BenchmarkRequestStatus::ArtifactsReady, - "", - "", - ) - } - - async fn db_insert_requests( - conn: &dyn database::pool::Connection, - requests: &[BenchmarkRequest], - ) { - for request in requests { - conn.insert_benchmark_request(&request).await.unwrap(); - } - } - - fn queue_order_matches(queue: &[BenchmarkRequest], expected: &[&str]) { - let queue_shas: Vec<&str> = queue - .iter() - .filter_map(|request| request.tag().map(|tag| tag)) - .collect(); - assert_eq!(queue_shas, expected) - } - - trait BenchmarkRequestExt { - fn with_status(self, status: BenchmarkRequestStatus) -> Self; - } - - impl BenchmarkRequestExt for BenchmarkRequest { - fn with_status(mut self, status: BenchmarkRequestStatus) 
-> Self { - self.status = status; - self - } - } - - #[tokio::test] - async fn queue_ordering() { - run_postgres_test(|ctx| async { - /* Key: - * +---------------------+ - * | m - master | - * | t - try | - * | r - release | - * | C - Completed | - * | R - Artifacts Ready | - * | IP - In Progress | - * +---------------------+ - * - * This is the graph we have: - * 2: A release - * +------------+ - * | r "v1.2.3" | - * +------------+ - * - * - * - * 1: Currently `in_progress` - * +-----------+ +---------------+ - * | m "rrr" C | -----+--->| t "t1" IP pr1 | - * +-----------+ +---------------+ - * - * - * - * +-----------+ - * | m "aaa" C | - * +-----------+ - * | - * V - * +----------------+ - * | m "mmm" R pr88 | 5: a master commit - * +----------------+ - * - * +-----------+ - * | m "345" C | - * +-----------+ - * | - * V - * +----------------+ - * | m "123" R pr11 | 3: a master commit, high pr number - * +----------------+ - * - * - * +-----------+ - * | m "bar" C | - * +-----------+ - * | - * V - * +----------------+ - * | m "foo" R pr77 | 4: a master commit - * +----------------+ - * | - * V - * +---------------+ - * | t "baz" R pr4 | 6: a try with a low pr, blocked by parent - * +---------------+ - * - * The master commits should take priority, then "yee" followed - * by "baz" - **/ - - let mut db = ctx.db_client().connection().await; - let requests = vec![ - create_master("foo", "bar", 77, "days2"), - create_master("123", "345", 11, "days2"), - create_try("baz", "foo", 4, "days1"), - create_release("v.1.2.3", "days2"), - create_try("t1", "rrr", 1, "days1").with_status(BenchmarkRequestStatus::InProgress), - create_master("mmm", "aaa", 88, "days2"), - ]; - - db_insert_requests(&*db, &requests).await; - - let completed: HashSet = HashSet::from([ - "bar".to_string(), - "345".to_string(), - "rrr".to_string(), - "aaa".to_string(), - ]); - - let sorted: Vec = build_queue(&mut *db, &completed).await.unwrap(); - - queue_order_matches(&sorted, &["t1", "v.1.2.3", "123", 
"foo", "mmm", "baz"]); - Ok(ctx) - }) - .await; - } - - #[test] - fn parses_stable_versions() { - assert_eq!( - tag("static.rust-lang.org/dist/2016-05-24/channel-rust-1.9.0.toml"), - "1.9.0" - ); - assert_eq!( - day("static.rust-lang.org/dist/2016-05-24/channel-rust-1.9.0.toml"), - NaiveDate::from_ymd_opt(2016, 5, 24).unwrap() - ); - - assert_eq!( - tag("static.rust-lang.org/dist/2025-06-26/channel-rust-1.88.0.toml"), - "1.88.0" - ); - assert_eq!( - day("static.rust-lang.org/dist/2025-06-26/channel-rust-1.88.0.toml"), - NaiveDate::from_ymd_opt(2025, 6, 26).unwrap() - ); - } - - #[test] - fn parses_plain_beta_channel() { - let want = "beta-2016-06-01"; - let url = "static.rust-lang.org/dist/2016-06-01/channel-rust-beta.toml"; - - assert_eq!(tag(url), want); - assert_eq!(day(url), NaiveDate::from_ymd_opt(2016, 6, 1).unwrap()); - } - - #[test] - fn skips_unconsumable_channels() { - // nightly never returns Anything - assert!(parse_release_string( - "static.rust-lang.org/dist/2016-05-31/channel-rust-nightly.toml" - ) - .is_none()); - - // versioned-beta artefacts are skipped too - for should_ignore in [ - "static.rust-lang.org/dist/2025-06-26/channel-rust-1.89-beta.toml", - "static.rust-lang.org/dist/2025-06-26/channel-rust-1.89.0-beta.toml", - "static.rust-lang.org/dist/2025-06-26/channel-rust-1.89.0-beta.2.toml", - ] { - assert!( - parse_release_string(should_ignore).is_none(), - "{should_ignore} should be ignored" - ); - } - } -} diff --git a/site/src/job_queue/mod.rs b/site/src/job_queue/mod.rs new file mode 100644 index 000000000..43ac10986 --- /dev/null +++ b/site/src/job_queue/mod.rs @@ -0,0 +1,384 @@ +mod utils; + +use std::{str::FromStr, sync::Arc}; + +use crate::job_queue::utils::{parse_release_string, ExtractIf}; +use crate::load::{partition_in_place, SiteCtxt}; +use chrono::Utc; +use database::{BenchmarkRequest, BenchmarkRequestIndex, BenchmarkRequestStatus}; +use hashbrown::HashSet; +use parking_lot::RwLock; +use tokio::time::{self, Duration}; + +pub fn 
run_new_queue() -> bool { + std::env::var("RUN_CRON") + .ok() + .and_then(|x| x.parse().ok()) + .unwrap_or(false) +} + +/// Store the latest master commits or do nothing if all of them are +/// already in the database +async fn create_benchmark_request_master_commits( + ctxt: &Arc, + conn: &dyn database::pool::Connection, + index: &BenchmarkRequestIndex, +) -> anyhow::Result<()> { + let master_commits = &ctxt.get_master_commits().commits; + // TODO; delete at some point in the future + let cutoff: chrono::DateTime = chrono::DateTime::from_str("2025-06-01T00:00:00.000Z")?; + + for master_commit in master_commits { + // We don't want to add masses of obsolete data + if master_commit.time >= cutoff && !index.contains_tag(&master_commit.sha) { + let pr = master_commit.pr.unwrap_or(0); + let benchmark = BenchmarkRequest::create_master( + &master_commit.sha, + &master_commit.parent_sha, + pr, + master_commit.time, + ); + if let Err(error) = conn.insert_benchmark_request(&benchmark).await { + log::error!("Failed to insert master benchmark request: {error:?}"); + } + } + } + Ok(()) +} + +/// Store the latest release commits or do nothing if all of them are +/// already in the database +async fn create_benchmark_request_releases( + conn: &dyn database::pool::Connection, + index: &BenchmarkRequestIndex, +) -> anyhow::Result<()> { + let releases: String = reqwest::get("https://static.rust-lang.org/manifests.txt") + .await? 
+ .text() + .await?; + // TODO; delete at some point in the future + let cutoff: chrono::DateTime = chrono::DateTime::from_str("2025-06-01T00:00:00.000Z")?; + + let releases: Vec<_> = releases + .lines() + .rev() + .filter_map(parse_release_string) + .take(20) + .collect(); + + for (name, date_time) in releases { + if date_time >= cutoff && !index.contains_tag(&name) { + let release_request = BenchmarkRequest::create_release(&name, date_time); + if let Err(error) = conn.insert_benchmark_request(&release_request).await { + log::error!("Failed to insert release benchmark request: {error}"); + } + } + } + Ok(()) +} + +/// Sorts try and master requests that are in the `ArtifactsReady` status. +/// Doesn't consider in-progress requests or release artifacts. +fn sort_benchmark_requests(index: &BenchmarkRequestIndex, request_queue: &mut [BenchmarkRequest]) { + let mut done: HashSet = index.completed_requests().clone(); + + // Ensure all the items are ready to be sorted, if they are not this is + // undefined behaviour + assert!(request_queue.iter().all(|bmr| { + bmr.status() == BenchmarkRequestStatus::ArtifactsReady && (bmr.is_master() || bmr.is_try()) + })); + + let mut finished = 0; + while finished < request_queue.len() { + // The next level is those elements in the unordered queue which + // are ready to be benchmarked (i.e., those with parent in done or no + // parent). + let level_len = partition_in_place(request_queue[finished..].iter_mut(), |bmr| { + bmr.parent_sha().is_none_or(|parent| done.contains(parent)) + }); + + // No commit is ready for benchmarking. This can happen e.g. when a try parent commit + // was forcefully removed from the master branch of rust-lang/rust. In this case, just + // let the commits be benchmarked in the current order that we have, these benchmark runs + // just won't have a parent result available. 
+ if level_len == 0 { + if cfg!(test) { + panic!("No commit is ready for benchmarking"); + } else { + log::warn!("No commit is ready for benchmarking"); + return; + } + } + + // Everything in level has the same topological order, then we sort based on heuristics + let level = &mut request_queue[finished..][..level_len]; + level.sort_unstable_by_key(|bmr| { + ( + // PR number takes priority + *bmr.pr().unwrap_or(&0), + // Order master commits before try commits + if bmr.is_master() { 0 } else { 1 }, + bmr.created_at(), + ) + }); + for c in level { + // As the only `commit_type` that will not have a `tag` is a `Try` + // with the status of `WaitingForArtifacts` and we have asserted above + // that all of the statuses of the benchmark requests are + // `ArtifactsReady` it is implausible for this `expect(...)` to be + // hit. + done.insert( + c.tag() + .expect("Tag should exist on a benchmark request being sorted") + .to_string(), + ); + } + finished += level_len; + } +} + +/// Creates a benchmark request queue that determines in what order will +/// the requests be benchmarked. The ordering should be created in such a way that +/// after an in-progress request is finished, the ordering of the rest of the queue does not +/// change (unless some other request was added to the queue in the meantime). +/// +/// Does not consider requests that are waiting for artifacts or that are already completed.
+pub async fn build_queue( + conn: &dyn database::pool::Connection, + index: &BenchmarkRequestIndex, +) -> anyhow::Result> { + // Load ArtifactsReady and InProgress benchmark requests + let mut pending = conn.load_pending_benchmark_requests().await?; + + // The queue starts with in progress + let mut queue: Vec = pending.extract_if_stable(|request| { + matches!(request.status(), BenchmarkRequestStatus::InProgress) + }); + + // We sort the in-progress ones based on the started date + queue.sort_unstable_by_key(|req| req.created_at()); + + // Add release artifacts ordered by the release tag (1.87.0 before 1.88.0) and `created_at`. + let mut release_artifacts: Vec = + pending.extract_if_stable(|request| request.is_release()); + + release_artifacts.sort_unstable_by(|a, b| { + a.tag() + .cmp(&b.tag()) + .then_with(|| a.created_at().cmp(&b.created_at())) + }); + + queue.append(&mut release_artifacts); + sort_benchmark_requests(index, &mut pending); + queue.append(&mut pending); + Ok(queue) +} + +/// Enqueue the job into the job_queue +async fn enqueue_next_job( + conn: &dyn database::pool::Connection, + index: &mut BenchmarkRequestIndex, +) -> anyhow::Result<()> { + let _queue = build_queue(conn, index).await?; + Ok(()) +} + +/// For queueing jobs, add the jobs you want to queue to this function +async fn cron_enqueue_jobs(site_ctxt: &Arc) -> anyhow::Result<()> { + let conn = site_ctxt.conn().await; + let mut index = conn.load_benchmark_request_index().await?; + // Put the master commits into the `benchmark_requests` queue + create_benchmark_request_master_commits(site_ctxt, &*conn, &index).await?; + // Put the releases into the `benchmark_requests` queue + create_benchmark_request_releases(&*conn, &index).await?; + enqueue_next_job(&*conn, &mut index).await?; + Ok(()) +} + +/// Entry point for the cron +pub async fn cron_main(site_ctxt: Arc>>>, seconds: u64) { + let mut interval = time::interval(Duration::from_secs(seconds)); + let ctxt = site_ctxt.clone(); + + loop { + 
interval.tick().await; + + if let Some(ctxt_clone) = { + let guard = ctxt.read(); + guard.as_ref().cloned() + } { + match cron_enqueue_jobs(&ctxt_clone).await { + Ok(_) => log::info!("Cron job executed at: {:?}", std::time::SystemTime::now()), + Err(e) => log::error!("Cron job failed to execute {}", e), + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::{Datelike, Duration, TimeZone, Utc}; + use database::tests::run_postgres_test; + + fn days_ago(day_str: &str) -> chrono::DateTime { + // Walk backwards until the first non-digit, then slice + let days = day_str + .strip_prefix("days") + .unwrap() + .parse::() + .unwrap(); + + let timestamp = Utc::now() - Duration::days(days); + // zero out the seconds + Utc.with_ymd_and_hms( + timestamp.year(), + timestamp.month(), + timestamp.day(), + 0, + 0, + 0, + ) + .unwrap() + } + + fn create_master(sha: &str, parent: &str, pr: u32, age_days: &str) -> BenchmarkRequest { + BenchmarkRequest::create_master(sha, parent, pr, days_ago(age_days)) + } + + fn create_try(pr: u32, age_days: &str) -> BenchmarkRequest { + BenchmarkRequest::create_try_without_artifacts(pr, days_ago(age_days), "", "") + } + + fn create_release(tag: &str, age_days: &str) -> BenchmarkRequest { + BenchmarkRequest::create_release(tag, days_ago(age_days)) + } + + async fn db_insert_requests( + conn: &dyn database::pool::Connection, + requests: &[BenchmarkRequest], + ) { + for request in requests { + conn.insert_benchmark_request(&request).await.unwrap(); + } + } + + async fn mark_as_completed(conn: &dyn database::pool::Connection, shas: &[&str]) { + let completed_at = Utc::now(); + for sha in shas { + conn.update_benchmark_request_status( + sha, + BenchmarkRequestStatus::Completed { completed_at }, + ) + .await + .unwrap(); + } + } + + fn queue_order_matches(queue: &[BenchmarkRequest], expected: &[&str]) { + let queue_shas: Vec<&str> = queue + .iter() + .filter_map(|request| request.tag().map(|tag| tag)) + .collect(); + 
assert_eq!(queue_shas, expected) + } + + #[tokio::test] + async fn queue_ordering() { + run_postgres_test(|ctx| async { + /* Key: + * +---------------------+ + * | m - master | + * | t - try | + * | r - release | + * | C - Completed | + * | R - Artifacts Ready | + * | IP - In Progress | + * +---------------------+ + * + * This is the graph we have: + * 2: A release + * +------------+ + * | r "v1.2.3" | + * +------------+ + * + * + * + * 1: Currently `in_progress` + * +-----------+ +-----------------+ + * | m "rrr" C | -----+--->| t "try1" IP pr6 | + * +-----------+ +-----------------+ + * + * + * + * +-----------+ + * | m "aaa" C | + * +-----------+ + * | + * V + * +----------------+ + * | m "mmm" R pr18 | 5: a master commit + * +----------------+ + * + * +-----------+ + * | m "345" C | + * +-----------+ + * | + * V + * +----------------+ + * | m "123" R pr14 | 3: a master commit, high PR number + * +----------------+ + * + * + * +-----------+ + * | m "bar" C | + * +-----------+ + * | + * V + * +----------------+ + * | m "foo" R pr15 | 4: a master commit + * +----------------+ + * | + * V + * +----------------+ + * | t "baz" R pr17 | 6: a try with a low PR, blocked by parent + * +----------------+ + **/ + + let db = ctx.db_client().connection().await; + let requests = vec![ + create_master("bar", "parent1", 10, "days2"), + create_master("345", "parent2", 11, "days2"), + create_master("aaa", "parent3", 12, "days2"), + create_master("rrr", "parent4", 13, "days2"), + create_master("123", "bar", 14, "days2"), + create_master("foo", "345", 15, "days2"), + create_try(16, "days1"), + create_release("v1.2.3", "days2"), + create_try(17, "days1"), + create_master("mmm", "aaa", 18, "days2"), + ]; + + db_insert_requests(&*db, &requests).await; + db.attach_shas_to_try_benchmark_request(16, "try1", "rrr") + .await + .unwrap(); + db.update_benchmark_request_status("try1", BenchmarkRequestStatus::InProgress) + .await + .unwrap(); + db.attach_shas_to_try_benchmark_request(17, 
"baz", "foo") + .await + .unwrap(); + + mark_as_completed(&*db, &["bar", "345", "aaa", "rrr"]).await; + + let index = db.load_benchmark_request_index().await.unwrap(); + + let sorted: Vec = build_queue(&*db, &index).await.unwrap(); + + queue_order_matches(&sorted, &["try1", "v1.2.3", "123", "foo", "mmm", "baz"]); + Ok(ctx) + }) + .await; + } +} diff --git a/site/src/job_queue/utils.rs b/site/src/job_queue/utils.rs new file mode 100644 index 000000000..3ade039c4 --- /dev/null +++ b/site/src/job_queue/utils.rs @@ -0,0 +1,154 @@ +use chrono::{DateTime, NaiveDate, Utc}; +use regex::Regex; +use std::path::Path; +use std::sync::LazyLock; + +pub trait ExtractIf { + fn extract_if_stable(&mut self, predicate: F) -> Vec + where + F: FnMut(&T) -> bool; +} + +/// Vec method `extract_if` is unstable, this very simple implementation +/// can be deleted once it is stable +impl ExtractIf for Vec { + fn extract_if_stable(&mut self, mut predicate: F) -> Vec + where + F: FnMut(&T) -> bool, + { + let mut extracted = Vec::new(); + let mut i = 0; + + while i < self.len() { + if predicate(&self[i]) { + extracted.push(self.remove(i)); + } else { + i += 1; + } + } + extracted + } +} + +/// Parses strings in the following formats extracting the Date & release tag +/// `static.rust-lang.org/dist/2016-05-24/channel-rust-1.9.0.toml` +/// `static.rust-lang.org/dist/2016-05-31/channel-rust-nightly.toml` +/// `static.rust-lang.org/dist/2016-06-01/channel-rust-beta.toml` +/// `static.rust-lang.org/dist/2025-06-26/channel-rust-1.89-beta.toml` +/// `static.rust-lang.org/dist/2025-06-26/channel-rust-1.89.0-beta.toml` +/// `static.rust-lang.org/dist/2025-06-26/channel-rust-1.89.0-beta.2.toml` +pub fn parse_release_string(url: &str) -> Option<(String, DateTime)> { + static VERSION_RE: LazyLock = LazyLock::new(|| Regex::new(r"(\d+\.\d+\.\d+)").unwrap()); + + // Grab ".../YYYY-MM-DD/FILE.toml" components with Path helpers. 
+ let file = Path::new(url).file_name().and_then(|n| n.to_str())?; + + let date_str = Path::new(url) + .parent() + .and_then(Path::file_name) + .and_then(|n| n.to_str())?; + + // No other beta releases are recognized as toolchains. + // + // We also have names like this: + // + // * channel-rust-1.75-beta.toml + // * channel-rust-1.75.0-beta.toml + // * channel-rust-1.75.0-beta.1.toml + // + // Which should get ignored for now, they're not consumable via rustup yet. + if file.contains("beta") && file != "channel-rust-beta.toml" { + return None; + } + + // Parse the YYYY-MM-DD segment and stamp it with midnight (00:00:00) UTC. + if let Ok(naive) = NaiveDate::parse_from_str(date_str, "%Y-%m-%d") { + let published = naive + .and_hms_opt(0, 0, 0) + .expect("valid HMS") + .and_local_timezone(Utc) + .single() + .unwrap(); + + // Special-case the rolling beta channel. + if file == "channel-rust-beta.toml" { + return Some((format!("beta-{date_str}"), published)); + } + + // Otherwise pull out a semver like "1.70.0" and return it. + if let Some(cap) = VERSION_RE.captures(file).and_then(|m| m.get(1)) { + return Some((cap.as_str().to_owned(), published)); + } + } + + None +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::NaiveDate; + + /// Helper: unwrap the Option, panic otherwise.
+ fn tag(url: &str) -> String { + parse_release_string(url) + .expect("Some") // Option<_> + .0 // take the tag + } + + /// Helper: unwrap the DateTime and keep only the YYYY-MM-DD part + fn day(url: &str) -> NaiveDate { + parse_release_string(url).expect("Some").1.date_naive() + } + + #[test] + fn parses_stable_versions() { + assert_eq!( + tag("static.rust-lang.org/dist/2016-05-24/channel-rust-1.9.0.toml"), + "1.9.0" + ); + assert_eq!( + day("static.rust-lang.org/dist/2016-05-24/channel-rust-1.9.0.toml"), + NaiveDate::from_ymd_opt(2016, 5, 24).unwrap() + ); + + assert_eq!( + tag("static.rust-lang.org/dist/2025-06-26/channel-rust-1.88.0.toml"), + "1.88.0" + ); + assert_eq!( + day("static.rust-lang.org/dist/2025-06-26/channel-rust-1.88.0.toml"), + NaiveDate::from_ymd_opt(2025, 6, 26).unwrap() + ); + } + + #[test] + fn parses_plain_beta_channel() { + let want = "beta-2016-06-01"; + let url = "static.rust-lang.org/dist/2016-06-01/channel-rust-beta.toml"; + + assert_eq!(tag(url), want); + assert_eq!(day(url), NaiveDate::from_ymd_opt(2016, 6, 1).unwrap()); + } + + #[test] + fn skips_unconsumable_channels() { + // nightly never returns Anything + assert!(parse_release_string( + "static.rust-lang.org/dist/2016-05-31/channel-rust-nightly.toml" + ) + .is_none()); + + // versioned-beta artefacts are skipped too + for should_ignore in [ + "static.rust-lang.org/dist/2025-06-26/channel-rust-1.89-beta.toml", + "static.rust-lang.org/dist/2025-06-26/channel-rust-1.89.0-beta.toml", + "static.rust-lang.org/dist/2025-06-26/channel-rust-1.89.0-beta.2.toml", + ] { + assert!( + parse_release_string(should_ignore).is_none(), + "{should_ignore} should be ignored" + ); + } + } +} diff --git a/site/src/request_handlers/github.rs b/site/src/request_handlers/github.rs index b3a0aedc2..becbd1b27 100644 --- a/site/src/request_handlers/github.rs +++ b/site/src/request_handlers/github.rs @@ -6,7 +6,7 @@ use crate::github::{ use crate::job_queue::run_new_queue; use crate::load::SiteCtxt; -use 
database::{BenchmarkRequest, BenchmarkRequestStatus}; +use database::BenchmarkRequest; use hashbrown::HashMap; use std::sync::Arc; @@ -74,25 +74,18 @@ async fn handle_issue( Ok(github::Response) } -/// The try does not have a `sha` or a `parent_sha` but we need to keep a record -/// of this commit existing. We make sure there can only be one `pr` with a -/// status of `WaitingForArtifacts` to ensure we don't have duplicates. -async fn queue_partial_try_benchmark_request( +/// The try request does not have a `sha` or a `parent_sha` but we need to keep a record +/// of this commit existing. The DB ensures that there is only one non-completed +/// try benchmark request per `pr`. +async fn record_try_benchmark_request_without_artifacts( conn: &dyn database::pool::Connection, pr: u32, backends: &str, ) { // We only want to run this if the new system is running if run_new_queue() { - let try_request = BenchmarkRequest::create_try( - None, - None, - pr, - chrono::Utc::now(), - BenchmarkRequestStatus::WaitingForArtifacts, - backends, - "", - ); + let try_request = + BenchmarkRequest::create_try_without_artifacts(pr, chrono::Utc::now(), backends, ""); if let Err(e) = conn.insert_benchmark_request(&try_request).await { log::error!("Failed to insert try benchmark request: {}", e); } @@ -125,7 +118,7 @@ async fn handle_rust_timer( Ok(cmd) => { let conn = ctxt.conn().await; - queue_partial_try_benchmark_request( + record_try_benchmark_request_without_artifacts( &*conn, issue.number, cmd.params.backends.unwrap_or(""), @@ -171,7 +164,7 @@ async fn handle_rust_timer( { let conn = ctxt.conn().await; for command in &valid_build_cmds { - queue_partial_try_benchmark_request( + record_try_benchmark_request_without_artifacts( &*conn, issue.number, command.params.backends.unwrap_or(""),