Skip to content

Commit 14c7b9a

Browse files
committed
refactor
1 parent 43b68cd commit 14c7b9a

File tree

1 file changed

+71
-75
lines changed

1 file changed

+71
-75
lines changed

crates/ingress-rpc/src/service.rs

Lines changed: 71 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,72 @@ where
114114
let accepted_bundle = AcceptedBundle::new(parsed_bundle, meter_bundle_response.clone());
115115
Ok((accepted_bundle, bundle_hash))
116116
}
117+
118+
/// Helper method to send backrun bundle to all configured builders concurrently
119+
async fn send_backrun_to_builders(&self, bundle: &Bundle, bundle_hash: B256) {
120+
if self.builder_clients.is_empty() {
121+
warn!(
122+
message = "No builder RPCs configured, backrun bundle not submitted",
123+
bundle_hash = %bundle_hash
124+
);
125+
return;
126+
}
127+
128+
let mut tasks = Vec::new();
129+
130+
for (idx, client) in self.builder_clients.iter().enumerate() {
131+
let client = client.clone();
132+
let bundle = bundle.clone();
133+
134+
let task = tokio::spawn(async move {
135+
let result = client
136+
.client()
137+
.request::<(Bundle,), ()>("base_sendBackrunBundle", (bundle,))
138+
.await;
139+
(idx, bundle_hash, result)
140+
});
141+
142+
tasks.push(task);
143+
}
144+
145+
// Wait for all tasks to complete
146+
for task in tasks {
147+
if let Ok((idx, bundle_hash, result)) = task.await {
148+
match result {
149+
Ok(_) => {
150+
info!(
151+
message = "Sent backrun bundle to op-rbuilder",
152+
bundle_hash = %bundle_hash,
153+
builder_idx = idx
154+
);
155+
}
156+
Err(e) => {
157+
warn!(
158+
message = "Failed to send backrun bundle to op-rbuilder",
159+
bundle_hash = %bundle_hash,
160+
builder_idx = idx,
161+
error = %e
162+
);
163+
}
164+
}
165+
}
166+
}
167+
}
168+
169+
/// Helper method to send audit event for a bundle
170+
fn send_audit_event(&self, accepted_bundle: &AcceptedBundle, bundle_hash: B256) {
171+
let audit_event = BundleEvent::Received {
172+
bundle_id: *accepted_bundle.uuid(),
173+
bundle: Box::new(accepted_bundle.clone()),
174+
};
175+
if let Err(e) = self.audit_channel.send(audit_event) {
176+
warn!(
177+
message = "Failed to send audit event",
178+
bundle_hash = %bundle_hash,
179+
error = %e
180+
);
181+
}
182+
}
117183
}
118184

119185
#[async_trait]
@@ -135,66 +201,11 @@ where
135201

136202
let (accepted_bundle, bundle_hash) = self.validate_parse_and_meter_bundle(&bundle).await?;
137203

138-
info!(message = "Validated and parsed backrun bundle", bundle_hash = %bundle_hash);
139-
140204
// Send to all configured builder RPCs concurrently
141-
if self.builder_clients.is_empty() {
142-
warn!(
143-
message = "No builder RPCs configured, backrun bundle not submitted",
144-
bundle_hash = %bundle_hash
145-
);
146-
} else {
147-
let mut tasks = Vec::new();
148-
149-
for (idx, client) in self.builder_clients.iter().enumerate() {
150-
let client = client.clone();
151-
let bundle = bundle.clone();
152-
153-
let task = tokio::spawn(async move {
154-
let result = client
155-
.client()
156-
.request::<(Bundle,), ()>("base_sendBackrunBundle", (bundle,))
157-
.await;
158-
(idx, bundle_hash, result)
159-
});
160-
161-
tasks.push(task);
162-
}
163-
164-
// Wait for all tasks to complete
165-
for task in tasks {
166-
if let Ok((idx, bundle_hash, result)) = task.await {
167-
match result {
168-
Ok(_) => {
169-
info!(
170-
message = "Sent backrun bundle to op-rbuilder",
171-
bundle_hash = %bundle_hash,
172-
builder_idx = idx
173-
);
174-
}
175-
Err(e) => {
176-
warn!(
177-
message = "Failed to send backrun bundle to op-rbuilder",
178-
bundle_hash = %bundle_hash,
179-
builder_idx = idx,
180-
error = %e
181-
);
182-
}
183-
}
184-
}
185-
}
186-
}
187-
188-
let audit_event = BundleEvent::Received {
189-
bundle_id: *accepted_bundle.uuid(),
190-
bundle: Box::new(accepted_bundle.clone()),
191-
};
192-
if let Err(e) = self.audit_channel.send(audit_event) {
193-
warn!(message = "Failed to send audit event", error = %e);
194-
// Don't fail the request
195-
}
205+
self.send_backrun_to_builders(&bundle, bundle_hash).await;
196206

197-
info!(message = "Sent backrun bundle to audit channel", bundle_hash = %bundle_hash);
207+
// Send audit event
208+
self.send_audit_event(&accepted_bundle, bundle_hash);
198209

199210
Ok(BundleHash { bundle_hash })
200211
}
@@ -226,16 +237,7 @@ where
226237
);
227238

228239
// asynchronously send the audit event to the audit channel
229-
let audit_event = BundleEvent::Received {
230-
bundle_id: *accepted_bundle.uuid(),
231-
bundle: Box::new(accepted_bundle.clone()),
232-
};
233-
if let Err(e) = self.audit_channel.send(audit_event) {
234-
warn!(message = "Failed to send audit event", error = %e);
235-
return Err(
236-
EthApiError::InvalidParams("Failed to send audit event".into()).into_rpc_err(),
237-
);
238-
}
240+
self.send_audit_event(&accepted_bundle, bundle_hash);
239241

240242
Ok(BundleHash { bundle_hash })
241243
}
@@ -314,13 +316,7 @@ where
314316
}
315317
}
316318

317-
let audit_event = BundleEvent::Received {
318-
bundle_id: *accepted_bundle.uuid(),
319-
bundle: accepted_bundle.clone().into(),
320-
};
321-
if let Err(e) = self.audit_channel.send(audit_event) {
322-
warn!(message = "Failed to send audit event", error = %e);
323-
}
319+
self.send_audit_event(&accepted_bundle, transaction.tx_hash());
324320

325321
self.metrics
326322
.send_raw_transaction_duration

0 commit comments

Comments
 (0)