Skip to content

Commit 97bedb9

Browse files
authored
fix: reduce db read ops in chain export (#5868)
1 parent 33db8e8 commit 97bedb9

File tree

2 files changed

+69
-47
lines changed

2 files changed

+69
-47
lines changed

src/blocks/header.rs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,18 @@ use std::sync::{
88
};
99

1010
use super::{ElectionProof, Error, Ticket, TipsetKey};
11-
use crate::beacon::{BeaconEntry, BeaconSchedule};
12-
use crate::shim::clock::ChainEpoch;
13-
use crate::shim::{
14-
address::Address, crypto::Signature, econ::TokenAmount, sector::PoStProof,
15-
version::NetworkVersion,
11+
use crate::{
12+
beacon::{BeaconEntry, BeaconSchedule},
13+
shim::{
14+
address::Address, clock::ChainEpoch, crypto::Signature, econ::TokenAmount,
15+
sector::PoStProof, version::NetworkVersion,
16+
},
17+
utils::{encoding::blake2b_256, multihash::MultihashCode},
1618
};
17-
use crate::utils::{cid::CidCborExt as _, encoding::blake2b_256};
1819
use cid::Cid;
1920
use fvm_ipld_blockstore::Blockstore;
2021
use fvm_ipld_encoding::CborStore as _;
22+
use multihash_derive::MultihashDigest as _;
2123
use num::BigInt;
2224
use serde::{Deserialize, Serialize};
2325
use serde_tuple::{Deserialize_tuple, Serialize_tuple};
@@ -96,7 +98,15 @@ impl Default for RawBlockHeader {
9698

9799
impl RawBlockHeader {
98100
pub fn cid(&self) -> Cid {
99-
Cid::from_cbor_blake2b256(self).unwrap()
101+
self.car_block().expect("CBOR serialization failed").0
102+
}
103+
pub fn car_block(&self) -> anyhow::Result<(Cid, Vec<u8>)> {
104+
let data = fvm_ipld_encoding::to_vec(self)?;
105+
let cid = Cid::new_v1(
106+
fvm_ipld_encoding::DAG_CBOR,
107+
MultihashCode::Blake2b256.digest(&data),
108+
);
109+
Ok((cid, data))
100110
}
101111
pub(super) fn tipset_sort_key(&self) -> Option<([u8; 32], Vec<u8>)> {
102112
let ticket_hash = blake2b_256(self.ticket.as_ref()?.vrfproof.as_bytes());

src/ipld/util.rs

Lines changed: 52 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ impl Iterator for DfsIter {
102102

103103
enum Task {
104104
// Yield the block, don't visit it.
105-
Emit(Cid),
105+
Emit(Cid, Option<Vec<u8>>),
106106
// Visit all the elements, recursively.
107107
Iterate(VecDeque<Cid>),
108108
}
@@ -179,26 +179,25 @@ impl<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin> Stream
179179

180180
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
181181
use Task::*;
182-
let this = self.project();
183182

184-
let ipld_to_cid = |ipld| {
185-
if let Ipld::Link(cid) = ipld {
186-
return Some(cid);
187-
}
188-
None
189-
};
183+
let fail_on_dead_links = self.fail_on_dead_links;
184+
let stateroot_limit = self.stateroot_limit;
185+
let this = self.project();
190186

191-
let stateroot_limit = *this.stateroot_limit;
192187
loop {
193188
while let Some(task) = this.dfs.front_mut() {
194189
match task {
195-
Emit(cid) => {
196-
let cid = *cid;
197-
this.dfs.pop_front();
198-
if let Some(data) = this.db.get(&cid)? {
199-
return Poll::Ready(Some(Ok(CarBlock { cid, data })));
200-
} else if *this.fail_on_dead_links {
201-
return Poll::Ready(Some(Err(anyhow::anyhow!("missing key: {}", cid))));
190+
Emit(_, _) => {
191+
if let Some(Emit(cid, data)) = this.dfs.pop_front() {
192+
if let Some(data) = data {
193+
return Poll::Ready(Some(Ok(CarBlock { cid, data })));
194+
} else if let Some(data) = this.db.get(&cid)? {
195+
return Poll::Ready(Some(Ok(CarBlock { cid, data })));
196+
} else if fail_on_dead_links {
197+
return Poll::Ready(Some(Err(anyhow::anyhow!(
198+
"missing key: {cid}"
199+
))));
200+
};
202201
}
203202
}
204203
Iterate(cid_vec) => {
@@ -212,17 +211,17 @@ impl<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin> Stream
212211
if let Some(data) = this.db.get(&cid)? {
213212
if cid.codec() == fvm_ipld_encoding::DAG_CBOR {
214213
let new_values = extract_cids(&data)?;
215-
cid_vec.reserve(new_values.len());
216-
217-
for v in new_values.into_iter().rev() {
218-
cid_vec.push_front(v)
214+
if !new_values.is_empty() {
215+
cid_vec.reserve(new_values.len());
216+
for v in new_values.into_iter().rev() {
217+
cid_vec.push_front(v)
218+
}
219219
}
220220
}
221221
return Poll::Ready(Some(Ok(CarBlock { cid, data })));
222-
} else if *this.fail_on_dead_links {
222+
} else if fail_on_dead_links {
223223
return Poll::Ready(Some(Err(anyhow::anyhow!(
224-
"missing key: {}",
225-
cid
224+
"missing key: {cid}"
226225
))));
227226
}
228227
}
@@ -237,14 +236,15 @@ impl<DB: Blockstore, T: Borrow<Tipset>, ITER: Iterator<Item = T> + Unpin> Stream
237236
// yield the block without walking the graph it represents.
238237
if let Some(tipset) = this.tipset_iter.next() {
239238
for block in tipset.borrow().block_headers() {
240-
if this.seen.insert(*block.cid()) {
239+
let (cid, data) = block.car_block()?;
240+
if this.seen.insert(cid) {
241241
// Make sure we always yield a block otherwise.
242-
this.dfs.push_back(Emit(*block.cid()));
242+
this.dfs.push_back(Emit(cid, Some(data)));
243243

244244
if block.epoch == 0 {
245245
// The genesis block has some kind of dummy parent that needs to be emitted.
246246
for p in &block.parents {
247-
this.dfs.push_back(Emit(p));
247+
this.dfs.push_back(Emit(p, None));
248248
}
249249
}
250250

@@ -287,7 +287,7 @@ pin_project! {
287287
block_receiver: flume::Receiver<anyhow::Result<CarBlock>>,
288288
extract_sender: flume::Sender<Cid>,
289289
stateroot_limit: ChainEpoch,
290-
queue: Vec<Cid>,
290+
queue: Vec<(Cid,Option<Vec<u8>>)>,
291291
fail_on_dead_links: bool,
292292
}
293293

@@ -425,7 +425,7 @@ impl<DB: Blockstore + Send + Sync + 'static, T: Borrow<Tipset>, ITER: Iterator<I
425425
// If the receiving end has already quit - just ignore it and
426426
// break out of the loop.
427427
let _ = block_sender
428-
.send(Err(anyhow::anyhow!("missing key: {}", cid)));
428+
.send(Err(anyhow::anyhow!("missing key: {cid}")));
429429
break 'main;
430430
}
431431
}
@@ -454,6 +454,8 @@ impl<DB: Blockstore + Send + Sync + 'static, T: Iterator<Item = Tipset> + Unpin>
454454
type Item = anyhow::Result<CarBlock>;
455455

456456
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
457+
let stateroot_limit = self.stateroot_limit;
458+
let fail_on_dead_links = self.fail_on_dead_links;
457459
let this = self.project();
458460
let receive_block = || {
459461
if let Ok(item) = this.block_receiver.try_recv() {
@@ -462,33 +464,35 @@ impl<DB: Blockstore + Send + Sync + 'static, T: Iterator<Item = Tipset> + Unpin>
462464
None
463465
};
464466
loop {
465-
while let Some(cid) = this.queue.pop() {
466-
if let Some(data) = this.db.get(&cid)? {
467+
while let Some((cid, data)) = this.queue.pop() {
468+
if let Some(data) = data {
469+
return Poll::Ready(Some(Ok(CarBlock { cid, data })));
470+
} else if let Some(data) = this.db.get(&cid)? {
467471
return Poll::Ready(Some(Ok(CarBlock { cid, data })));
468-
} else if *this.fail_on_dead_links {
469-
return Poll::Ready(Some(Err(anyhow::anyhow!("missing key: {}", cid))));
472+
} else if fail_on_dead_links {
473+
return Poll::Ready(Some(Err(anyhow::anyhow!("missing key: {cid}"))));
470474
}
471475
}
472476

473477
if let Some(block) = receive_block() {
474478
return Poll::Ready(Some(block));
475479
}
476480

477-
let stateroot_limit = *this.stateroot_limit;
478481
// This consumes a [`Tipset`] from the iterator one at a time. Workers are then processing
479482
// the extract queue. The emit queue is processed in the loop above. Once the desired depth
480483
// has been reached, yield a block without walking the graph it represents.
481484
if let Some(tipset) = this.tipset_iter.next() {
482485
for block in tipset.into_block_headers().into_iter() {
483-
if this.seen.lock().insert(*block.cid()) {
486+
let (cid, data) = block.car_block()?;
487+
if this.seen.lock().insert(cid) {
484488
// Make sure we always yield a block, directly to the stream to avoid extra
485489
// work.
486-
this.queue.push(*block.cid());
490+
this.queue.push((cid, Some(data)));
487491

488492
if block.epoch == 0 {
489493
// The genesis block has some kind of dummy parent that needs to be emitted.
490494
for p in &block.parents {
491-
this.queue.push(p);
495+
this.queue.push((p, None));
492496
}
493497
}
494498

@@ -500,8 +504,8 @@ impl<DB: Blockstore + Send + Sync + 'static, T: Iterator<Item = Tipset> + Unpin>
500504
this.extract_sender.send(block.messages)?;
501505
// This will simply return an error once we reach that item in
502506
// the queue.
503-
} else if *this.fail_on_dead_links {
504-
this.queue.push(block.messages);
507+
} else if fail_on_dead_links {
508+
this.queue.push((block.messages, None));
505509
} else {
506510
// Make sure we update seen here as we don't send the block for
507511
// inspection.
@@ -518,8 +522,8 @@ impl<DB: Blockstore + Send + Sync + 'static, T: Iterator<Item = Tipset> + Unpin>
518522
this.extract_sender.send(block.state_root)?;
519523
// This will simply return an error once we reach that item in
520524
// the queue.
521-
} else if *this.fail_on_dead_links {
522-
this.queue.push(block.state_root);
525+
} else if fail_on_dead_links {
526+
this.queue.push((block.state_root, None));
523527
} else {
524528
// Make sure we update seen here as we don't send the block for
525529
// inspection.
@@ -550,3 +554,11 @@ impl<DB: Blockstore + Send + Sync + 'static, T: Iterator<Item = Tipset> + Unpin>
550554
}
551555
}
552556
}
557+
558+
fn ipld_to_cid(ipld: Ipld) -> Option<Cid> {
559+
if let Ipld::Link(cid) = ipld {
560+
Some(cid)
561+
} else {
562+
None
563+
}
564+
}

0 commit comments

Comments
 (0)