Skip to content

Commit 9651f7a

Browse files
authored
feat(ingress-rpc): Add raw transaction forwarding to additional endpoint (#88)
1 parent be37779 commit 9651f7a

File tree

4 files changed

+132
-30
lines changed

4 files changed

+132
-30
lines changed

crates/ingress-rpc/src/bin/main.rs

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use alloy_provider::{ProviderBuilder, RootProvider};
1+
use alloy_provider::ProviderBuilder;
22
use clap::Parser;
33
use jsonrpsee::server::Server;
44
use op_alloy_network::Optimism;
@@ -13,7 +13,7 @@ use tips_ingress_rpc::connect_ingress_to_builder;
1313
use tips_ingress_rpc::health::bind_health_server;
1414
use tips_ingress_rpc::metrics::init_prometheus_exporter;
1515
use tips_ingress_rpc::queue::KafkaQueuePublisher;
16-
use tips_ingress_rpc::service::{IngressApiServer, IngressService};
16+
use tips_ingress_rpc::service::{IngressApiServer, IngressService, Providers};
1717
use tokio::sync::{broadcast, mpsc};
1818
use tracing::info;
1919

@@ -39,15 +39,22 @@ async fn main() -> anyhow::Result<()> {
3939
health_check_address = %config.health_check_addr,
4040
);
4141

42-
let provider: RootProvider<Optimism> = ProviderBuilder::new()
43-
.disable_recommended_fillers()
44-
.network::<Optimism>()
45-
.connect_http(config.mempool_url);
46-
47-
let simulation_provider: RootProvider<Optimism> = ProviderBuilder::new()
48-
.disable_recommended_fillers()
49-
.network::<Optimism>()
50-
.connect_http(config.simulation_rpc);
42+
let providers = Providers {
43+
mempool: ProviderBuilder::new()
44+
.disable_recommended_fillers()
45+
.network::<Optimism>()
46+
.connect_http(config.mempool_url),
47+
simulation: ProviderBuilder::new()
48+
.disable_recommended_fillers()
49+
.network::<Optimism>()
50+
.connect_http(config.simulation_rpc),
51+
raw_tx_forward: config.raw_tx_forward_rpc.clone().map(|url| {
52+
ProviderBuilder::new()
53+
.disable_recommended_fillers()
54+
.network::<Optimism>()
55+
.connect_http(url)
56+
}),
57+
};
5158

5259
let ingress_client_config = ClientConfig::from_iter(load_kafka_config_from_file(
5360
&config.ingress_kafka_properties,
@@ -83,8 +90,7 @@ async fn main() -> anyhow::Result<()> {
8390
);
8491

8592
let service = IngressService::new(
86-
provider,
87-
simulation_provider,
93+
providers,
8894
queue,
8995
audit_tx,
9096
builder_tx,

crates/ingress-rpc/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,10 @@ pub struct Config {
164164
/// Enable backrun bundle submission to op-rbuilder
165165
#[arg(long, env = "TIPS_INGRESS_BACKRUN_ENABLED", default_value = "false")]
166166
pub backrun_enabled: bool,
167+
168+
/// URL of third-party RPC endpoint to forward raw transactions to (enables forwarding if set)
169+
#[arg(long, env = "TIPS_INGRESS_RAW_TX_FORWARD_RPC")]
170+
pub raw_tx_forward_rpc: Option<Url>,
167171
}
168172

169173
pub fn connect_ingress_to_builder(

crates/ingress-rpc/src/metrics.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ pub struct Metrics {
3535

3636
#[metric(describe = "Duration to send backrun bundle to op-rbuilder")]
3737
pub backrun_bundles_sent_duration: Histogram,
38+
39+
#[metric(describe = "Total raw transactions forwarded to additional endpoint")]
40+
pub raw_tx_forwards_total: Counter,
3841
}
3942

4043
/// Initialize Prometheus metrics exporter

crates/ingress-rpc/src/service.rs

Lines changed: 106 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use tips_core::{
1717
};
1818
use tokio::sync::{broadcast, mpsc};
1919
use tokio::time::{Duration, Instant, timeout};
20-
use tracing::{info, warn};
20+
use tracing::{debug, info, warn};
2121

2222
use crate::metrics::{Metrics, record_histogram};
2323
use crate::queue::QueuePublisher;
@@ -27,6 +27,13 @@ use account_abstraction_core::types::{SendUserOperationResponse, UserOperationRe
2727
use account_abstraction_core::{AccountAbstractionService, AccountAbstractionServiceImpl};
2828
use std::sync::Arc;
2929

30+
/// RPC providers for different endpoints
31+
pub struct Providers {
32+
pub mempool: RootProvider<Optimism>,
33+
pub simulation: RootProvider<Optimism>,
34+
pub raw_tx_forward: Option<RootProvider<Optimism>>,
35+
}
36+
3037
#[rpc(server, namespace = "eth")]
3138
pub trait IngressApi {
3239
/// `eth_sendBundle` can be used to send your bundles to the builder.
@@ -53,8 +60,9 @@ pub trait IngressApi {
5360
}
5461

5562
pub struct IngressService<Queue> {
56-
provider: Arc<RootProvider<Optimism>>,
63+
mempool_provider: Arc<RootProvider<Optimism>>,
5764
simulation_provider: Arc<RootProvider<Optimism>>,
65+
raw_tx_forward_provider: Option<Arc<RootProvider<Optimism>>>,
5866
account_abstraction_service: AccountAbstractionServiceImpl,
5967
tx_submission_method: TxSubmissionMethod,
6068
bundle_queue: Queue,
@@ -70,24 +78,26 @@ pub struct IngressService<Queue> {
7078

7179
impl<Queue> IngressService<Queue> {
7280
pub fn new(
73-
provider: RootProvider<Optimism>,
74-
simulation_provider: RootProvider<Optimism>,
81+
providers: Providers,
7582
queue: Queue,
7683
audit_channel: mpsc::UnboundedSender<BundleEvent>,
7784
builder_tx: broadcast::Sender<MeterBundleResponse>,
7885
builder_backrun_tx: broadcast::Sender<Bundle>,
7986
config: Config,
8087
) -> Self {
81-
let provider = Arc::new(provider);
82-
let simulation_provider = Arc::new(simulation_provider);
88+
let mempool_provider = Arc::new(providers.mempool);
89+
let simulation_provider = Arc::new(providers.simulation);
90+
let raw_tx_forward_provider = providers.raw_tx_forward.map(Arc::new);
8391
let account_abstraction_service: AccountAbstractionServiceImpl =
8492
AccountAbstractionServiceImpl::new(
8593
simulation_provider.clone(),
8694
config.validate_user_operation_timeout_ms,
8795
);
96+
8897
Self {
89-
provider,
98+
mempool_provider,
9099
simulation_provider,
100+
raw_tx_forward_provider,
91101
account_abstraction_service,
92102
tx_submission_method: config.tx_submission_method,
93103
bundle_queue: queue,
@@ -241,7 +251,7 @@ where
241251

242252
if send_to_mempool {
243253
let response = self
244-
.provider
254+
.mempool_provider
245255
.send_raw_transaction(data.iter().as_slice())
246256
.await;
247257
match response {
@@ -254,6 +264,25 @@ where
254264
}
255265
}
256266

267+
if let Some(forward_provider) = self.raw_tx_forward_provider.clone() {
268+
self.metrics.raw_tx_forwards_total.increment(1);
269+
let tx_data = data.clone();
270+
let tx_hash = transaction.tx_hash();
271+
tokio::spawn(async move {
272+
match forward_provider
273+
.send_raw_transaction(tx_data.iter().as_slice())
274+
.await
275+
{
276+
Ok(_) => {
277+
debug!(message = "Forwarded raw tx", hash = %tx_hash);
278+
}
279+
Err(e) => {
280+
warn!(message = "Failed to forward raw tx", hash = %tx_hash, error = %e);
281+
}
282+
}
283+
});
284+
}
285+
257286
self.send_audit_event(&accepted_bundle, transaction.tx_hash());
258287

259288
self.metrics
@@ -407,7 +436,7 @@ mod tests {
407436
use alloy_provider::RootProvider;
408437
use async_trait::async_trait;
409438
use std::net::{IpAddr, SocketAddr};
410-
use std::sync::Arc;
439+
use std::str::FromStr;
411440
use tips_core::test_utils::create_test_meter_bundle_response;
412441
use tokio::sync::{broadcast, mpsc};
413442
use url::Url;
@@ -449,6 +478,7 @@ mod tests {
449478
max_buffered_backrun_bundles: 100,
450479
health_check_addr: SocketAddr::from(([127, 0, 0, 1], 8081)),
451480
backrun_enabled: false,
481+
raw_tx_forward_rpc: None,
452482
}
453483
}
454484

@@ -519,20 +549,19 @@ mod tests {
519549

520550
let provider: RootProvider<Optimism> =
521551
RootProvider::new_http(mock_server.uri().parse().unwrap());
522-
let simulation_provider = Arc::new(provider.clone());
552+
553+
let providers = Providers {
554+
mempool: provider.clone(),
555+
simulation: provider.clone(),
556+
raw_tx_forward: None,
557+
};
523558

524559
let (audit_tx, _audit_rx) = mpsc::unbounded_channel();
525560
let (builder_tx, _builder_rx) = broadcast::channel(1);
526561
let (backrun_tx, _backrun_rx) = broadcast::channel(1);
527562

528563
let service = IngressService::new(
529-
provider,
530-
simulation_provider.as_ref().clone(),
531-
MockQueue,
532-
audit_tx,
533-
builder_tx,
534-
backrun_tx,
535-
config,
564+
providers, MockQueue, audit_tx, builder_tx, backrun_tx, config,
536565
);
537566

538567
let bundle = Bundle::default();
@@ -545,4 +574,64 @@ mod tests {
545574
let response = result.unwrap_or_else(|_| MeterBundleResponse::default());
546575
assert_eq!(response, MeterBundleResponse::default());
547576
}
577+
578+
#[tokio::test]
579+
async fn test_raw_tx_forward() {
580+
let simulation_server = MockServer::start().await;
581+
let forward_server = MockServer::start().await;
582+
583+
// Mock error response from base_meterBundle
584+
Mock::given(method("POST"))
585+
.respond_with(ResponseTemplate::new(500).set_body_json(serde_json::json!({
586+
"jsonrpc": "2.0",
587+
"id": 1,
588+
"error": {
589+
"code": -32000,
590+
"message": "Simulation failed"
591+
}
592+
})))
593+
.mount(&simulation_server)
594+
.await;
595+
596+
// Mock forward endpoint - expect exactly 1 call
597+
Mock::given(method("POST"))
598+
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
599+
"jsonrpc": "2.0",
600+
"id": 1,
601+
"result": "0x0000000000000000000000000000000000000000000000000000000000000000"
602+
})))
603+
.expect(1)
604+
.mount(&forward_server)
605+
.await;
606+
607+
let mut config = create_test_config(&simulation_server);
608+
config.tx_submission_method = TxSubmissionMethod::Kafka; // Skip mempool send
609+
610+
let providers = Providers {
611+
mempool: RootProvider::new_http(simulation_server.uri().parse().unwrap()),
612+
simulation: RootProvider::new_http(simulation_server.uri().parse().unwrap()),
613+
raw_tx_forward: Some(RootProvider::new_http(
614+
forward_server.uri().parse().unwrap(),
615+
)),
616+
};
617+
618+
let (audit_tx, _audit_rx) = mpsc::unbounded_channel();
619+
let (builder_tx, _builder_rx) = broadcast::channel(1);
620+
let (backrun_tx, _backrun_rx) = broadcast::channel(1);
621+
622+
let service = IngressService::new(
623+
providers, MockQueue, audit_tx, builder_tx, backrun_tx, config,
624+
);
625+
626+
// Valid signed transaction bytes
627+
let tx_bytes = Bytes::from_str("0x02f86c0d010183072335825208940000000000000000000000000000000000000000872386f26fc1000080c001a0cdb9e4f2f1ba53f9429077e7055e078cf599786e29059cd80c5e0e923bb2c114a01c90e29201e031baf1da66296c3a5c15c200bcb5e6c34da2f05f7d1778f8be07").unwrap();
628+
629+
let result = service.send_raw_transaction(tx_bytes).await;
630+
assert!(result.is_ok());
631+
632+
// Wait for spawned forward task to complete
633+
tokio::time::sleep(Duration::from_millis(100)).await;
634+
635+
// wiremock automatically verifies expect(1) when forward_server is dropped
636+
}
548637
}

0 commit comments

Comments
 (0)