Skip to content

Commit c0ec5b7

Browse files
authored
chore: refactor and simplify stream_graph and unordered_stream_graph (#5893)
1 parent baf74e6 commit c0ec5b7

File tree

1 file changed

+36
-47
lines changed

1 file changed

+36
-47
lines changed

src/ipld/util.rs

Lines changed: 36 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,14 @@ pin_project! {
119119
}
120120

121121
impl<DB, T> ChainStream<DB, T> {
122-
pub fn with_seen(self, seen: CidHashSet) -> Self {
123-
ChainStream { seen, ..self }
122+
pub fn with_seen(mut self, seen: CidHashSet) -> Self {
123+
self.seen = seen;
124+
self
125+
}
126+
127+
pub fn fail_on_dead_links(mut self, fail_on_dead_links: bool) -> Self {
128+
self.fail_on_dead_links = fail_on_dead_links;
129+
self
124130
}
125131

126132
#[allow(dead_code)]
@@ -162,14 +168,7 @@ pub fn stream_graph<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T>
162168
tipset_iter: ITER,
163169
stateroot_limit: ChainEpoch,
164170
) -> ChainStream<DB, ITER> {
165-
ChainStream {
166-
tipset_iter,
167-
db,
168-
dfs: VecDeque::new(),
169-
seen: CidHashSet::default(),
170-
stateroot_limit,
171-
fail_on_dead_links: false,
172-
}
171+
stream_chain(db, tipset_iter, stateroot_limit).fail_on_dead_links(false)
173172
}
174173

175174
impl<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin> Stream
@@ -287,7 +286,7 @@ pin_project! {
287286
block_receiver: flume::Receiver<anyhow::Result<CarBlock>>,
288287
extract_sender: flume::Sender<Cid>,
289288
stateroot_limit: ChainEpoch,
290-
queue: Vec<(Cid,Option<Vec<u8>>)>,
289+
queue: Vec<(Cid, Option<Vec<u8>>)>,
291290
fail_on_dead_links: bool,
292291
}
293292

@@ -308,29 +307,18 @@ impl<DB, T> UnorderedChainStream<DB, T> {
308307
}
309308
}
310309

311-
/// Stream all blocks that are reachable before the `stateroot_limit` epoch in an unordered fashion.
312-
/// After this limit, only block headers are streamed. Any dead links are reported as errors.
313-
///
314-
/// # Arguments
315-
///
316-
/// * `db` - A database that implements [`Blockstore`] interface.
317-
/// * `tipset_iter` - An iterator of [`Tipset`], descending order `$child -> $parent`.
318-
/// * `stateroot_limit` - An epoch that signifies how far back we need to inspect tipsets, in-depth.
319-
/// This has to be pre-calculated using this formula: `$cur_epoch - $depth`, where `$depth` is the
320-
/// number of `[`Tipset`]` that needs inspection.
321-
#[allow(dead_code)]
322-
pub fn unordered_stream_chain<
310+
fn unordered_stream_chain_inner<
323311
DB: Blockstore + Sync + Send + 'static,
324312
T: Borrow<Tipset>,
325313
ITER: Iterator<Item = T> + Unpin + Send + 'static,
326314
>(
327315
db: Arc<DB>,
328316
tipset_iter: ITER,
329317
stateroot_limit: ChainEpoch,
318+
fail_on_dead_links: bool,
330319
) -> UnorderedChainStream<DB, ITER> {
331320
let (sender, receiver) = flume::bounded(BLOCK_CHANNEL_LIMIT);
332321
let (extract_sender, extract_receiver) = flume::unbounded();
333-
let fail_on_dead_links = true;
334322
let seen = Arc::new(Mutex::new(CidHashSet::default()));
335323
let handle = UnorderedChainStream::<DB, ITER>::start_workers(
336324
db.clone(),
@@ -353,6 +341,29 @@ pub fn unordered_stream_chain<
353341
}
354342
}
355343

344+
/// Stream all blocks that are reachable before the `stateroot_limit` epoch in an unordered fashion.
345+
/// After this limit, only block headers are streamed. Any dead links are reported as errors.
346+
///
347+
/// # Arguments
348+
///
349+
/// * `db` - A database that implements [`Blockstore`] interface.
350+
/// * `tipset_iter` - An iterator of [`Tipset`], descending order `$child -> $parent`.
351+
/// * `stateroot_limit` - An epoch that signifies how far back we need to inspect tipsets, in-depth.
352+
/// This has to be pre-calculated using this formula: `$cur_epoch - $depth`, where `$depth` is the
353+
/// number of `[`Tipset`]` that needs inspection.
354+
#[allow(dead_code)]
355+
pub fn unordered_stream_chain<
356+
DB: Blockstore + Sync + Send + 'static,
357+
T: Borrow<Tipset>,
358+
ITER: Iterator<Item = T> + Unpin + Send + 'static,
359+
>(
360+
db: Arc<DB>,
361+
tipset_iter: ITER,
362+
stateroot_limit: ChainEpoch,
363+
) -> UnorderedChainStream<DB, ITER> {
364+
unordered_stream_chain_inner(db, tipset_iter, stateroot_limit, true)
365+
}
366+
356367
// Stream available graph in unordered search. All reachable nodes are touched and dead-links
357368
// are ignored.
358369
pub fn unordered_stream_graph<
@@ -364,29 +375,7 @@ pub fn unordered_stream_graph<
364375
tipset_iter: ITER,
365376
stateroot_limit: ChainEpoch,
366377
) -> UnorderedChainStream<DB, ITER> {
367-
let (sender, receiver) = flume::bounded(2048);
368-
let (extract_sender, extract_receiver) = flume::unbounded();
369-
let fail_on_dead_links = false;
370-
let seen = Arc::new(Mutex::new(CidHashSet::default()));
371-
let handle = UnorderedChainStream::<DB, ITER>::start_workers(
372-
db.clone(),
373-
sender.clone(),
374-
extract_receiver,
375-
seen.clone(),
376-
fail_on_dead_links,
377-
);
378-
379-
UnorderedChainStream {
380-
seen,
381-
db,
382-
worker_handle: handle,
383-
block_receiver: receiver,
384-
queue: Vec::new(),
385-
tipset_iter,
386-
extract_sender,
387-
stateroot_limit,
388-
fail_on_dead_links,
389-
}
378+
unordered_stream_chain_inner(db, tipset_iter, stateroot_limit, false)
390379
}
391380

392381
impl<DB: Blockstore + Send + Sync + 'static, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin>

0 commit comments

Comments
 (0)