diff --git a/src/new_index/db.rs b/src/new_index/db.rs
index e889aad63..e23f7a9ee 100644
--- a/src/new_index/db.rs
+++ b/src/new_index/db.rs
@@ -206,6 +206,13 @@ impl DB {
         self.db.put_opt(key, value, &opts).unwrap();
     }
 
+    pub fn write_batch(&self, batch: rocksdb::WriteBatch) {
+        let mut opts = rocksdb::WriteOptions::new();
+        opts.set_sync(true);
+        opts.disable_wal(false);
+        self.db.write_opt(batch, &opts).unwrap();
+    }
+
     pub fn get(&self, key: &[u8]) -> Option<Bytes> {
         self.db.get(key).unwrap().map(|v| v.to_vec())
     }
diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs
index ec7bc54d8..c6f02afaf 100644
--- a/src/new_index/schema.rs
+++ b/src/new_index/schema.rs
@@ -5,7 +5,6 @@
 use bitcoin::merkle_tree::MerkleBlock;
 use crypto::digest::Digest;
 use crypto::sha2::Sha256;
-use itertools::Itertools;
 use rayon::prelude::*;
 
 #[cfg(not(feature = "liquid"))]
@@ -270,6 +269,149 @@ impl Indexer {
         Ok(result)
     }
 
+    /// Clean up orphaned data using the specific list of removed headers.
+    /// This is much more efficient than scanning the entire database.
+    fn cleanup_orphaned_data(&self, orphaned_headers: &[HeaderEntry]) -> Result<()> {
+        if orphaned_headers.is_empty() {
+            return Ok(());
+        }
+
+        let min_height = orphaned_headers.first().unwrap().height();
+        let max_height = orphaned_headers.last().unwrap().height();
+
+        info!(
+            "Cleaning up orphaned data for {} blocks (heights {} to {})",
+            orphaned_headers.len(),
+            min_height,
+            max_height
+        );
+
+        // Build HashSets of orphaned blockhashes and heights for O(1) lookups
+        let orphaned_hashes: HashSet<BlockHash> = orphaned_headers
+            .iter()
+            .map(|h| *h.hash())
+            .collect();
+
+        let orphaned_heights: HashSet<usize> = orphaned_headers
+            .iter()
+            .map(|h| h.height())
+            .collect();
+
+        self.cleanup_history(&orphaned_heights)?;
+        self.cleanup_confirmations(&orphaned_hashes)?;
+        self.cleanup_cache(&orphaned_heights)?;
+
+        Ok(())
+    }
+
+    /// Clean up history entries for specific orphaned heights
+    fn cleanup_history(&self, orphaned_heights: &HashSet<usize>) -> Result<()> {
+        let _timer = self.start_timer("reorg_cleanup_history");
+        let mut batch = rocksdb::WriteBatch::default();
+        let mut count = 0;
+
+        // Scan history entries (scripthash history with the 'H' prefix)
+        for row in self.store.history_db.iter_scan(&[b'H']) {
+            let history_row = TxHistoryRow::from_row(row);
+            let height = history_row.key.confirmed_height as usize;
+
+            if orphaned_heights.contains(&height) {
+                batch.delete(&history_row.into_row().key);
+                count += 1;
+            }
+        }
+
+        // Also clean up asset history entries when running on Elements/Liquid
+        #[cfg(feature = "liquid")]
+        for row in self.store.history_db.iter_scan(&[b'I']) {
+            let history_row = TxHistoryRow::from_row(row);
+            let height = history_row.key.confirmed_height as usize;
+
+            if orphaned_heights.contains(&height) {
+                batch.delete(&history_row.into_row().key);
+                count += 1;
+            }
+        }
+
+        info!("Deleted {} orphaned history entries", count);
+        self.store.history_db.write_batch(batch);
+        Ok(())
+    }
+
+    /// Clean up confirmation entries for specific orphaned blockhashes
+    fn cleanup_confirmations(&self, orphaned_hashes: &HashSet<BlockHash>) -> Result<()> {
+        let _timer = self.start_timer("reorg_cleanup_confirmations");
+        let mut batch = rocksdb::WriteBatch::default();
+        let mut count = 0;
+
+        // Scan confirmation entries (prefix 'C')
+        for row in self.store.txstore_db.iter_scan(&[b'C']) {
+            let conf_row = TxConfRow::from_row(row);
+            let blockhash: BlockHash = deserialize(&conf_row.key.blockhash).unwrap();
+
+            if orphaned_hashes.contains(&blockhash) {
+                batch.delete(&conf_row.into_row().key);
+                count += 1;
+            }
+        }
+
+        info!("Deleted {} orphaned confirmation entries", count);
+        self.store.txstore_db.write_batch(batch);
+        Ok(())
+    }
+
+    /// Clean up cached data for specific orphaned heights
+    fn cleanup_cache(&self, orphaned_heights: &HashSet<usize>) -> Result<()> {
+        let _timer = self.start_timer("reorg_cleanup_cache");
+        let mut batch = rocksdb::WriteBatch::default();
+        let mut count = 0;
+
+        // Clean up aggregated stats (prefix 'A').
+        // The key format is: b'A' + scripthash + height (big-endian u32)
+        for row in self.store.cache_db.iter_scan(&[b'A']) {
+            let key = row.key;
+            if key.len() >= 37 {
+                let height_bytes = &key[33..37];
+                let height = u32::from_be_bytes([
+                    height_bytes[0],
+                    height_bytes[1],
+                    height_bytes[2],
+                    height_bytes[3],
+                ]) as usize;
+
+                if orphaned_heights.contains(&height) {
+                    batch.delete(&key);
+                    count += 1;
+                }
+            }
+        }
+
+        // Clean up UTXO sets (prefix 'U'); their keys embed the height the same way
+        for row in self.store.cache_db.iter_scan(&[b'U']) {
+            let key = row.key;
+            if key.len() >= 37 {
+                let height_bytes = &key[33..37];
+                let height = u32::from_be_bytes([
+                    height_bytes[0],
+                    height_bytes[1],
+                    height_bytes[2],
+                    height_bytes[3],
+                ]) as usize;
+
+                if orphaned_heights.contains(&height) {
+                    batch.delete(&key);
+                    count += 1;
+                }
+            }
+        }
+
+        info!("Deleted {} orphaned cache entries", count);
+        self.store.cache_db.write_batch(batch);
+        Ok(())
+    }
+
     pub fn update(&mut self, daemon: &Daemon) -> Result<BlockHash> {
         let daemon = daemon.reconnect()?;
         let tip = daemon.getbestblockhash()?;
@@ -304,14 +446,30 @@ impl Indexer {
         debug!("updating synced tip to {:?}", tip);
         self.store.txstore_db.put_sync(b"t", &serialize(&tip));
-        let mut headers = self.store.indexed_headers.write().unwrap();
-        headers.apply(new_headers);
-        assert_eq!(tip, *headers.tip());
+        // Apply headers and get any orphaned headers from the reorg
+        let orphaned_headers = {
+            let mut headers = self.store.indexed_headers.write().unwrap();
+            let orphaned = headers.apply(new_headers);
+            assert_eq!(tip, *headers.tip());
+            orphaned
+        };
+
+        // Clean up orphaned data AFTER applying headers - no race condition:
+        // the orphaned data is already unreachable via the new chain state.
+        if !orphaned_headers.is_empty() {
+            warn!(
+                "Blockchain reorganization detected, cleaning up {} orphaned blocks",
+                orphaned_headers.len()
+            );
+            self.cleanup_orphaned_data(&orphaned_headers)?;
+            info!("Reorg cleanup complete");
+        }
 
         if let FetchFrom::BlkFiles = self.from {
             self.from = FetchFrom::Bitcoind;
         }
 
+        let headers = self.store.indexed_headers.read().unwrap();
         self.tip_metric.set(headers.len() as i64 - 1);
 
         Ok(tip)
     }
@@ -494,23 +652,43 @@ impl ChainQuery {
         limit: usize,
     ) -> Vec<(Transaction, BlockId)> {
         let _timer_scan = self.start_timer("history");
-        let txs_conf = self
+
+        // Acquire the header lock once upfront instead of once per txid
+        let headers = self.store.indexed_headers.read().unwrap();
+
+        // Group by txid and use the confirmed_height from the row itself
+        let mut seen = std::collections::HashSet::new();
+        let mut found_last_seen = last_seen_txid.is_none();
+
+        let txs_conf: Vec<(Txid, BlockId)> = self
             .history_iter_scan_reverse(code, hash)
-            .map(|row| TxHistoryRow::from_row(row).get_txid())
-            // XXX: unique() requires keeping an in-memory list of all txids, can we avoid that?
-            .unique()
-            // TODO seek directly to last seen tx without reading earlier rows
-            .skip_while(|txid| {
-                // skip until we reach the last_seen_txid
-                last_seen_txid.map_or(false, |last_seen_txid| last_seen_txid != txid)
-            })
-            .skip(match last_seen_txid {
-                Some(_) => 1, // skip the last_seen_txid itself
-                None => 0,
+            .map(|row| TxHistoryRow::from_row(row))
+            .filter_map(|row| {
+                let txid = row.get_txid();
+                // Only process each txid once
+                if !seen.insert(txid) {
+                    return None;
+                }
+
+                // Skip rows until we reach the last_seen_txid
+                if !found_last_seen {
+                    if Some(&txid) == last_seen_txid {
+                        found_last_seen = true;
+                    }
+                    return None;
+                }
+
+                // Fast path: use the height from the row (no DB lookup needed)
+                let height = row.key.confirmed_height as usize;
+                if let Some(header) = headers.header_by_height(height) {
+                    return Some((txid, BlockId::from(header)));
+                }
+
+                // Slow path fallback: header not yet indexed, or reorged away
+                self.tx_confirming_block(&txid).map(|b| (txid, b))
             })
-            .filter_map(|txid| self.tx_confirming_block(&txid).map(|b| (txid, b)))
             .take(limit)
-            .collect::<Vec<(Txid, BlockId)>>();
+            .collect();
 
         self.lookup_txns(&txs_conf)
             .expect("failed looking up txs in history index")
@@ -527,12 +705,36 @@ impl ChainQuery {
 
     fn _history_txids(&self, code: u8, hash: &[u8], limit: usize) -> Vec<(Txid, BlockId)> {
         let _timer = self.start_timer("history_txids");
-        self.history_iter_scan(code, hash, 0)
-            .map(|row| TxHistoryRow::from_row(row).get_txid())
-            .unique()
-            .filter_map(|txid| self.tx_confirming_block(&txid).map(|b| (txid, b)))
+
+        // Acquire the header lock once upfront instead of once per txid
+        let headers = self.store.indexed_headers.read().unwrap();
+
+        // Group by txid and use the confirmed_height from the row itself
+        let mut seen = std::collections::HashSet::new();
+        let result: Vec<(Txid, BlockId)> = self
+            .history_iter_scan(code, hash, 0)
+            .map(|row| TxHistoryRow::from_row(row))
+            .filter_map(|row| {
+                let txid = row.get_txid();
+                // Only process each txid once
+                if !seen.insert(txid) {
+                    return None;
+                }
+
+                // Fast path: use the height from the row (no DB lookup needed)
+                let height = row.key.confirmed_height as usize;
+                if let Some(header) = headers.header_by_height(height) {
+                    return Some((txid, BlockId::from(header)));
+                }
+
+                // Slow path fallback: header not yet indexed, or reorged away.
+                // Fall back to the old method: look up the block by txid in txstore_db.
+                self.tx_confirming_block(&txid).map(|b| (txid, b))
+            })
            .take(limit)
-            .collect()
+            .collect();
+
+        result
     }
 
     // TODO: avoid duplication with stats/stats_delta?
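Both `_history` and `_history_txids` above now share the same lookup shape: take the `confirmed_height` stored in the history row, resolve it against the in-memory header list, and fall back to the per-txid `tx_confirming_block()` database lookup only when that height is not (or no longer) present, while a local `HashSet` of seen txids replaces the old itertools `unique()` de-duplication. Below is a minimal standalone sketch of that fast-path/slow-path pattern, using simplified stand-in types (`BlockId` as a plain struct, a `HashMap` for the header index, and a closure for the slow lookup; none of these are the real electrs types):

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
struct BlockId {
    height: usize,
    hash: String,
}

// `headers_by_height` stands in for the in-memory HeaderList; `slow_lookup`
// stands in for the per-txid tx_confirming_block() database query.
fn confirming_block(
    confirmed_height: usize,
    headers_by_height: &HashMap<usize, BlockId>,
    slow_lookup: impl Fn() -> Option<BlockId>,
) -> Option<BlockId> {
    // Fast path: the height recorded in the history row resolves directly
    // against the in-memory header index, with no DB round-trip.
    if let Some(blockid) = headers_by_height.get(&confirmed_height) {
        return Some(blockid.clone());
    }
    // Slow path: the header is not indexed yet (or was reorged away), so fall
    // back to the per-txid confirmation lookup.
    slow_lookup()
}

fn main() {
    let mut headers = HashMap::new();
    headers.insert(
        100,
        BlockId {
            height: 100,
            hash: "00ab".to_string(),
        },
    );

    // Height 100 is in the in-memory index: fast path, the fallback never runs.
    assert!(confirming_block(100, &headers, || None).is_some());
    // Height 999 is unknown and the fallback also finds nothing.
    assert!(confirming_block(999, &headers, || None).is_none());
}
```

The point is only the control flow: the fallback runs solely when the fast path misses, which is exactly the not-yet-indexed or just-reorged window the diff comments describe.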
@@ -604,12 +806,23 @@ impl ChainQuery {
         limit: usize,
     ) -> Result<(UtxoMap, Option<BlockHash>, usize)> {
         let _timer = self.start_timer("utxo_delta");
+
+        // Acquire the header lock once upfront instead of once per transaction
+        let headers = self.store.indexed_headers.read().unwrap();
+
         let history_iter = self
             .history_iter_scan(b'H', scripthash, start_height)
             .map(TxHistoryRow::from_row)
             .filter_map(|history| {
-                self.tx_confirming_block(&history.get_txid())
-                    .map(|b| (history, b))
+                // Fast path: use the height from the history row (no DB lookup needed)
+                let height = history.key.confirmed_height as usize;
+                if let Some(header) = headers.header_by_height(height) {
+                    return Some((history, BlockId::from(header)));
+                }
+
+                // Slow path fallback: header not yet indexed, or reorged away
+                let txid = history.get_txid();
+                self.tx_confirming_block(&txid).map(|blockid| (history, blockid))
             });
 
         let mut utxos = init_utxos;
diff --git a/src/util/block.rs b/src/util/block.rs
index 5dac63bcf..164bce036 100644
--- a/src/util/block.rs
+++ b/src/util/block.rs
@@ -173,7 +173,7 @@ impl HeaderList {
     }
 
     #[trace]
-    pub fn apply(&mut self, new_headers: Vec<HeaderEntry>) {
+    pub fn apply(&mut self, new_headers: Vec<HeaderEntry>) -> Vec<HeaderEntry> {
         // new_headers[i] -> new_headers[i - 1] (i.e. new_headers.last() is the tip)
         for i in 1..new_headers.len() {
             assert_eq!(new_headers[i - 1].height() + 1, new_headers[i].height());
@@ -193,14 +193,14 @@ impl HeaderList {
                 assert_eq!(entry.header().prev_blockhash, expected_prev_blockhash);
                 height
             }
-            None => return,
+            None => return vec![],
         };
         debug!(
            "applying {} new headers from height {}",
             new_headers.len(),
             new_height
         );
-        let _removed = self.headers.split_off(new_height); // keep [0..new_height) entries
+        let orphaned = self.headers.split_off(new_height); // keep [0..new_height), return the rest as orphaned
         for new_header in new_headers {
             let height = new_header.height();
             assert_eq!(height, self.headers.len());
@@ -208,6 +208,7 @@ impl HeaderList {
             self.headers.push(new_header);
             self.heights.insert(self.tip, height);
         }
+        orphaned
     }
 
     #[trace]
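To make the `HeaderList::apply` change easier to follow, here is a minimal standalone sketch of the orphan-return behaviour: truncating the header vector at the fork height with `split_off` hands back the orphaned suffix, which `Indexer::update` then passes to `cleanup_orphaned_data`. `Header` below is a simplified stand-in for `HeaderEntry`, and the hash-linkage assertions of the real method are omitted:

```rust
#[derive(Debug, Clone, PartialEq)]
struct Header {
    height: usize,
    hash: u32, // stand-in for a real block hash
}

/// Simplified stand-in for HeaderList::apply: truncate at the first new height
/// and return whatever was cut off as the orphaned chain segment.
fn apply(chain: &mut Vec<Header>, new_headers: Vec<Header>) -> Vec<Header> {
    let new_height = match new_headers.first() {
        Some(h) => h.height,
        None => return vec![],
    };
    // Keep [0..new_height); everything from the fork point onward is orphaned.
    let orphaned = chain.split_off(new_height);
    chain.extend(new_headers);
    orphaned
}

fn main() {
    let mut chain: Vec<Header> = (0..5usize)
        .map(|h| Header { height: h, hash: h as u32 })
        .collect();

    // A two-block reorg replacing heights 3 and 4.
    let reorg = vec![
        Header { height: 3, hash: 103 },
        Header { height: 4, hash: 104 },
    ];
    let orphaned = apply(&mut chain, reorg);

    assert_eq!(orphaned.len(), 2); // the old entries at heights 3 and 4 come back to the caller
    assert_eq!(chain.last().unwrap().hash, 104); // the new tip has been applied
}
```

An empty `new_headers` vector returns an empty orphan list, matching the `None => return vec![]` branch in the diff, so callers can unconditionally check `is_empty()` before triggering cleanup.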