19 changes: 19 additions & 0 deletions src/ingester/error.rs
@@ -1,3 +1,4 @@
use crate::ingester::parser::state_update::SequenceGap;
use thiserror::Error;

#[derive(Error, Debug, PartialEq, Eq)]
@@ -14,10 +15,28 @@ pub enum IngesterError {
EmptyBatchEvent,
#[error("Invalid event.")]
InvalidEvent,
#[error("Sequence gap detected: {} gaps found", .0.len())]
SequenceGapDetected(Vec<SequenceGap>),
}

impl From<sea_orm::error::DbErr> for IngesterError {
fn from(err: sea_orm::error::DbErr) -> Self {
IngesterError::DatabaseError(format!("DatabaseError: {}", err))
}
}

impl From<String> for IngesterError {
fn from(err: String) -> Self {
IngesterError::ParserError(err)
}
}

impl From<crate::ingester::parser::state_update::SequenceGapError> for IngesterError {
fn from(err: crate::ingester::parser::state_update::SequenceGapError) -> Self {
match err {
crate::ingester::parser::state_update::SequenceGapError::GapDetected(gaps) => {
IngesterError::SequenceGapDetected(gaps)
}
}
}
}
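
The conversions above reference `SequenceGap` and `SequenceGapError` from `src/ingester/parser/state_update.rs`, which is not shown in this diff. A minimal sketch of the shapes those call sites imply; the field names are illustrative assumptions only:

```rust
// Hypothetical shapes for src/ingester/parser/state_update.rs (not part of
// this diff). `IngesterError` derives Debug/PartialEq/Eq, so `SequenceGap`
// must as well; the individual fields are assumptions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SequenceGap {
    pub tree: [u8; 32],    // assumed: Merkle tree the gap was observed on
    pub expected_seq: u64, // assumed: sequence number the indexer expected next
    pub actual_seq: u64,   // assumed: sequence number actually observed
    pub slot: u64,         // assumed: slot where the gap surfaced
}

#[derive(Debug, PartialEq, Eq)]
pub enum SequenceGapError {
    GapDetected(Vec<SequenceGap>),
}
```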
4 changes: 4 additions & 0 deletions src/ingester/fetchers/grpc.rs
@@ -53,6 +53,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
rpc_client.clone(),
last_indexed_slot,
max_concurrent_block_fetches,
None, // No rewind receiver for the RPC fallback path
))
);

@@ -115,6 +116,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
rpc_client.clone(),
last_indexed_slot,
max_concurrent_block_fetches,
None, // No rewind receiver for the RPC fallback path
)));
continue;
}
@@ -132,6 +134,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
rpc_client.clone(),
last_indexed_slot,
max_concurrent_block_fetches,
None, // No rewind receiver for the RPC fallback path
)));
continue;
}
@@ -144,6 +147,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
rpc_client.clone(),
last_indexed_slot,
max_concurrent_block_fetches,
None, // No rewind receiver for the RPC fallback path
)));
}
}
7 changes: 5 additions & 2 deletions src/ingester/fetchers/mod.rs
@@ -3,8 +3,9 @@ use std::sync::Arc;
use async_stream::stream;
use futures::{pin_mut, Stream, StreamExt};
use solana_client::nonblocking::rpc_client::RpcClient;
use tokio::sync::mpsc;

use super::typedefs::block_info::BlockInfo;
use super::{rewind_controller::RewindCommand, typedefs::block_info::BlockInfo};

pub mod grpc;
pub mod poller;
@@ -17,10 +18,11 @@ pub struct BlockStreamConfig {
pub geyser_url: Option<String>,
pub max_concurrent_block_fetches: usize,
pub last_indexed_slot: u64,
pub rewind_receiver: Option<mpsc::UnboundedReceiver<RewindCommand>>,
}

impl BlockStreamConfig {
pub fn load_block_stream(&self) -> impl Stream<Item = Vec<BlockInfo>> {
pub fn load_block_stream(self) -> impl Stream<Item = Vec<BlockInfo>> {
let grpc_stream = self.geyser_url.as_ref().map(|geyser_url| {
let auth_header = std::env::var("GRPC_X_TOKEN").unwrap();
get_grpc_stream_with_rpc_fallback(
@@ -37,6 +39,7 @@ impl BlockStreamConfig {
self.rpc_client.clone(),
self.last_indexed_slot,
self.max_concurrent_block_fetches,
self.rewind_receiver,
))
} else {
None
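
`RewindCommand` and `RewindController` live in the new `rewind_controller` module, which is also outside this excerpt. A sketch of what the call sites in this diff imply; the exact signatures and the rewind policy are assumptions, not the actual implementation:

```rust
// Hypothetical src/ingester/rewind_controller.rs, reconstructed from usage in
// this diff (unbounded channel, `request_rewind(slot, reason)?`, and
// `determine_rewind_slot(&gaps)`). Details are assumptions.
use tokio::sync::mpsc;

use crate::ingester::parser::state_update::SequenceGap;

#[derive(Debug)]
pub enum RewindCommand {
    Rewind { to_slot: u64, reason: String },
}

pub struct RewindController {
    sender: mpsc::UnboundedSender<RewindCommand>,
}

impl RewindController {
    /// Creates the controller (sender side) plus the receiver that is handed
    /// to the block poller via `BlockStreamConfig::rewind_receiver`.
    pub fn new() -> (Self, mpsc::UnboundedReceiver<RewindCommand>) {
        let (sender, receiver) = mpsc::unbounded_channel();
        (Self { sender }, receiver)
    }

    /// Errors as `String` so the `?` at the call sites can reuse the existing
    /// `From<String> for IngesterError` conversion.
    pub fn request_rewind(&self, to_slot: u64, reason: String) -> Result<(), String> {
        self.sender
            .send(RewindCommand::Rewind { to_slot, reason })
            .map_err(|e| format!("failed to send rewind command: {}", e))
    }
}

/// Assumed policy: rewind to just before the earliest slot that reported a gap,
/// so the poller re-fetches everything from the first inconsistency onward.
pub fn determine_rewind_slot(gaps: &[SequenceGap]) -> u64 {
    gaps.iter()
        .map(|gap| gap.slot)
        .min()
        .unwrap_or(0)
        .saturating_sub(1)
}
```

The `SequenceGap::slot` field used here is the assumed field from the sketch above.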
27 changes: 24 additions & 3 deletions src/ingester/fetchers/poller.rs
@@ -9,23 +9,43 @@ use futures::{pin_mut, Stream, StreamExt};
use solana_client::{
nonblocking::rpc_client::RpcClient, rpc_config::RpcBlockConfig, rpc_request::RpcError,
};
use tokio::sync::mpsc;

use solana_sdk::commitment_config::CommitmentConfig;
use solana_transaction_status::{TransactionDetails, UiTransactionEncoding};

use crate::{
ingester::typedefs::block_info::{parse_ui_confirmed_blocked, BlockInfo},
ingester::{
rewind_controller::RewindCommand,
typedefs::block_info::{parse_ui_confirmed_blocked, BlockInfo},
},
metric,
monitor::{start_latest_slot_updater, LATEST_SLOT},
};

const SKIPPED_BLOCK_ERRORS: [i64; 2] = [-32007, -32009];

fn get_slot_stream(rpc_client: Arc<RpcClient>, start_slot: u64) -> impl Stream<Item = u64> {
fn get_slot_stream(
rpc_client: Arc<RpcClient>,
start_slot: u64,
mut rewind_receiver: Option<mpsc::UnboundedReceiver<RewindCommand>>,
) -> impl Stream<Item = u64> {
stream! {
start_latest_slot_updater(rpc_client.clone()).await;
let mut next_slot_to_fetch = start_slot;
loop {
// Check for rewind commands
if let Some(ref mut receiver) = rewind_receiver {
while let Ok(command) = receiver.try_recv() {
match command {
RewindCommand::Rewind { to_slot, reason } => {
log::error!("Rewinding slot stream to {}: {}", to_slot, reason);
next_slot_to_fetch = to_slot;
}
}
}
}

if next_slot_to_fetch > LATEST_SLOT.load(Ordering::SeqCst) {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
continue;
@@ -40,13 +60,14 @@ pub fn get_block_poller_stream(
rpc_client: Arc<RpcClient>,
mut last_indexed_slot: u64,
max_concurrent_block_fetches: usize,
rewind_receiver: Option<mpsc::UnboundedReceiver<RewindCommand>>,
) -> impl Stream<Item = Vec<BlockInfo>> {
stream! {
let start_slot = match last_indexed_slot {
0 => 0,
last_indexed_slot => last_indexed_slot + 1
};
let slot_stream = get_slot_stream(rpc_client.clone(), start_slot);
let slot_stream = get_slot_stream(rpc_client.clone(), start_slot, rewind_receiver);
pin_mut!(slot_stream);
let block_stream = slot_stream
.map(|slot| {
12 changes: 7 additions & 5 deletions src/ingester/indexer/mod.rs
@@ -52,6 +52,7 @@ pub async fn index_block_stream(
rpc_client: Arc<RpcClient>,
last_indexed_slot_at_start: u64,
end_slot: Option<u64>,
rewind_controller: Option<&crate::ingester::rewind_controller::RewindController>,
) {
pin_mut!(block_stream);
let current_slot =
@@ -65,16 +66,17 @@
"Backfilling historical blocks. Current number of blocks to backfill: {}",
number_of_blocks_to_backfill
);
let mut last_indexed_slot = last_indexed_slot_at_start;
// let mut last_indexed_slot = last_indexed_slot_at_start;

let mut finished_backfill_slot = None;

while let Some(blocks) = block_stream.next().await {
let first_slot_in_block = blocks.first().unwrap().metadata.slot;
let last_slot_in_block = blocks.last().unwrap().metadata.slot;
index_block_batch_with_infinite_retries(db.as_ref(), blocks).await;
index_block_batch_with_infinite_retries(db.as_ref(), blocks, rewind_controller).await;

for slot in (last_indexed_slot + 1)..(last_slot_in_block + 1) {
let blocks_indexed = slot - last_indexed_slot_at_start;
for slot in first_slot_in_block..(last_slot_in_block + 1) {
let blocks_indexed = slot.saturating_sub(last_indexed_slot_at_start);
if blocks_indexed < number_of_blocks_to_backfill {
if blocks_indexed % PRE_BACKFILL_FREQUENCY == 0 {
info!(
@@ -92,7 +94,7 @@
info!("Indexed slot {}", slot);
}
}
last_indexed_slot = slot;
// last_indexed_slot = slot;
}
}
}
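
One note on the loop change above: the progress counter now starts from `first_slot_in_block` and uses `saturating_sub`, presumably because a rewind can revisit slots below `last_indexed_slot_at_start`, where a plain `u64` subtraction would underflow. A small self-contained illustration with made-up values:

```rust
// Illustrative only: why the progress counter uses saturating_sub.
fn main() {
    let last_indexed_slot_at_start: u64 = 1_000;
    let reindexed_slot: u64 = 950; // a slot revisited after a rewind

    // `reindexed_slot - last_indexed_slot_at_start` would panic in debug
    // builds (and wrap around in release) because 950 < 1_000.
    let blocks_indexed = reindexed_slot.saturating_sub(last_indexed_slot_at_start);

    assert_eq!(blocks_indexed, 0); // treated as "no forward progress yet"
}
```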
71 changes: 63 additions & 8 deletions src/ingester/mod.rs
@@ -15,9 +15,10 @@ use sea_orm::QueryTrait;
use sea_orm::Set;
use sea_orm::TransactionTrait;

use self::parser::state_update::StateUpdate;
use self::parser::state_update::{SequenceGapError, StateUpdate};
use self::persist::persist_state_update;
use self::persist::MAX_SQL_INSERTS;
use self::rewind_controller::{determine_rewind_slot, RewindController};
use self::typedefs::block_info::BlockInfo;
use self::typedefs::block_info::BlockMetadata;
use crate::dao::generated::blocks;
@@ -27,20 +28,43 @@ pub mod fetchers;
pub mod indexer;
pub mod parser;
pub mod persist;
pub mod rewind_controller;
pub mod typedefs;

fn derive_block_state_update(block: &BlockInfo) -> Result<StateUpdate, IngesterError> {
fn derive_block_state_update(
block: &BlockInfo,
rewind_controller: Option<&RewindController>,
) -> Result<StateUpdate, IngesterError> {
let mut state_updates: Vec<StateUpdate> = Vec::new();
for transaction in &block.transactions {
state_updates.push(parse_transaction(transaction, block.metadata.slot)?);
}
Ok(StateUpdate::merge_updates(state_updates))

match StateUpdate::merge_updates_with_slot(state_updates, Some(block.metadata.slot)) {
Ok(merged_update) => Ok(merged_update),
Err(SequenceGapError::GapDetected(gaps)) => {
if let Some(controller) = rewind_controller {
let rewind_slot = determine_rewind_slot(&gaps);
let reason = format!(
"Sequence gaps detected in block {}: {} gaps found",
block.metadata.slot,
gaps.len()
);
controller.request_rewind(rewind_slot, reason)?;
}
Err(IngesterError::SequenceGapDetected(gaps))
}
}
}

pub async fn index_block(db: &DatabaseConnection, block: &BlockInfo) -> Result<(), IngesterError> {
pub async fn index_block(
db: &DatabaseConnection,
block: &BlockInfo,
rewind_controller: Option<&RewindController>,
) -> Result<(), IngesterError> {
let txn = db.begin().await?;
index_block_metadatas(&txn, vec![&block.metadata]).await?;
persist_state_update(&txn, derive_block_state_update(block)?).await?;
persist_state_update(&txn, derive_block_state_update(block, rewind_controller)?).await?;
txn.commit().await?;
Ok(())
}
@@ -81,16 +105,41 @@ async fn index_block_metadatas(
pub async fn index_block_batch(
db: &DatabaseConnection,
block_batch: &Vec<BlockInfo>,
rewind_controller: Option<&RewindController>,
) -> Result<(), IngesterError> {
let blocks_len = block_batch.len();
let tx = db.begin().await?;
let block_metadatas: Vec<&BlockMetadata> = block_batch.iter().map(|b| &b.metadata).collect();
index_block_metadatas(&tx, block_metadatas).await?;
let mut state_updates = Vec::new();
for block in block_batch {
state_updates.push(derive_block_state_update(block)?);
state_updates.push(derive_block_state_update(block, rewind_controller)?);
}
persist::persist_state_update(&tx, StateUpdate::merge_updates(state_updates)).await?;

if block_batch.is_empty() {
return Ok(()); // Empty batch: nothing to merge or persist, so treat it as a no-op
}

let merged_state_update = match StateUpdate::merge_updates_with_slot(
state_updates,
Some(block_batch.last().unwrap().metadata.slot),
) {
Ok(merged) => merged,
Err(SequenceGapError::GapDetected(gaps)) => {
if let Some(controller) = rewind_controller {
let rewind_slot = determine_rewind_slot(&gaps);
let reason = format!(
"Sequence gaps detected in batch ending at slot {}: {} gaps found",
block_batch.last().unwrap().metadata.slot,
gaps.len()
);
controller.request_rewind(rewind_slot, reason)?;
}
return Err(IngesterError::SequenceGapDetected(gaps));
}
};

persist::persist_state_update(&tx, merged_state_update).await?;
metric! {
statsd_count!("blocks_indexed", blocks_len as i64);
}
@@ -101,10 +150,16 @@ pub async fn index_block_batch(
pub async fn index_block_batch_with_infinite_retries(
db: &DatabaseConnection,
block_batch: Vec<BlockInfo>,
rewind_controller: Option<&RewindController>,
) {
loop {
match index_block_batch(db, &block_batch).await {
match index_block_batch(db, &block_batch, rewind_controller).await {
Ok(()) => return,
Err(IngesterError::SequenceGapDetected(_)) => {
// For sequence gaps, we don't retry - we let the rewind mechanism handle it
log::error!("Sequence gap detected in batch, stopping processing to allow rewind");
return;
}
Err(e) => {
let start_block = block_batch.first().unwrap().metadata.slot;
let end_block = block_batch.last().unwrap().metadata.slot;
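
Putting the pieces together, a hypothetical wiring of the rewind channel between the indexing path (sender) and the block poller (receiver). This is not part of the diff: the `RewindController::new()` constructor and the parameters of `index_block_stream` that fall outside the visible hunks are assumptions.

```rust
use std::sync::Arc;

use sea_orm::DatabaseConnection;
use solana_client::nonblocking::rpc_client::RpcClient;

use crate::ingester::{
    fetchers::BlockStreamConfig,
    indexer::index_block_stream,
    rewind_controller::RewindController,
};

/// Hypothetical setup: plain RPC polling with gap-triggered rewinds enabled.
pub async fn run_indexer_with_rewind(
    db: Arc<DatabaseConnection>,
    rpc_client: Arc<RpcClient>,
    last_indexed_slot: u64,
) {
    // The sender stays with the indexer; the receiver is moved into the poller.
    let (rewind_controller, rewind_receiver) = RewindController::new();

    let config = BlockStreamConfig {
        rpc_client: rpc_client.clone(),
        geyser_url: None, // assumed: no gRPC endpoint, so the poller path is used
        max_concurrent_block_fetches: 20,
        last_indexed_slot,
        rewind_receiver: Some(rewind_receiver),
    };

    // `load_block_stream` now takes `self` by value because the receiver is
    // moved into the poller's slot stream.
    let block_stream = config.load_block_stream();

    index_block_stream(
        block_stream,
        db,
        rpc_client,
        last_indexed_slot,
        None,                     // end_slot: run until shut down
        Some(&rewind_controller), // enables gap-triggered rewinds
    )
    .await;
}
```

With this wiring, a detected sequence gap makes the current batch fail fast instead of retrying forever, and the poller resumes from the rewound slot on its next iteration.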
2 changes: 1 addition & 1 deletion src/ingester/parser/mod.rs
@@ -127,7 +127,7 @@ pub fn parse_transaction(tx: &TransactionInfo, slot: u64) -> Result<StateUpdate,
}
}

let mut state_update = StateUpdate::merge_updates(state_updates.clone());
let mut state_update = StateUpdate::merge_updates(state_updates.clone())?;
if !is_voting_transaction(tx) || is_compression_transaction {
state_update.transactions.insert(Transaction {
signature: tx.signature,