Skip to content

Commit 231426b

Browse files
committed
Use ProcessCloudfrontInvalidationQueue background job for CloudFront invalidations
1 parent 92eba0b commit 231426b

File tree

9 files changed

+62
-24
lines changed

9 files changed

+62
-24
lines changed

src/worker/environment.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@ use crate::cloudfront::CloudFront;
33
use crate::fastly::Fastly;
44
use crate::storage::Storage;
55
use crate::typosquat;
6+
use crate::worker::jobs::ProcessCloudfrontInvalidationQueue;
67
use anyhow::Context;
78
use bon::Builder;
9+
use crates_io_database::models::CloudFrontInvalidationQueueItem;
810
use crates_io_docs_rs::DocsRsClient;
911
use crates_io_index::{Repository, RepositoryConfig};
1012
use crates_io_og_image::OgImageGenerator;
1113
use crates_io_team_repo::TeamRepo;
14+
use crates_io_worker::BackgroundJob;
1215
use diesel_async::AsyncPgConnection;
1316
use diesel_async::pooled_connection::deadpool::Pool;
1417
use object_store::ObjectStore;
@@ -70,9 +73,20 @@ impl Environment {
7073
}
7174

7275
/// Invalidate a file in all registered CDNs.
73-
pub(crate) async fn invalidate_cdns(&self, path: &str) -> anyhow::Result<()> {
74-
if let Some(cloudfront) = self.cloudfront() {
75-
cloudfront.invalidate(path).await.context("CloudFront")?;
76+
pub(crate) async fn invalidate_cdns(
77+
&self,
78+
conn: &mut AsyncPgConnection,
79+
path: &str,
80+
) -> anyhow::Result<()> {
81+
// Queue CloudFront invalidations for batch processing instead of calling directly
82+
if self.cloudfront().is_some() {
83+
let paths = &[path.to_string()];
84+
let result = CloudFrontInvalidationQueueItem::queue_paths(conn, paths).await;
85+
result.context("Failed to queue CloudFront invalidation path")?;
86+
87+
// Schedule the processing job to handle the queued paths
88+
let result = ProcessCloudfrontInvalidationQueue.enqueue(conn).await;
89+
result.context("Failed to enqueue CloudFront invalidation processing job")?;
7690
}
7791

7892
if let Some(fastly) = self.fastly() {

src/worker/jobs/dump_db.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ impl BackgroundJob for DumpDb {
4747
info!("Database dump tarball uploaded");
4848

4949
info!("Invalidating CDN caches…");
50-
if let Err(error) = env.invalidate_cdns(TAR_PATH).await {
50+
let mut conn = env.deadpool.get().await?;
51+
if let Err(error) = env.invalidate_cdns(&mut conn, TAR_PATH).await {
5152
warn!("Failed to invalidate CDN caches: {error}");
5253
}
5354

@@ -58,7 +59,7 @@ impl BackgroundJob for DumpDb {
5859
info!("Database dump zip file uploaded");
5960

6061
info!("Invalidating CDN caches…");
61-
if let Err(error) = env.invalidate_cdns(ZIP_PATH).await {
62+
if let Err(error) = env.invalidate_cdns(&mut conn, ZIP_PATH).await {
6263
warn!("Failed to invalidate CDN caches: {error}");
6364
}
6465

src/worker/jobs/generate_og_image.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use crate::models::OwnerKind;
22
use crate::schema::*;
33
use crate::worker::Environment;
4+
use crate::worker::jobs::ProcessCloudfrontInvalidationQueue;
45
use anyhow::Context;
6+
use crates_io_database::models::CloudFrontInvalidationQueueItem;
57
use crates_io_og_image::{OgImageAuthorData, OgImageData};
68
use crates_io_worker::BackgroundJob;
79
use diesel::prelude::*;
@@ -104,11 +106,17 @@ impl BackgroundJob for GenerateOgImage {
104106
// Invalidate CDN cache for the OG image
105107
let og_image_path = format!("og-images/{crate_name}.png");
106108

107-
// Invalidate CloudFront CDN
108-
if let Some(cloudfront) = ctx.cloudfront()
109-
&& let Err(error) = cloudfront.invalidate(&og_image_path).await
110-
{
111-
warn!("Failed to invalidate CloudFront CDN for {crate_name}: {error}");
109+
// Queue CloudFront invalidation for batch processing
110+
if ctx.cloudfront().is_some() {
111+
let paths = std::slice::from_ref(&og_image_path);
112+
let result = CloudFrontInvalidationQueueItem::queue_paths(&mut conn, paths).await;
113+
if let Err(error) = result {
114+
warn!("Failed to queue CloudFront invalidation for {crate_name}: {error}");
115+
} else if let Err(error) = ProcessCloudfrontInvalidationQueue.enqueue(&mut conn).await {
116+
warn!(
117+
"Failed to enqueue CloudFront invalidation processing job for {crate_name}: {error}"
118+
);
119+
}
112120
}
113121

114122
// Invalidate Fastly CDN

src/worker/jobs/index/sync.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use crate::index::get_index_data;
22
use crate::tasks::spawn_blocking;
33
use crate::worker::Environment;
4+
use crate::worker::jobs::ProcessCloudfrontInvalidationQueue;
45
use anyhow::Context;
6+
use crates_io_database::models::CloudFrontInvalidationQueueItem;
57
use crates_io_index::Repository;
68
use crates_io_worker::BackgroundJob;
79
use serde::{Deserialize, Serialize};
@@ -113,12 +115,17 @@ impl BackgroundJob for SyncToSparseIndex {
113115
let future = env.storage.sync_index(&self.krate, content);
114116
future.await.context("Failed to sync index data")?;
115117

116-
if let Some(cloudfront) = env.cloudfront() {
118+
if env.cloudfront().is_some() {
117119
let path = Repository::relative_index_file_for_url(&self.krate);
118120

119-
info!(%path, "Invalidating index file on CloudFront");
120-
let future = cloudfront.invalidate(&path);
121-
future.await.context("Failed to invalidate CloudFront")?;
121+
info!(%path, "Queuing index file invalidation on CloudFront");
122+
123+
let paths = &[path];
124+
let result = CloudFrontInvalidationQueueItem::queue_paths(&mut conn, paths).await;
125+
result.context("Failed to queue CloudFront invalidation path")?;
126+
127+
let result = ProcessCloudfrontInvalidationQueue.enqueue(&mut conn).await;
128+
result.context("Failed to enqueue CloudFront invalidation processing job")?;
122129
}
123130
Ok(())
124131
}

src/worker/jobs/index_version_downloads_archive/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,11 @@ impl BackgroundJob for IndexVersionDownloadsArchive {
5454
info!("index.json generated and uploaded");
5555

5656
info!("Invalidating CDN caches…");
57-
if let Err(error) = env.invalidate_cdns(INDEX_PATH).await {
57+
let mut conn = env.deadpool.get().await?;
58+
if let Err(error) = env.invalidate_cdns(&mut conn, INDEX_PATH).await {
5859
warn!("Failed to invalidate CDN caches: {error}");
5960
}
60-
if let Err(error) = env.invalidate_cdns(INDEX_JSON_PATH).await {
61+
if let Err(error) = env.invalidate_cdns(&mut conn, INDEX_JSON_PATH).await {
6162
warn!("Failed to invalidate CDN caches: {error}");
6263
}
6364
info!("CDN caches invalidated");

src/worker/jobs/invalidate_cdns.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
use std::sync::Arc;
22

33
use anyhow::Context;
4+
use crates_io_database::models::CloudFrontInvalidationQueueItem;
45
use crates_io_worker::BackgroundJob;
56
use serde::{Deserialize, Serialize};
67

78
use crate::worker::Environment;
9+
use crate::worker::jobs::ProcessCloudfrontInvalidationQueue;
810

911
/// A background job that invalidates the given paths on all CDNs in use on crates.io.
1012
#[derive(Deserialize, Serialize)]
@@ -46,11 +48,16 @@ impl BackgroundJob for InvalidateCdns {
4648
}
4749
}
4850

49-
if let Some(cloudfront) = ctx.cloudfront() {
50-
cloudfront
51-
.invalidate_many(self.paths.clone())
52-
.await
53-
.context("Failed to invalidate paths on CloudFront CDN")?;
51+
// Queue CloudFront invalidations for batch processing instead of calling directly
52+
if ctx.cloudfront().is_some() {
53+
let mut conn = ctx.deadpool.get().await?;
54+
55+
let result = CloudFrontInvalidationQueueItem::queue_paths(&mut conn, &self.paths).await;
56+
result.context("Failed to queue CloudFront invalidation paths")?;
57+
58+
// Schedule the processing job to handle the queued paths
59+
let result = ProcessCloudfrontInvalidationQueue.enqueue(&mut conn).await;
60+
result.context("Failed to enqueue CloudFront invalidation processing job")?;
5461
}
5562

5663
Ok(())

src/worker/jobs/rss/sync_crate_feed.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ impl BackgroundJob for SyncCrateFeed {
8080
ctx.storage.upload_feed(&feed_id, &channel).await?;
8181

8282
let path = object_store::path::Path::from(&feed_id);
83-
if let Err(error) = ctx.invalidate_cdns(path.as_ref()).await {
83+
if let Err(error) = ctx.invalidate_cdns(&mut conn, path.as_ref()).await {
8484
warn!("Failed to invalidate CDN caches: {error}");
8585
}
8686

src/worker/jobs/rss/sync_crates_feed.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ impl BackgroundJob for SyncCratesFeed {
6767
ctx.storage.upload_feed(&feed_id, &channel).await?;
6868

6969
let path = object_store::path::Path::from(&feed_id);
70-
if let Err(error) = ctx.invalidate_cdns(path.as_ref()).await {
70+
if let Err(error) = ctx.invalidate_cdns(&mut conn, path.as_ref()).await {
7171
warn!("Failed to invalidate CDN caches: {error}");
7272
}
7373

src/worker/jobs/rss/sync_updates_feed.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ impl BackgroundJob for SyncUpdatesFeed {
6767
ctx.storage.upload_feed(&feed_id, &channel).await?;
6868

6969
let path = object_store::path::Path::from(&feed_id);
70-
if let Err(error) = ctx.invalidate_cdns(path.as_ref()).await {
70+
if let Err(error) = ctx.invalidate_cdns(&mut conn, path.as_ref()).await {
7171
warn!("Failed to invalidate CDN caches: {error}");
7272
}
7373

0 commit comments

Comments
 (0)