@@ -73,6 +73,8 @@ pub struct KeeperMetrics {
7373 pub total_gas_spent : Family < AccountLabel , Gauge < f64 , AtomicU64 > > ,
7474 pub requests : Family < AccountLabel , Counter > ,
7575 pub requests_processed : Family < AccountLabel , Counter > ,
76+ pub requests_processed_success : Family < AccountLabel , Counter > ,
77+ pub requests_processed_failure : Family < AccountLabel , Counter > ,
7678 pub requests_reprocessed : Family < AccountLabel , Counter > ,
7779 pub reveals : Family < AccountLabel , Counter > ,
7880 pub request_duration_ms : Family < AccountLabel , Histogram > ,
@@ -89,6 +91,8 @@ impl Default for KeeperMetrics {
8991 total_gas_spent : Family :: default ( ) ,
9092 requests : Family :: default ( ) ,
9193 requests_processed : Family :: default ( ) ,
94+ requests_processed_success : Family :: default ( ) ,
95+ requests_processed_failure : Family :: default ( ) ,
9296 requests_reprocessed : Family :: default ( ) ,
9397 reveals : Family :: default ( ) ,
9498 request_duration_ms : Family :: new_with_constructor ( || {
@@ -133,6 +137,18 @@ impl KeeperMetrics {
133137 keeper_metrics. requests_processed . clone ( ) ,
134138 ) ;
135139
140+ writable_registry. register (
141+ "requests_processed_success" ,
142+ "Number of requests processed successfully" ,
143+ keeper_metrics. requests_processed_success . clone ( ) ,
144+ ) ;
145+
146+ writable_registry. register (
147+ "requests_processed_failure" ,
148+ "Number of requests processed with failure" ,
149+ keeper_metrics. requests_processed_failure . clone ( ) ,
150+ ) ;
151+
136152 writable_registry. register (
137153 "reveal" ,
138154 "Number of reveals" ,
@@ -171,7 +187,7 @@ impl KeeperMetrics {
171187
172188 writable_registry. register (
173189 "request_duration_ms" ,
174- "Time taken to process each callback request in milliseconds" ,
190+ "Time taken to process each successful callback request in milliseconds" ,
175191 keeper_metrics. request_duration_ms . clone ( ) ,
176192 ) ;
177193
@@ -382,14 +398,12 @@ pub async fn process_event_with_backoff(
382398 metrics : Arc < KeeperMetrics > ,
383399) {
384400 let start_time = std:: time:: Instant :: now ( ) ;
401+ let account_label = AccountLabel {
402+ chain_id : chain_state. id . clone ( ) ,
403+ address : chain_state. provider_address . to_string ( ) ,
404+ } ;
385405
386- metrics
387- . requests
388- . get_or_create ( & AccountLabel {
389- chain_id : chain_state. id . clone ( ) ,
390- address : chain_state. provider_address . to_string ( ) ,
391- } )
392- . inc ( ) ;
406+ metrics. requests . get_or_create ( & account_label) . inc ( ) ;
393407 tracing:: info!( "Started processing event" ) ;
394408 let backoff = ExponentialBackoff {
395409 max_elapsed_time : Some ( Duration :: from_secs ( 300 ) ) , // retry for 5 minutes
@@ -398,7 +412,7 @@ pub async fn process_event_with_backoff(
398412
399413 let current_multiplier = Arc :: new ( AtomicU64 :: new ( DEFAULT_GAS_ESTIMATE_MULTIPLIER_PCT ) ) ;
400414
401- match backoff:: future:: retry_notify (
415+ let success = backoff:: future:: retry_notify (
402416 backoff,
403417 || async {
404418 let multiplier = current_multiplier. load ( std:: sync:: atomic:: Ordering :: Relaxed ) ;
@@ -426,32 +440,48 @@ pub async fn process_event_with_backoff(
426440 ) ;
427441 } ,
428442 )
429- . await
430- {
431- Ok ( ( ) ) => {
432- tracing:: info!( "Processed event" , ) ;
433- }
434- Err ( e) => {
435- tracing:: error!( "Failed to process event: {:?}" , e) ;
436- }
437- }
443+ . await ;
438444
439- let duration_ms = start_time. elapsed ( ) . as_millis ( ) as f64 ;
440- metrics
441- . request_duration_ms
442- . get_or_create ( & AccountLabel {
443- chain_id : chain_state. id . clone ( ) ,
444- address : chain_state. provider_address . to_string ( ) ,
445- } )
446- . observe ( duration_ms) ;
445+ let duration = start_time. elapsed ( ) ;
447446
448447 metrics
449448 . requests_processed
450- . get_or_create ( & AccountLabel {
451- chain_id : chain_state. id . clone ( ) ,
452- address : chain_state. provider_address . to_string ( ) ,
453- } )
449+ . get_or_create ( & account_label)
454450 . inc ( ) ;
451+
452+ match success {
453+ Ok ( ( ) ) => {
454+ tracing:: info!( "Processed event successfully in {:?}" , duration) ;
455+
456+ metrics
457+ . requests_processed_success
458+ . get_or_create ( & account_label)
459+ . inc ( ) ;
460+
461+ metrics
462+ . request_duration_ms
463+ . get_or_create ( & account_label)
464+ . observe ( duration. as_millis ( ) as f64 ) ;
465+ }
466+ Err ( e) => {
467+ // In case the callback did not succeed, we double-check that the request is still on-chain.
468+ // If the request is no longer on-chain, one of the transactions we sent likely succeeded, but
469+ // the RPC gave us an error anyway.
470+ let req = chain_state
471+ . contract
472+ . get_request ( event. provider_address , event. sequence_number )
473+ . await ;
474+ tracing:: error!( "Failed to process event: {:?}. Request: {:?}" , e, req) ;
475+
476+ // We only count failures for cases where we are completely certain that the callback failed.
477+ if req. is_ok_and ( |x| x. is_some ( ) ) {
478+ metrics
479+ . requests_processed_failure
480+ . get_or_create ( & account_label)
481+ . inc ( ) ;
482+ }
483+ }
484+ }
455485}
456486
457487const TX_CONFIRMATION_TIMEOUT_SECS : u64 = 30 ;
0 commit comments