From 4f9614e6dd9d6be41ab88fa7a391c9ae9fa4ea17 Mon Sep 17 00:00:00 2001 From: Alon Haramati Date: Thu, 25 Sep 2025 14:41:15 +0300 Subject: [PATCH] apollo_gateway: extract async code from blocking task --- crates/apollo_gateway/src/gateway.rs | 75 ++++++++++--------- crates/apollo_gateway/src/gateway_test.rs | 87 ++++++----------------- 2 files changed, 66 insertions(+), 96 deletions(-) diff --git a/crates/apollo_gateway/src/gateway.rs b/crates/apollo_gateway/src/gateway.rs index dbb4769396f..7b82d88fa44 100644 --- a/crates/apollo_gateway/src/gateway.rs +++ b/crates/apollo_gateway/src/gateway.rs @@ -25,6 +25,7 @@ use apollo_network_types::network_types::BroadcastedMessageMetadata; use apollo_proc_macros::sequencer_latency_histogram; use apollo_state_sync_types::communication::SharedStateSyncClient; use axum::async_trait; +use starknet_api::executable_transaction::AccountTransaction; use starknet_api::rpc_transaction::{ InternalRpcTransaction, InternalRpcTransactionWithoutTxHash, @@ -124,8 +125,35 @@ impl Gateway { } } - let blocking_task = - ProcessTxBlockingTask::new(self, tx.clone(), tokio::runtime::Handle::current()); + let tx_signature = tx.signature().clone(); + let internal_tx = self + .transaction_converter + .convert_rpc_tx_to_internal_rpc_tx(tx.clone()) + .await + .map_err(|e| { + warn!("Failed to convert RPC transaction to internal RPC transaction: {}", e); + transaction_converter_err_to_deprecated_gw_err(&tx_signature, e) + })?; + + let executable_tx = self + .transaction_converter + .convert_internal_rpc_tx_to_executable_tx(internal_tx.clone()) + .await + .map_err(|e| { + warn!( + "Failed to convert internal RPC transaction to executable transaction: {}", + e + ); + transaction_converter_err_to_deprecated_gw_err(&tx_signature, e) + })?; + + let blocking_task = ProcessTxBlockingTask::new( + self, + tx.clone(), + internal_tx, + executable_tx, + tokio::runtime::Handle::current(), + ); // Run the blocking task in the current span. let curr_span = Span::current(); let handle = @@ -209,63 +237,46 @@ struct ProcessTxBlockingTask { state_reader_factory: Arc, mempool_client: SharedMempoolClient, tx: RpcTransaction, - transaction_converter: Arc, + internal_tx: InternalRpcTransaction, + executable_tx: AccountTransaction, runtime: tokio::runtime::Handle, } impl ProcessTxBlockingTask { - pub fn new(gateway: &Gateway, tx: RpcTransaction, runtime: tokio::runtime::Handle) -> Self { + pub fn new( + gateway: &Gateway, + tx: RpcTransaction, + internal_tx: InternalRpcTransaction, + executable_tx: AccountTransaction, + runtime: tokio::runtime::Handle, + ) -> Self { Self { stateless_tx_validator: gateway.stateless_tx_validator.clone(), stateful_tx_validator_factory: gateway.stateful_tx_validator_factory.clone(), state_reader_factory: gateway.state_reader_factory.clone(), mempool_client: gateway.mempool_client.clone(), tx, - transaction_converter: gateway.transaction_converter.clone(), + internal_tx, + executable_tx, runtime, } } - // TODO(Arni): Make into async function and remove all block_on calls once we manage removing - // the spawn_blocking call. fn process_tx(self) -> GatewayResult { // Perform stateless validations. self.stateless_tx_validator.validate(&self.tx)?; - let tx_signature = self.tx.signature().clone(); - let internal_tx = self - .runtime - .block_on(self.transaction_converter.convert_rpc_tx_to_internal_rpc_tx(self.tx)) - .map_err(|e| { - warn!("Failed to convert RPC transaction to internal RPC transaction: {}", e); - transaction_converter_err_to_deprecated_gw_err(&tx_signature, e) - })?; - - let executable_tx = self - .runtime - .block_on( - self.transaction_converter - .convert_internal_rpc_tx_to_executable_tx(internal_tx.clone()), - ) - .map_err(|e| { - warn!( - "Failed to convert internal RPC transaction to executable transaction: {}", - e - ); - transaction_converter_err_to_deprecated_gw_err(&tx_signature, e) - })?; - let mut stateful_transaction_validator = self .stateful_tx_validator_factory .instantiate_validator(self.state_reader_factory.as_ref())?; let nonce = stateful_transaction_validator.extract_state_nonce_and_run_validations( - &executable_tx, + &self.executable_tx, self.mempool_client, self.runtime, )?; - Ok(AddTransactionArgs::new(internal_tx, nonce)) + Ok(AddTransactionArgs::new(self.internal_tx, nonce)) } } diff --git a/crates/apollo_gateway/src/gateway_test.rs b/crates/apollo_gateway/src/gateway_test.rs index 38103ae87ca..d375e464918 100644 --- a/crates/apollo_gateway/src/gateway_test.rs +++ b/crates/apollo_gateway/src/gateway_test.rs @@ -2,12 +2,7 @@ use std::collections::HashSet; use std::fs::File; use std::sync::{Arc, LazyLock}; -use apollo_class_manager_types::transaction_converter::{ - MockTransactionConverterTrait, - TransactionConverter, - TransactionConverterError, - TransactionConverterResult, -}; +use apollo_class_manager_types::transaction_converter::TransactionConverter; use apollo_class_manager_types::{ClassHashes, EmptyClassManagerClient, MockClassManagerClient}; use apollo_config::dumping::SerializeConfig; use apollo_config::loading::load_and_process_config; @@ -56,10 +51,8 @@ use metrics_exporter_prometheus::PrometheusBuilder; use mockall::predicate::eq; use rstest::{fixture, rstest}; use starknet_api::contract_class::{ContractClass, SierraVersion}; -use starknet_api::core::{ClassHash, ContractAddress, Nonce}; -use starknet_api::executable_transaction::AccountTransaction; +use starknet_api::core::{ContractAddress, Nonce}; use starknet_api::rpc_transaction::{ - InternalRpcTransaction, RpcDeclareTransaction, RpcTransaction, RpcTransactionLabelValue, @@ -113,18 +106,6 @@ fn mock_stateful_transaction_validator_factory() -> MockStatefulTransactionValid MockStatefulTransactionValidatorFactoryTrait::new() } -#[fixture] -fn mock_transaction_converter() -> MockTransactionConverterTrait { - let mut mock_transaction_converter = MockTransactionConverterTrait::new(); - mock_transaction_converter - .expect_convert_rpc_tx_to_internal_rpc_tx() - .return_once(|_| Ok(invoke_args().get_internal_tx())); - mock_transaction_converter - .expect_convert_internal_rpc_tx_to_executable_tx() - .return_once(|_| Ok(executable_invoke_tx(invoke_args()))); - mock_transaction_converter -} - #[fixture] fn mock_stateless_transaction_validator() -> MockStatelessTransactionValidatorTrait { let mut mock_stateless_transaction_validator = MockStatelessTransactionValidatorTrait::new(); @@ -369,7 +350,6 @@ async fn run_add_tx_and_extract_metrics( pub struct ProcessTxOverrides { pub mock_stateful_transaction_validator_factory: Option, - pub mock_transaction_converter: Option, pub mock_stateless_transaction_validator: Option, } @@ -377,8 +357,7 @@ fn process_tx_task(overrides: ProcessTxOverrides) -> ProcessTxBlockingTask { let mock_validator_factory = overrides .mock_stateful_transaction_validator_factory .unwrap_or_else(mock_stateful_transaction_validator_factory); - let mock_transaction_converter = - overrides.mock_transaction_converter.unwrap_or_else(mock_transaction_converter); + let mock_stateless_transaction_validator = overrides .mock_stateless_transaction_validator .unwrap_or_else(mock_stateless_transaction_validator); @@ -389,7 +368,8 @@ fn process_tx_task(overrides: ProcessTxOverrides) -> ProcessTxBlockingTask { state_reader_factory: Arc::new(MockStateReaderFactory::new()), mempool_client: Arc::new(MockMempoolClient::new()), tx: invoke_args().get_rpc_tx(), - transaction_converter: Arc::new(mock_transaction_converter), + internal_tx: invoke_args().get_internal_tx(), + executable_tx: executable_invoke_tx(invoke_args()), runtime: tokio::runtime::Handle::current(), } } @@ -491,6 +471,24 @@ async fn test_compiled_class_hash_mismatch(mut mock_dependencies: MockDependenci assert_eq!(err.code, expected_code); } +#[rstest] +#[tokio::test] +async fn test_transaction_converter_error(mut mock_dependencies: MockDependencies) { + mock_dependencies.mock_mempool_client.expect_add_tx().never(); + + let chain_id = mock_dependencies.config.chain_info.chain_id.clone(); + let gateway = Gateway::new( + mock_dependencies.config, + Arc::new(mock_dependencies.state_reader_factory), + Arc::new(mock_dependencies.mock_mempool_client), + // TODO(AlonH): use MockTransactionConverter. + TransactionConverter::new(Arc::new(EmptyClassManagerClient), chain_id), + ); + + let err = gateway.add_tx(declare_tx(), None).await.unwrap_err(); + assert_matches!(err, StarknetError { code: StarknetErrorCode::KnownErrorCode(_), message: _ }); +} + #[rstest] #[tokio::test] async fn test_block_declare_config( @@ -718,42 +716,3 @@ async fn process_tx_returns_error_when_instantiating_validator_fails( assert!(result.is_err()); assert_eq!(result.unwrap_err().code, error_code); } - -#[rstest] -#[case::rpc_to_internal_fails( - Err(TransactionConverterError::ClassNotFound { class_hash: ClassHash::default() }), - // This value is never used because the first step already fails. Provided a valid executable tx to satisfy the signature. - Ok(executable_invoke_tx(invoke_args())), -)] -#[case::internal_to_executable_fails( - Ok(invoke_args().get_internal_tx()), - Err(TransactionConverterError::ClassNotFound { class_hash: ClassHash::default() }) -)] -#[tokio::test] -async fn process_tx_conversion_errors_are_mapped_to_internal_error( - #[case] expect_internal_rpc_tx_result: TransactionConverterResult, - #[case] expect_executable_tx_result: TransactionConverterResult, -) { - let mut mock_transaction_converter = MockTransactionConverterTrait::new(); - mock_transaction_converter - .expect_convert_rpc_tx_to_internal_rpc_tx() - .return_once(|_| expect_internal_rpc_tx_result); - mock_transaction_converter - .expect_convert_internal_rpc_tx_to_executable_tx() - .return_once(|_| expect_executable_tx_result); - - let overrides = ProcessTxOverrides { - mock_transaction_converter: Some(mock_transaction_converter), - ..Default::default() - }; - let process_tx_task = process_tx_task(overrides); - - let result = tokio::task::spawn_blocking(move || process_tx_task.process_tx()).await.unwrap(); - - assert!(result.is_err()); - // All TransactionConverter errors are mapped to InternalError. - assert_eq!( - result.unwrap_err().code, - StarknetErrorCode::UnknownErrorCode("StarknetErrorCode.InternalError".into()) - ); -}