Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 12 additions & 15 deletions crates/apollo_gateway/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use apollo_network_types::network_types::BroadcastedMessageMetadata;
use apollo_proc_macros::sequencer_latency_histogram;
use apollo_state_sync_types::communication::SharedStateSyncClient;
use axum::async_trait;
use starknet_api::core::Nonce;
use starknet_api::executable_transaction::AccountTransaction;
use starknet_api::rpc_transaction::{
InternalRpcTransaction,
Expand Down Expand Up @@ -149,19 +150,15 @@ impl Gateway {
transaction_converter_err_to_deprecated_gw_err(&tx_signature, e)
})?;

let blocking_task = ProcessTxBlockingTask::new(
self,
internal_tx,
executable_tx,
tokio::runtime::Handle::current(),
);
let blocking_task =
ProcessTxBlockingTask::new(self, executable_tx, tokio::runtime::Handle::current());
// Run the blocking task in the current span.
let curr_span = Span::current();
let handle =
tokio::task::spawn_blocking(move || curr_span.in_scope(|| blocking_task.process_tx()));
let handle_result = handle.await;
let add_tx_args = match handle_result {
Ok(Ok(add_tx_args)) => add_tx_args,
let nonce = match handle_result {
Ok(Ok(nonce)) => nonce,
Ok(Err(starknet_err)) => {
info!(
"Gateway validation failed for tx with signature: {:?} with error: {}",
Expand All @@ -182,9 +179,12 @@ impl Gateway {
}
};

let gateway_output = create_gateway_output(&add_tx_args.tx);
let gateway_output = create_gateway_output(&internal_tx);

let add_tx_args = AddTransactionArgsWrapper { args: add_tx_args, p2p_message_metadata };
let add_tx_args = AddTransactionArgsWrapper {
args: AddTransactionArgs::new(internal_tx, nonce),
p2p_message_metadata,
};
match mempool_client_result_to_deprecated_gw_result(
tx.signature(),
self.mempool_client.add_tx(add_tx_args).await,
Expand Down Expand Up @@ -236,29 +236,26 @@ struct ProcessTxBlockingTask {
stateful_tx_validator_factory: Arc<dyn StatefulTransactionValidatorFactoryTrait>,
state_reader_factory: Arc<dyn StateReaderFactory>,
mempool_client: SharedMempoolClient,
internal_tx: InternalRpcTransaction,
executable_tx: AccountTransaction,
runtime: tokio::runtime::Handle,
}

impl ProcessTxBlockingTask {
pub fn new(
gateway: &Gateway,
internal_tx: InternalRpcTransaction,
executable_tx: AccountTransaction,
runtime: tokio::runtime::Handle,
) -> Self {
Self {
stateful_tx_validator_factory: gateway.stateful_tx_validator_factory.clone(),
state_reader_factory: gateway.state_reader_factory.clone(),
mempool_client: gateway.mempool_client.clone(),
internal_tx,
executable_tx,
runtime,
}
}

fn process_tx(self) -> GatewayResult<AddTransactionArgs> {
fn process_tx(self) -> GatewayResult<Nonce> {
let mut stateful_transaction_validator = self
.stateful_tx_validator_factory
.instantiate_validator(self.state_reader_factory.as_ref())?;
Expand All @@ -269,7 +266,7 @@ impl ProcessTxBlockingTask {
self.runtime,
)?;

Ok(AddTransactionArgs::new(self.internal_tx, nonce))
Ok(nonce)
}
}

Expand Down
1 change: 0 additions & 1 deletion crates/apollo_gateway/src/gateway_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,6 @@ fn process_tx_task(
stateful_tx_validator_factory: Arc::new(stateful_transaction_validator_factory),
state_reader_factory: Arc::new(MockStateReaderFactory::new()),
mempool_client: Arc::new(MockMempoolClient::new()),
internal_tx: invoke_args().get_internal_tx(),
executable_tx: executable_invoke_tx(invoke_args()),
runtime: tokio::runtime::Handle::current(),
}
Expand Down