Skip to content

Commit f076d6d

Browse files
authored
feat: add back gc protect callback (#94)
## Description This adds back a way to externally protect blobs from garbage collection. It works like it did in blobs1: Users can provide an (async) callback to which a `&mut HashSet<Hash>` is passed. The callback is invoked before each gc run, and all hashes added to the set will be protected from gc during this run. Used in n0-computer/iroh-docs#47 ## Breaking Changes <!-- Optional, if there are any breaking changes document them, including how to migrate older code. --> ## Notes & open questions <!-- Any notes, remarks or open questions you have to make about the PR. --> ## Change checklist - [ ] Self-review. - [ ] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [ ] Tests if relevant. - [ ] All breaking changes documented.
1 parent e188e9d commit f076d6d

File tree

2 files changed

+58
-5
lines changed

2 files changed

+58
-5
lines changed

src/store/fs/gc.rs

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
use std::collections::HashSet;
1+
use std::{collections::HashSet, pin::Pin, sync::Arc};
22

33
use bao_tree::ChunkRanges;
44
use genawaiter::sync::{Co, Gen};
55
use n0_future::{Stream, StreamExt};
6-
use tracing::{debug, error, warn};
6+
use tracing::{debug, error, info, warn};
77

88
use crate::{api::Store, Hash, HashAndFormat};
99

@@ -130,14 +130,52 @@ fn gc_sweep<'a>(
130130
})
131131
}
132132

133-
#[derive(Debug, Clone)]
133+
/// Configuration for garbage collection.
134+
#[derive(derive_more::Debug, Clone)]
134135
pub struct GcConfig {
136+
/// Interval in which to run garbage collection.
135137
pub interval: std::time::Duration,
138+
/// Optional callback to manually add protected blobs.
139+
///
140+
/// The callback is called before each garbage collection run. It gets a `&mut HashSet<Hash>`
141+
/// and returns a future that returns [`ProtectOutcome`]. All hashes that are added to the
142+
/// [`HashSet`] will be protected from garbage collection during this run.
143+
///
144+
/// In normal operation, return [`ProtectOutcome::Continue`] from the callback. If you return
145+
/// [`ProtectOutcome::Abort`], the garbage collection run will be aborted. Use this if your
146+
/// source of hashes to protect returned an error, and thus garbage collection should be skipped
147+
/// completely to not unintentionally delete blobs that should be protected.
148+
#[debug("ProtectCallback")]
149+
pub add_protected: Option<ProtectCb>,
136150
}
137151

152+
/// Returned from [`ProtectCb`].
153+
///
154+
/// See [`GcConfig::add_protected`] for details.
155+
#[derive(Debug)]
156+
pub enum ProtectOutcome {
157+
/// Continue with the garbage collection run.
158+
Continue,
159+
/// Abort the garbage collection run.
160+
Abort,
161+
}
162+
163+
/// The type of the garbage collection callback.
164+
///
165+
/// See [`GcConfig::add_protected`] for details.
166+
pub type ProtectCb = Arc<
167+
dyn for<'a> Fn(
168+
&'a mut HashSet<Hash>,
169+
)
170+
-> Pin<Box<dyn std::future::Future<Output = ProtectOutcome> + Send + Sync + 'a>>
171+
+ Send
172+
+ Sync
173+
+ 'static,
174+
>;
175+
138176
pub async fn gc_run_once(store: &Store, live: &mut HashSet<Hash>) -> crate::api::Result<()> {
177+
debug!(externally_protected = live.len(), "gc: start");
139178
{
140-
live.clear();
141179
store.clear_protected().await?;
142180
let mut stream = gc_mark(store, live);
143181
while let Some(ev) = stream.next().await {
@@ -155,6 +193,7 @@ pub async fn gc_run_once(store: &Store, live: &mut HashSet<Hash>) -> crate::api:
155193
}
156194
}
157195
}
196+
debug!(total_protected = live.len(), "gc: sweep");
158197
{
159198
let mut stream = gc_sweep(store, live);
160199
while let Some(ev) = stream.next().await {
@@ -172,14 +211,26 @@ pub async fn gc_run_once(store: &Store, live: &mut HashSet<Hash>) -> crate::api:
172211
}
173212
}
174213
}
214+
debug!("gc: done");
175215

176216
Ok(())
177217
}
178218

179219
pub async fn run_gc(store: Store, config: GcConfig) {
220+
debug!("gc enabled with interval {:?}", config.interval);
180221
let mut live = HashSet::new();
181222
loop {
223+
live.clear();
182224
tokio::time::sleep(config.interval).await;
225+
if let Some(ref cb) = config.add_protected {
226+
match (cb)(&mut live).await {
227+
ProtectOutcome::Continue => {}
228+
ProtectOutcome::Abort => {
229+
info!("abort gc run: protect callback indicated abort");
230+
continue;
231+
}
232+
}
233+
}
183234
if let Err(e) = gc_run_once(&store, &mut live).await {
184235
error!("error during gc run: {e}");
185236
break;
@@ -288,6 +339,7 @@ mod tests {
288339
assert!(!data_path.exists());
289340
assert!(!outboard_path.exists());
290341
}
342+
live.clear();
291343
// create a large partial file and check that the data and outboard file as well as
292344
// the sizes and bitfield files are deleted by gc
293345
{

src/store/fs/options.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ use std::{
44
time::Duration,
55
};
66

7-
use super::{gc::GcConfig, meta::raw_outboard_size, temp_name};
7+
pub use super::gc::{GcConfig, ProtectCb, ProtectOutcome};
8+
use super::{meta::raw_outboard_size, temp_name};
89
use crate::Hash;
910

1011
/// Options for directories used by the file store.

0 commit comments

Comments
 (0)