Skip to content

Commit 0a4dd4c

Browse files
Refactor transaction failure handling to use direct Redis pipeline (#89)
1 parent f1ea797 commit 0a4dd4c

File tree

1 file changed

+47
-32
lines changed

1 file changed

+47
-32
lines changed

executors/src/eoa/store/atomic.rs

Lines changed: 47 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -574,39 +574,54 @@ impl AtomicEoaExecutorStore {
574574
error: EoaExecutorWorkerError,
575575
webhook_queue: Arc<twmq::Queue<WebhookJobHandler>>,
576576
) -> Result<(), TransactionStoreError> {
577-
self.with_lock_check(|pipeline| {
578-
let pending_key = self.pending_transactions_zset_name();
579-
let tx_data_key = self.transaction_data_key_name(&pending_transaction.transaction_id);
580-
let now = chrono::Utc::now().timestamp_millis().max(0) as u64;
581-
582-
// Remove from pending state
583-
pipeline.zrem(&pending_key, &pending_transaction.transaction_id);
584-
585-
// Update transaction data with failure
586-
pipeline.hset(&tx_data_key, "completed_at", now);
587-
pipeline.hset(&tx_data_key, "failure_reason", error.to_string());
588-
pipeline.hset(&tx_data_key, "status", "failed");
589-
590-
let event = EoaExecutorEvent {
591-
transaction_id: pending_transaction.transaction_id.clone(),
592-
address: pending_transaction.user_request.from,
593-
};
594-
595-
let fail_envelope = event.transaction_failed_envelope(error.clone(), 1);
596-
597-
if !pending_transaction.user_request.webhook_options.is_empty() {
598-
let mut tx_context = webhook_queue.transaction_context_from_pipeline(pipeline);
599-
if let Err(e) = queue_webhook_envelopes(
600-
fail_envelope,
601-
pending_transaction.user_request.webhook_options.clone(),
602-
&mut tx_context,
603-
webhook_queue.clone(),
604-
) {
605-
tracing::error!("Failed to queue webhook for fail: {}", e);
606-
}
577+
let mut pipeline = twmq::redis::pipe();
578+
pipeline.atomic();
579+
580+
let pending_key = self.pending_transactions_zset_name();
581+
let tx_data_key = self.transaction_data_key_name(&pending_transaction.transaction_id);
582+
let now = chrono::Utc::now().timestamp_millis().max(0) as u64;
583+
584+
// Remove from pending state
585+
pipeline.zrem(&pending_key, &pending_transaction.transaction_id);
586+
587+
// Update transaction data with failure
588+
pipeline.hset(&tx_data_key, "completed_at", now);
589+
pipeline.hset(&tx_data_key, "failure_reason", error.to_string());
590+
pipeline.hset(&tx_data_key, "status", "failed");
591+
592+
let event = EoaExecutorEvent {
593+
transaction_id: pending_transaction.transaction_id.clone(),
594+
address: pending_transaction.user_request.from,
595+
};
596+
597+
let fail_envelope = event.transaction_failed_envelope(error.clone(), 1);
598+
599+
if !pending_transaction.user_request.webhook_options.is_empty() {
600+
let mut tx_context = webhook_queue.transaction_context_from_pipeline(&mut pipeline);
601+
if let Err(e) = queue_webhook_envelopes(
602+
fail_envelope,
603+
pending_transaction.user_request.webhook_options.clone(),
604+
&mut tx_context,
605+
webhook_queue.clone(),
606+
) {
607+
tracing::error!("Failed to queue webhook for fail: {}", e);
607608
}
608-
})
609-
.await
609+
}
610+
611+
let mut conn = self.redis.clone();
612+
pipeline
613+
.query_async::<Vec<twmq::redis::Value>>(&mut conn)
614+
.await?;
615+
616+
tracing::info!(
617+
transaction_id = %pending_transaction.transaction_id,
618+
eoa = ?self.eoa(),
619+
chain_id = self.chain_id(),
620+
error = %error,
621+
"JOB_LIFECYCLE - Deleted failed pending transaction from EOA"
622+
);
623+
624+
Ok(())
610625
}
611626

612627
pub async fn clean_submitted_transactions(

0 commit comments

Comments
 (0)