Skip to content

Commit 75575bc

Browse files
committed
fix(queue): handle errors while processing changes gracefully
chore(tracing): update tracing macro usages to use structured fields instead of string interpolation
1 parent 106886e commit 75575bc

File tree

1 file changed

+96
-23
lines changed

1 file changed

+96
-23
lines changed

src/build_queue.rs

Lines changed: 96 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -372,15 +372,12 @@ impl AsyncBuildQueue {
372372
let (changes, new_reference) = index.peek_changes_ordered().await?;
373373

374374
let mut conn = self.db.get_async().await?;
375-
let mut crates_added = 0;
376375

377-
debug!("queueing changes from {last_seen_reference} to {new_reference}");
376+
debug!(last_seen_reference=%last_seen_reference, new_reference=%new_reference, "queueing changes");
378377

379-
for change in &changes {
380-
if self.process_change(index, &mut conn, change).await? {
381-
crates_added += 1;
382-
}
383-
}
378+
let crates_added = self
379+
.process_changes(&mut conn, &changes, index.repository_url())
380+
.await;
384381

385382
// set the reference in the database
386383
// so this survives recreating the registry watcher
@@ -390,21 +387,38 @@ impl AsyncBuildQueue {
390387
Ok(crates_added)
391388
}
392389

390+
async fn process_changes(
391+
&self,
392+
conn: &mut AsyncPoolClient,
393+
changes: &Vec<Change>,
394+
registry: Option<&str>,
395+
) -> usize {
396+
let mut crates_added = 0;
397+
398+
for change in changes {
399+
match self.process_change(conn, change, registry).await {
400+
Ok(added) => {
401+
if added {
402+
crates_added += 1;
403+
}
404+
}
405+
Err(err) => report_error(&err),
406+
}
407+
}
408+
crates_added
409+
}
410+
393411
/// Process a crate change, returning whether the change was a crate addition or not.
394412
async fn process_change(
395413
&self,
396-
index: &Index,
397414
conn: &mut AsyncPoolClient,
398415
change: &Change,
416+
registry: Option<&str>,
399417
) -> Result<bool> {
400418
match change {
401-
Change::Added(release) => {
402-
self.process_version_added(conn, release, index.repository_url())
403-
.await?
404-
}
419+
Change::Added(release) => self.process_version_added(conn, release, registry).await?,
405420
Change::AddedAndYanked(release) => {
406-
self.process_version_added(conn, release, index.repository_url())
407-
.await?;
421+
self.process_version_added(conn, release, registry).await?;
408422
self.process_version_yank_status(conn, release).await?;
409423
}
410424
Change::Unyanked(release) | Change::Yanked(release) => {
@@ -460,8 +474,9 @@ impl AsyncBuildQueue {
460474
)
461475
})?;
462476
debug!(
463-
"{}-{} added into build queue",
464-
release.name, release.version
477+
name=%release.name,
478+
version=%release.version,
479+
"added into build queue",
465480
);
466481
self.queue_metrics.queued_builds.add(1, &[]);
467482
self.deprioritize_other_releases(&release.name, version, PRIORITY_MANUAL_FROM_CRATES_IO)
@@ -489,8 +504,9 @@ impl AsyncBuildQueue {
489504
)
490505
})?;
491506
info!(
492-
"release {}-{} was deleted from the index and the database",
493-
release.name, release.version
507+
name=%release.name,
508+
version=%release.version,
509+
"release was deleted from the index and the database",
494510
);
495511
self.queue_crate_invalidation(&release.name).await;
496512
self.remove_version_from_queue(&release.name, &version)
@@ -503,8 +519,8 @@ impl AsyncBuildQueue {
503519
.await
504520
.with_context(|| format!("failed to delete crate {krate}"))?;
505521
info!(
506-
"crate {} was deleted from the index and the database",
507-
krate
522+
name=%krate,
523+
"crate deleted from the index and the database",
508524
);
509525
self.queue_crate_invalidation(krate).await;
510526
self.remove_crate_from_queue(krate).await
@@ -542,7 +558,12 @@ impl AsyncBuildQueue {
542558
.fetch_optional(&mut *conn)
543559
.await?
544560
{
545-
debug!("{}-{} {}", name, version, activity);
561+
debug!(
562+
%name,
563+
%version,
564+
%activity,
565+
"updating latest version id"
566+
);
546567
update_latest_version_id(&mut *conn, crate_id).await?;
547568
} else {
548569
match self
@@ -552,8 +573,9 @@ impl AsyncBuildQueue {
552573
{
553574
Ok(false) => {
554575
error!(
555-
"tried to yank or unyank non-existing release: {} {}",
556-
name, version
576+
%name,
577+
%version,
578+
"tried to yank or unyank non-existing release",
557579
);
558580
}
559581
Ok(true) => {
@@ -1095,6 +1117,57 @@ mod tests {
10951117
Ok(())
10961118
}
10971119

1120+
/// Ensure changes can be processed with graceful error handling and proper tracking of added versions
1121+
#[tokio::test(flavor = "multi_thread")]
1122+
async fn test_process_changes() -> Result<()> {
1123+
let env = TestEnvironment::new().await?;
1124+
let build_queue = env.async_build_queue();
1125+
let mut conn = env.async_db().async_conn().await;
1126+
1127+
env.fake_release()
1128+
.await
1129+
.name("krate_already_present")
1130+
.version(V1)
1131+
.create()
1132+
.await?;
1133+
1134+
let krate1 = CrateVersion {
1135+
name: "krate1".parse()?,
1136+
version: V1.to_string().parse()?,
1137+
..Default::default()
1138+
};
1139+
let krate2 = CrateVersion {
1140+
name: "krate2".parse()?,
1141+
version: V1.to_string().parse()?,
1142+
..Default::default()
1143+
};
1144+
let krate_already_present = CrateVersion {
1145+
name: "krate_already_present".parse()?,
1146+
version: V1.to_string().parse()?,
1147+
..Default::default()
1148+
};
1149+
let non_existing_krate = CrateVersion {
1150+
name: "krate_already_present".parse()?,
1151+
version: V2.to_string().parse()?,
1152+
..Default::default()
1153+
};
1154+
let added = build_queue
1155+
.process_changes(
1156+
&mut conn,
1157+
&vec![
1158+
Change::Added(krate1), // Should be added correctly
1159+
Change::Added(krate2), // Should be added correctly
1160+
Change::VersionDeleted(krate_already_present), // Should be deleted correctly, without affecting the returned counter
1161+
Change::VersionDeleted(non_existing_krate), // Should error out, but the error should be handled gracefully
1162+
],
1163+
None,
1164+
)
1165+
.await;
1166+
1167+
assert_eq!(added, 2);
1168+
Ok(())
1169+
}
1170+
10981171
#[tokio::test(flavor = "multi_thread")]
10991172
async fn test_rebuild_when_old() -> Result<()> {
11001173
let env = TestEnvironment::with_config(

0 commit comments

Comments
 (0)