Skip to content

Commit 4f9614e

Browse files
committed
apollo_gateway: extract async code from blocking task
1 parent d44ad43 commit 4f9614e

File tree

2 files changed

+66
-96
lines changed

2 files changed

+66
-96
lines changed

crates/apollo_gateway/src/gateway.rs

Lines changed: 43 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use apollo_network_types::network_types::BroadcastedMessageMetadata;
2525
use apollo_proc_macros::sequencer_latency_histogram;
2626
use apollo_state_sync_types::communication::SharedStateSyncClient;
2727
use axum::async_trait;
28+
use starknet_api::executable_transaction::AccountTransaction;
2829
use starknet_api::rpc_transaction::{
2930
InternalRpcTransaction,
3031
InternalRpcTransactionWithoutTxHash,
@@ -124,8 +125,35 @@ impl Gateway {
124125
}
125126
}
126127

127-
let blocking_task =
128-
ProcessTxBlockingTask::new(self, tx.clone(), tokio::runtime::Handle::current());
128+
let tx_signature = tx.signature().clone();
129+
let internal_tx = self
130+
.transaction_converter
131+
.convert_rpc_tx_to_internal_rpc_tx(tx.clone())
132+
.await
133+
.map_err(|e| {
134+
warn!("Failed to convert RPC transaction to internal RPC transaction: {}", e);
135+
transaction_converter_err_to_deprecated_gw_err(&tx_signature, e)
136+
})?;
137+
138+
let executable_tx = self
139+
.transaction_converter
140+
.convert_internal_rpc_tx_to_executable_tx(internal_tx.clone())
141+
.await
142+
.map_err(|e| {
143+
warn!(
144+
"Failed to convert internal RPC transaction to executable transaction: {}",
145+
e
146+
);
147+
transaction_converter_err_to_deprecated_gw_err(&tx_signature, e)
148+
})?;
149+
150+
let blocking_task = ProcessTxBlockingTask::new(
151+
self,
152+
tx.clone(),
153+
internal_tx,
154+
executable_tx,
155+
tokio::runtime::Handle::current(),
156+
);
129157
// Run the blocking task in the current span.
130158
let curr_span = Span::current();
131159
let handle =
@@ -209,63 +237,46 @@ struct ProcessTxBlockingTask {
209237
state_reader_factory: Arc<dyn StateReaderFactory>,
210238
mempool_client: SharedMempoolClient,
211239
tx: RpcTransaction,
212-
transaction_converter: Arc<dyn TransactionConverterTrait>,
240+
internal_tx: InternalRpcTransaction,
241+
executable_tx: AccountTransaction,
213242
runtime: tokio::runtime::Handle,
214243
}
215244

