Skip to content

Commit 5136b31

Browse files
authored
Merge pull request #11836 from Turbo87/cloudfront-invalidations
Add Cloudfront invalidations queue to the database
2 parents a08e81a + ace6a30 commit 5136b31

File tree

19 files changed

+318
-36
lines changed

19 files changed

+318
-36
lines changed
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
use crate::schema::cloudfront_invalidation_queue;
2+
use diesel::prelude::*;
3+
use diesel_async::{AsyncPgConnection, RunQueryDsl};
4+
5+
#[derive(Debug, Identifiable, Queryable, QueryableByName, Selectable)]
6+
#[diesel(table_name = cloudfront_invalidation_queue, check_for_backend(diesel::pg::Pg))]
7+
pub struct CloudFrontInvalidationQueueItem {
8+
pub id: i64,
9+
pub path: String,
10+
pub created_at: chrono::DateTime<chrono::Utc>,
11+
}
12+
13+
#[derive(Debug, Insertable)]
14+
#[diesel(table_name = cloudfront_invalidation_queue, check_for_backend(diesel::pg::Pg))]
15+
pub struct NewCloudFrontInvalidationQueueItem<'a> {
16+
pub path: &'a str,
17+
}
18+
19+
impl CloudFrontInvalidationQueueItem {
20+
/// Queue multiple invalidation paths for later processing
21+
pub async fn queue_paths(conn: &mut AsyncPgConnection, paths: &[String]) -> QueryResult<usize> {
22+
let new_items: Vec<_> = paths
23+
.iter()
24+
.map(|path| NewCloudFrontInvalidationQueueItem { path })
25+
.collect();
26+
27+
diesel::insert_into(cloudfront_invalidation_queue::table)
28+
.values(&new_items)
29+
.execute(conn)
30+
.await
31+
}
32+
33+
/// Fetch the oldest paths from the queue
34+
pub async fn fetch_batch(
35+
conn: &mut AsyncPgConnection,
36+
limit: i64,
37+
) -> QueryResult<Vec<CloudFrontInvalidationQueueItem>> {
38+
// Fetch the oldest entries up to the limit
39+
cloudfront_invalidation_queue::table
40+
.order(cloudfront_invalidation_queue::created_at.asc())
41+
.limit(limit)
42+
.load(conn)
43+
.await
44+
}
45+
46+
/// Remove queue items by their IDs
47+
pub async fn remove_items(
48+
conn: &mut AsyncPgConnection,
49+
item_ids: &[i64],
50+
) -> QueryResult<usize> {
51+
diesel::delete(cloudfront_invalidation_queue::table)
52+
.filter(cloudfront_invalidation_queue::id.eq_any(item_ids))
53+
.execute(conn)
54+
.await
55+
}
56+
}

crates/crates_io_database/src/models/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
pub use self::action::{NewVersionOwnerAction, VersionAction, VersionOwnerAction};
22
pub use self::category::{Category, CrateCategory, NewCategory};
3+
pub use self::cloudfront_invalidation_queue::CloudFrontInvalidationQueueItem;
34
pub use self::crate_owner_invitation::{
45
CrateOwnerInvitation, NewCrateOwnerInvitation, NewCrateOwnerInvitationOutcome,
56
};
@@ -22,6 +23,7 @@ pub mod helpers;
2223

2324
mod action;
2425
pub mod category;
26+
mod cloudfront_invalidation_queue;
2527
pub mod crate_owner_invitation;
2628
pub mod default_versions;
2729
mod deleted_crate;

crates/crates_io_database/src/schema.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,18 @@ diesel::table! {
176176
}
177177
}
178178

