Skip to content

Commit 92eba0b

Browse files
committed
worker: Implement ProcessCloudfrontInvalidationQueue background job
1 parent 493d9b7 commit 92eba0b

File tree

3 files changed

+125
-0
lines changed

3 files changed

+125
-0
lines changed

src/worker/jobs/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ mod generate_og_image;
99
mod index;
1010
mod index_version_downloads_archive;
1111
mod invalidate_cdns;
12+
mod process_cloudfront_invalidation_queue;
1213
mod readmes;
1314
pub mod rss;
1415
mod send_publish_notifications;
@@ -30,6 +31,7 @@ pub use self::generate_og_image::GenerateOgImage;
3031
pub use self::index::{NormalizeIndex, SquashIndex, SyncToGitIndex, SyncToSparseIndex};
3132
pub use self::index_version_downloads_archive::IndexVersionDownloadsArchive;
3233
pub use self::invalidate_cdns::InvalidateCdns;
34+
pub use self::process_cloudfront_invalidation_queue::ProcessCloudfrontInvalidationQueue;
3335
pub use self::readmes::RenderAndUploadReadme;
3436
pub use self::send_publish_notifications::SendPublishNotificationsJob;
3537
pub use self::sync_admins::SyncAdmins;
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
use crate::cloudfront::{CloudFront, CloudFrontError};
2+
use crate::worker::Environment;
3+
use anyhow::Context;
4+
use crates_io_database::models::CloudFrontInvalidationQueueItem;
5+
use crates_io_worker::BackgroundJob;
6+
use diesel_async::AsyncPgConnection;
7+
use serde::{Deserialize, Serialize};
8+
use std::collections::HashSet;
9+
use std::sync::Arc;
10+
use tokio::time::Duration;
11+
use tracing::{info, instrument, warn};
12+
13+
/// Upper bound on the number of queued paths handled in a single batch.
/// Kept deliberately conservative to stay within AWS CloudFront's 3,000 path limit
/// per invalidation.
const BATCH_SIZE: usize = 1_000;

/// Delay used for the first backoff step.
const INITIAL_BACKOFF: Duration = Duration::from_secs(30);
/// Ceiling applied to the backoff delay as it doubles.
const MAX_BACKOFF: Duration = Duration::from_secs(60 * 15);
/// Retry budget; intended backoff schedule: 30s, 1m, 2m, 4m, 8m, 15m
const MAX_RETRIES: u32 = 6;
20+
21+
/// Background job that processes CloudFront invalidation paths from the database queue in batches.
22+
///
23+
/// This job:
24+
/// - Processes up to 1,000 paths per batch to stay within AWS limits
25+
/// - Deduplicates paths before sending to CloudFront
26+
/// - Implements exponential backoff for `TooManyInvalidationsInProgress` errors
27+
/// - Processes all available batches in a single job run
28+
#[derive(Deserialize, Serialize)]
29+
pub struct ProcessCloudfrontInvalidationQueue;
30+
31+
impl ProcessCloudfrontInvalidationQueue {
32+
#[instrument(skip_all)]
33+
async fn process_batch(
34+
&self,
35+
conn: &mut AsyncPgConnection,
36+
cloudfront: &CloudFront,
37+
) -> anyhow::Result<usize> {
38+
let items = CloudFrontInvalidationQueueItem::fetch_batch(conn, BATCH_SIZE as i64).await?;
39+
if items.is_empty() {
40+
info!("No more CloudFront invalidations to process");
41+
return Ok(0);
42+
}
43+
44+
let item_count = items.len();
45+
info!("Processing next {item_count} CloudFront invalidations…");
46+
47+
let mut unique_paths = HashSet::with_capacity(item_count);
48+
let mut item_ids = Vec::with_capacity(item_count);
49+
for item in items {
50+
unique_paths.insert(item.path);
51+
item_ids.push(item.id);
52+
}
53+
let unique_paths: Vec<String> = unique_paths.into_iter().collect();
54+
55+
let result = self.invalidate_with_backoff(cloudfront, unique_paths).await;
56+
result.context("Failed to request CloudFront invalidations")?;
57+
58+
let result = CloudFrontInvalidationQueueItem::remove_items(conn, &item_ids).await;
59+
result.context("Failed to remove CloudFront invalidations from the queue")?;
60+
61+
info!("Successfully processed {item_count} CloudFront invalidations");
62+
63+
Ok(item_count)
64+
}
65+
66+
/// Invalidate paths on CloudFront with exponential backoff for `TooManyInvalidationsInProgress`
67+
#[instrument(skip_all)]
68+
async fn invalidate_with_backoff(
69+
&self,
70+
cloudfront: &CloudFront,
71+
paths: Vec<String>,
72+
) -> Result<(), CloudFrontError> {
73+
let mut attempt = 1;
74+
let mut backoff = INITIAL_BACKOFF;
75+
loop {
76+
let Err(error) = cloudfront.invalidate_many(paths.clone()).await else {
77+
return Ok(());
78+
};
79+
80+
if !error.is_too_many_invalidations_error() || attempt >= MAX_RETRIES {
81+
return Err(error);
82+
}
83+
84+
warn!(
85+
"Too many CloudFront invalidations in progress, retrying in {backoff:?} seconds…",
86+
);
87+
88+
tokio::time::sleep(backoff).await;
89+
90+
attempt += 1;
91+
backoff = std::cmp::min(backoff * 2, MAX_BACKOFF);
92+
}
93+
}
94+
}
95+
96+
impl BackgroundJob for ProcessCloudfrontInvalidationQueue {
97+
const JOB_NAME: &'static str = "process_cloudfront_invalidation_queue";
98+
const DEDUPLICATED: bool = true;
99+
100+
type Context = Arc<Environment>;
101+
102+
#[instrument(skip_all)]
103+
async fn run(&self, ctx: Self::Context) -> anyhow::Result<()> {
104+
let Some(cloudfront) = ctx.cloudfront() else {
105+
warn!("CloudFront not configured, skipping queue processing");
106+
return Ok(());
107+
};
108+
109+
let mut conn = ctx.deadpool.get().await?;
110+
111+
// Process batches until the queue is empty, or we hit an error
112+
loop {
113+
let item_count = self.process_batch(&mut conn, cloudfront).await?;
114+
if item_count == 0 {
115+
// Queue is empty, we're done
116+
break;
117+
}
118+
}
119+
120+
Ok(())
121+
}
122+
}

src/worker/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ impl RunnerExt for Runner<Arc<Environment>> {
3232
.register_job_type::<jobs::NormalizeIndex>()
3333
.register_job_type::<jobs::ProcessCdnLog>()
3434
.register_job_type::<jobs::ProcessCdnLogQueue>()
35+
.register_job_type::<jobs::ProcessCloudfrontInvalidationQueue>()
3536
.register_job_type::<jobs::RenderAndUploadReadme>()
3637
.register_job_type::<jobs::SquashIndex>()
3738
.register_job_type::<jobs::SyncAdmins>()

0 commit comments

Comments
 (0)