216245
impl ProcessTxBlockingTask {
217-
pub fn new(gateway: &Gateway, tx: RpcTransaction, runtime: tokio::runtime::Handle) -> Self {
246+
pub fn new(
247+
gateway: &Gateway,
248+
tx: RpcTransaction,
249+
internal_tx: InternalRpcTransaction,
250+
executable_tx: AccountTransaction,
251+
runtime: tokio::runtime::Handle,
252+
) -> Self {
218253
Self {
219254
stateless_tx_validator: gateway.stateless_tx_validator.clone(),
220255
stateful_tx_validator_factory: gateway.stateful_tx_validator_factory.clone(),
221256
state_reader_factory: gateway.state_reader_factory.clone(),
222257
mempool_client: gateway.mempool_client.clone(),
223258
tx,
224-
transaction_converter: gateway.transaction_converter.clone(),
259+
internal_tx,
260+
executable_tx,
225261
runtime,
226262
}
227263
}
228264

229-
// TODO(Arni): Make into async function and remove all block_on calls once we manage removing
230-
// the spawn_blocking call.
231265
fn process_tx(self) -> GatewayResult<AddTransactionArgs> {
232266
// Perform stateless validations.
233267
self.stateless_tx_validator.validate(&self.tx)?;
234268

235-
let tx_signature = self.tx.signature().clone();
236-
let internal_tx = self
237-
.runtime
238-
.block_on(self.transaction_converter.convert_rpc_tx_to_internal_rpc_tx(self.tx))
239-
.map_err(|e| {
240-
warn!("Failed to convert RPC transaction to internal RPC transaction: {}", e);
241-
transaction_converter_err_to_deprecated_gw_err(&tx_signature, e)
242-
})?;
243-
244-
let executable_tx = self
245-
.runtime
246-
.block_on(
247-
self.transaction_converter
248-
.convert_internal_rpc_tx_to_executable_tx(internal_tx.clone()),
249-
)
250-
.map_err(|e| {
251-
warn!(
252-
"Failed to convert internal RPC transaction to executable transaction: {}",
253-
e
254-
);
255-
transaction_converter_err_to_deprecated_gw_err(&tx_signature, e)
256-
})?;
257-
258269
let mut stateful_transaction_validator = self
259270
.stateful_tx_validator_factory
260271
.instantiate_validator(self.state_reader_factory.as_ref())?;
261272

262273
let nonce = stateful_transaction_validator.extract_state_nonce_and_run_validations(
263-
&executable_tx,
274+
&self.executable_tx,
264275
self.mempool_client,
265276
self.runtime,
266277
)?;
267278

268-
Ok(AddTransactionArgs::new(internal_tx, nonce))
279+
Ok(AddTransactionArgs::new(self.internal_tx, nonce))
269280
}
270281
}
271282

crates/apollo_gateway/src/gateway_test.rs

Lines changed: 23 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,7 @@ use std::collections::HashSet;
22
use std::fs::File;
33
use std::sync::{Arc, LazyLock};
44

5-
use apollo_class_manager_types::transaction_converter::{
6-
MockTransactionConverterTrait,
7-
TransactionConverter,
8-
TransactionConverterError,
9-
TransactionConverterResult,
10-
};
5+
use apollo_class_manager_types::transaction_converter::TransactionConverter;
116
use apollo_class_manager_types::{ClassHashes, EmptyClassManagerClient, MockClassManagerClient};
127
use apollo_config::dumping::SerializeConfig;
138
use apollo_config::loading::load_and_process_config;
@@ -56,10 +51,8 @@ use metrics_exporter_prometheus::PrometheusBuilder;
5651
use mockall::predicate::eq;
5752
use rstest::{fixture, rstest};
5853
use starknet_api::contract_class::{ContractClass, SierraVersion};
59-
use starknet_api::core::{ClassHash, ContractAddress, Nonce};
60-
use starknet_api::executable_transaction::AccountTransaction;
54+
use starknet_api::core::{ContractAddress, Nonce};
6155
use starknet_api::rpc_transaction::{
62-
InternalRpcTransaction,
6356
RpcDeclareTransaction,
6457
RpcTransaction,
6558
RpcTransactionLabelValue,
@@ -113,18 +106,6 @@ fn mock_stateful_transaction_validator_factory() -> MockStatefulTransactionValid
113106
MockStatefulTransactionValidatorFactoryTrait::new()
114107
}
115108

116-
#[fixture]
117-
fn mock_transaction_converter() -> MockTransactionConverterTrait {
118-
let mut mock_transaction_converter = MockTransactionConverterTrait::new();
119-
mock_transaction_converter
120-
.expect_convert_rpc_tx_to_internal_rpc_tx()
121-
.return_once(|_| Ok(invoke_args().get_internal_tx()));
122-
mock_transaction_converter
123-
.expect_convert_internal_rpc_tx_to_executable_tx()
124-
.return_once(|_| Ok(executable_invoke_tx(invoke_args())));
125-
mock_transaction_converter
126-
}
127-
128109
#[fixture]
129110
fn mock_stateless_transaction_validator() -> MockStatelessTransactionValidatorTrait {
130111
let mut mock_stateless_transaction_validator = MockStatelessTransactionValidatorTrait::new();
@@ -369,16 +350,14 @@ async fn run_add_tx_and_extract_metrics(
369350
pub struct ProcessTxOverrides {
370351
pub mock_stateful_transaction_validator_factory:
371352
Option<MockStatefulTransactionValidatorFactoryTrait>,
372-
pub mock_transaction_converter: Option<MockTransactionConverterTrait>,
373353
pub mock_stateless_transaction_validator: Option<MockStatelessTransactionValidatorTrait>,
374354
}
375355

376356
fn process_tx_task(overrides: ProcessTxOverrides) -> ProcessTxBlockingTask {
377357
let mock_validator_factory = overrides
378358
.mock_stateful_transaction_validator_factory
379359
.unwrap_or_else(mock_stateful_transaction_validator_factory);
380-
let mock_transaction_converter =
381-
overrides.mock_transaction_converter.unwrap_or_else(mock_transaction_converter);
360+
382361
let mock_stateless_transaction_validator = overrides
383362
.mock_stateless_transaction_validator
384363
.unwrap_or_else(mock_stateless_transaction_validator);
@@ -389,7 +368,8 @@ fn process_tx_task(overrides: ProcessTxOverrides) -> ProcessTxBlockingTask {
389368
state_reader_factory: Arc::new(MockStateReaderFactory::new()),
390369
mempool_client: Arc::new(MockMempoolClient::new()),
391370
tx: invoke_args().get_rpc_tx(),
392-
transaction_converter: Arc::new(mock_transaction_converter),
371+
internal_tx: invoke_args().get_internal_tx(),
372+
executable_tx: executable_invoke_tx(invoke_args()),
393373
runtime: tokio::runtime::Handle::current(),
394374
}
395375
}
@@ -491,6 +471,24 @@ async fn test_compiled_class_hash_mismatch(mut mock_dependencies: MockDependenci
491471
assert_eq!(err.code, expected_code);
492472
}
493473

474+
#[rstest]
475+
#[tokio::test]
476+
async fn test_transaction_converter_error(mut mock_dependencies: MockDependencies) {
477+
mock_dependencies.mock_mempool_client.expect_add_tx().never();
478+
479+
let chain_id = mock_dependencies.config.chain_info.chain_id.clone();
480+
let gateway = Gateway::new(
481+
mock_dependencies.config,
482+
Arc::new(mock_dependencies.state_reader_factory),
483+
Arc::new(mock_dependencies.mock_mempool_client),
484+
// TODO(AlonH): use MockTransactionConverter.
485+
TransactionConverter::new(Arc::new(EmptyClassManagerClient), chain_id),
486+
);
487+
488+
let err = gateway.add_tx(declare_tx(), None).await.unwrap_err();
489+
assert_matches!(err, StarknetError { code: StarknetErrorCode::KnownErrorCode(_), message: _ });
490+
}
491+
494492
#[rstest]
495493
#[tokio::test]
496494
async fn test_block_declare_config(
@@ -718,42 +716,3 @@ async fn process_tx_returns_error_when_instantiating_validator_fails(
718716
assert!(result.is_err());
719717
assert_eq!(result.unwrap_err().code, error_code);
720718
}
721-
722-
#[rstest]
723-
#[case::rpc_to_internal_fails(
724-
Err(TransactionConverterError::ClassNotFound { class_hash: ClassHash::default() }),
725-
// This value is never used because the first step already fails. Provided a valid executable tx to satisfy the signature.
726-
Ok(executable_invoke_tx(invoke_args())),
727-
)]
728-
#[case::internal_to_executable_fails(
729-
Ok(invoke_args().get_internal_tx()),
730-
Err(TransactionConverterError::ClassNotFound { class_hash: ClassHash::default() })
731-
)]
732-
#[tokio::test]
733-
async fn process_tx_conversion_errors_are_mapped_to_internal_error(
734-
#[case] expect_internal_rpc_tx_result: TransactionConverterResult<InternalRpcTransaction>,
735-
#[case] expect_executable_tx_result: TransactionConverterResult<AccountTransaction>,
736-
) {
737-
let mut mock_transaction_converter = MockTransactionConverterTrait::new();
738-
mock_transaction_converter
739-
.expect_convert_rpc_tx_to_internal_rpc_tx()
740-
.return_once(|_| expect_internal_rpc_tx_result);
741-
mock_transaction_converter
742-
.expect_convert_internal_rpc_tx_to_executable_tx()
743-
.return_once(|_| expect_executable_tx_result);
744-
745-
let overrides = ProcessTxOverrides {
746-
mock_transaction_converter: Some(mock_transaction_converter),
747-
..Default::default()
748-
};
749-
let process_tx_task = process_tx_task(overrides);
750-
751-
let result = tokio::task::spawn_blocking(move || process_tx_task.process_tx()).await.unwrap();
752-
753-
assert!(result.is_err());
754-
// All TransactionConverter errors are mapped to InternalError.
755-
assert_eq!(
756-
result.unwrap_err().code,
757-
StarknetErrorCode::UnknownErrorCode("StarknetErrorCode.InternalError".into())
758-
);
759-
}

0 commit comments

Comments
 (0)