179+
diesel::table! {
180+
/// Queue for batching CloudFront CDN invalidation requests
181+
cloudfront_invalidation_queue (id) {
182+
/// Unique identifier for each queued invalidation path
183+
id -> Int8,
184+
/// CloudFront path to invalidate (e.g. /crates/serde/serde-1.0.0.crate)
185+
path -> Text,
186+
/// Timestamp when the path was queued for invalidation
187+
created_at -> Timestamptz,
188+
}
189+
}
190+
179191
diesel::table! {
180192
/// Number of downloads per crate. This was extracted from the `crates` table for performance reasons.
181193
crate_downloads (crate_id) {
@@ -1141,6 +1153,7 @@ diesel::allow_tables_to_appear_in_same_query!(
11411153
api_tokens,
11421154
background_jobs,
11431155
categories,
1156+
cloudfront_invalidation_queue,
11441157
crate_downloads,
11451158
crate_owner_invitations,
11461159
crate_owners,

crates/crates_io_database_dump/src/dump-db.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ crates_cnt = "public"
4949
created_at = "public"
5050
path = "public"
5151

52+
[cloudfront_invalidation_queue.columns]
53+
id = "private"
54+
path = "private"
55+
created_at = "private"
56+
5257
[crate_downloads.columns]
5358
crate_id = "public"
5459
downloads = "public"
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- Drop the CloudFront invalidation queue table
2+
DROP TABLE IF EXISTS cloudfront_invalidation_queue;
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
-- Create table for queuing CloudFront invalidation paths
2+
CREATE TABLE cloudfront_invalidation_queue (
3+
id BIGSERIAL PRIMARY KEY,
4+
path TEXT NOT NULL,
5+
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
6+
);
7+
8+
COMMENT ON TABLE cloudfront_invalidation_queue IS 'Queue for batching CloudFront CDN invalidation requests';
9+
COMMENT ON COLUMN cloudfront_invalidation_queue.id IS 'Unique identifier for each queued invalidation path';
10+
COMMENT ON COLUMN cloudfront_invalidation_queue.path IS 'CloudFront path to invalidate (e.g. /crates/serde/serde-1.0.0.crate)';
11+
COMMENT ON COLUMN cloudfront_invalidation_queue.created_at IS 'Timestamp when the path was queued for invalidation';
12+
13+
-- Index for efficient batch processing (oldest first)
14+
CREATE INDEX idx_cloudfront_invalidation_queue_created_at
15+
ON cloudfront_invalidation_queue (created_at);

src/cloudfront.rs

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,40 @@
11
use aws_credential_types::Credentials;
22
use aws_sdk_cloudfront::config::retry::RetryConfig;
33
use aws_sdk_cloudfront::config::{BehaviorVersion, Region};
4+
use aws_sdk_cloudfront::error::{BuildError, SdkError};
5+
use aws_sdk_cloudfront::operation::create_invalidation::CreateInvalidationError;
46
use aws_sdk_cloudfront::types::{InvalidationBatch, Paths};
57
use aws_sdk_cloudfront::{Client, Config};
68
use tracing::{debug, instrument, warn};
79

10+
#[derive(Debug, thiserror::Error)]
11+
pub enum CloudFrontError {
12+
#[error(transparent)]
13+
BuildError(#[from] BuildError),
14+
#[error(transparent)]
15+
SdkError(Box<SdkError<CreateInvalidationError>>),
16+
}
17+
18+
impl From<SdkError<CreateInvalidationError>> for CloudFrontError {
19+
fn from(err: SdkError<CreateInvalidationError>) -> Self {
20+
CloudFrontError::SdkError(Box::new(err))
21+
}
22+
}
23+
24+
impl CloudFrontError {
25+
pub fn is_too_many_invalidations_error(&self) -> bool {
26+
let CloudFrontError::SdkError(sdk_error) = self else {
27+
return false;
28+
};
29+
30+
let Some(service_error) = sdk_error.as_service_error() else {
31+
return false;
32+
};
33+
34+
service_error.is_too_many_invalidations_in_progress()
35+
}
36+
}
37+
838
pub struct CloudFront {
939
client: Client,
1040
distribution_id: String,
@@ -36,13 +66,13 @@ impl CloudFront {
3666
/// Invalidate a file on CloudFront
3767
///
3868
/// `path` is the path to the file to invalidate, such as `config.json`, or `re/ge/regex`
39-
pub async fn invalidate(&self, path: &str) -> anyhow::Result<()> {
69+
pub async fn invalidate(&self, path: &str) -> Result<(), CloudFrontError> {
4070
self.invalidate_many(vec![path.to_string()]).await
4171
}
4272

4373
/// Invalidate multiple paths on CloudFront.
4474
#[instrument(skip(self))]
45-
pub async fn invalidate_many(&self, mut paths: Vec<String>) -> anyhow::Result<()> {
75+
pub async fn invalidate_many(&self, mut paths: Vec<String>) -> Result<(), CloudFrontError> {
4676
let now = chrono::offset::Utc::now().timestamp_micros();
4777

4878
// We need to ensure that paths have a starting slash.
@@ -72,15 +102,11 @@ impl CloudFront {
72102

73103
debug!("Sending invalidation request");
74104

75-
match invalidation_request.send().await {
76-
Ok(_) => {
77-
debug!("Invalidation request successful");
78-
Ok(())
79-
}
80-
Err(error) => {
81-
warn!(?error, "Invalidation request failed");
82-
Err(error.into())
83-
}
84-
}
105+
Ok(invalidation_request
106+
.send()
107+
.await
108+
.map(|_| ()) // We don't care about the result, just that it worked
109+
.inspect(|_| debug!("Invalidation request successful"))
110+
.inspect_err(|error| warn!(?error, "Invalidation request failed"))?)
85111
}
86112
}

src/worker/environment.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@ use crate::cloudfront::CloudFront;
33
use crate::fastly::Fastly;
44
use crate::storage::Storage;
55
use crate::typosquat;
6+
use crate::worker::jobs::ProcessCloudfrontInvalidationQueue;
67
use anyhow::Context;
78
use bon::Builder;
9+
use crates_io_database::models::CloudFrontInvalidationQueueItem;
810
use crates_io_docs_rs::DocsRsClient;
911
use crates_io_index::{Repository, RepositoryConfig};
1012
use crates_io_og_image::OgImageGenerator;
1113
use crates_io_team_repo::TeamRepo;
14+
use crates_io_worker::BackgroundJob;
1215
use diesel_async::AsyncPgConnection;
1316
use diesel_async::pooled_connection::deadpool::Pool;
1417
use object_store::ObjectStore;
@@ -70,9 +73,20 @@ impl Environment {
7073
}
7174

7275
/// Invalidate a file in all registered CDNs.
73-
pub(crate) async fn invalidate_cdns(&self, path: &str) -> anyhow::Result<()> {
74-
if let Some(cloudfront) = self.cloudfront() {
75-
cloudfront.invalidate(path).await.context("CloudFront")?;
76+
pub(crate) async fn invalidate_cdns(
77+
&self,
78+
conn: &mut AsyncPgConnection,
79+
path: &str,
80+
) -> anyhow::Result<()> {
81+
// Queue CloudFront invalidations for batch processing instead of calling directly
82+
if self.cloudfront().is_some() {
83+
let paths = &[path.to_string()];
84+
let result = CloudFrontInvalidationQueueItem::queue_paths(conn, paths).await;
85+
result.context("Failed to queue CloudFront invalidation path")?;
86+
87+
// Schedule the processing job to handle the queued paths
88+
let result = ProcessCloudfrontInvalidationQueue.enqueue(conn).await;
89+
result.context("Failed to enqueue CloudFront invalidation processing job")?;
7690
}
7791

7892
if let Some(fastly) = self.fastly() {

src/worker/jobs/dump_db.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ impl BackgroundJob for DumpDb {
4747
info!("Database dump tarball uploaded");
4848

4949
info!("Invalidating CDN caches…");
50-
if let Err(error) = env.invalidate_cdns(TAR_PATH).await {
50+
let mut conn = env.deadpool.get().await?;
51+
if let Err(error) = env.invalidate_cdns(&mut conn, TAR_PATH).await {
5152
warn!("Failed to invalidate CDN caches: {error}");
5253
}
5354

@@ -58,7 +59,7 @@ impl BackgroundJob for DumpDb {
5859
info!("Database dump zip file uploaded");
5960

6061
info!("Invalidating CDN caches…");
61-
if let Err(error) = env.invalidate_cdns(ZIP_PATH).await {
62+
if let Err(error) = env.invalidate_cdns(&mut conn, ZIP_PATH).await {
6263
warn!("Failed to invalidate CDN caches: {error}");
6364
}
6465

src/worker/jobs/generate_og_image.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use crate::models::OwnerKind;
22
use crate::schema::*;
33
use crate::worker::Environment;
4+
use crate::worker::jobs::ProcessCloudfrontInvalidationQueue;
45
use anyhow::Context;
6+
use crates_io_database::models::CloudFrontInvalidationQueueItem;
57
use crates_io_og_image::{OgImageAuthorData, OgImageData};
68
use crates_io_worker::BackgroundJob;
79
use diesel::prelude::*;
@@ -104,11 +106,17 @@ impl BackgroundJob for GenerateOgImage {
104106
// Invalidate CDN cache for the OG image
105107
let og_image_path = format!("og-images/{crate_name}.png");
106108

107-
// Invalidate CloudFront CDN
108-
if let Some(cloudfront) = ctx.cloudfront()
109-
&& let Err(error) = cloudfront.invalidate(&og_image_path).await
110-
{
111-
warn!("Failed to invalidate CloudFront CDN for {crate_name}: {error}");
109+
// Queue CloudFront invalidation for batch processing
110+
if ctx.cloudfront().is_some() {
111+
let paths = std::slice::from_ref(&og_image_path);
112+
let result = CloudFrontInvalidationQueueItem::queue_paths(&mut conn, paths).await;
113+
if let Err(error) = result {
114+
warn!("Failed to queue CloudFront invalidation for {crate_name}: {error}");
115+
} else if let Err(error) = ProcessCloudfrontInvalidationQueue.enqueue(&mut conn).await {
116+
warn!(
117+
"Failed to enqueue CloudFront invalidation processing job for {crate_name}: {error}"
118+
);
119+
}
112120
}
113121

114122
// Invalidate Fastly CDN

0 commit comments

Comments
 (0)