@@ -17,64 +17,67 @@ mod quota;
1717mod state_machine_manager;
1818mod status_store;
1919
20- use input_command:: { InputCommand , InvokeCommand } ;
21- use invocation_state_machine:: InvocationStateMachine ;
22- use invocation_task:: InvocationTask ;
23- use invocation_task:: { InvocationTaskOutput , InvocationTaskOutputInner } ;
24- use metric_definitions:: { INVOKER_PENDING_TASKS , INVOKER_TASKS_IN_FLIGHT } ;
25- use metrics:: { counter, gauge} ;
26- use restate_core:: cancellation_watcher;
27- use restate_errors:: warn_it;
28- use restate_invoker_api:: {
29- Effect , EffectKind , EntryEnricher , InvocationErrorReport , InvocationStatusReport ,
30- InvokeInputJournal ,
31- } ;
32- use restate_queue:: SegmentQueue ;
33- use restate_timer_queue:: TimerQueue ;
34- use restate_types:: config:: { InvokerOptions , ServiceClientOptions } ;
35- use restate_types:: identifiers:: PartitionLeaderEpoch ;
36- use restate_types:: identifiers:: { DeploymentId , InvocationId , PartitionKey , WithPartitionKey } ;
37- use restate_types:: journal:: enriched:: EnrichedRawEntry ;
38- use restate_types:: journal:: { Completion , EntryIndex } ;
39- use restate_types:: live:: { Live , LiveLoad } ;
40- use restate_types:: retries:: RetryPolicy ;
41- use restate_types:: schema:: deployment:: DeploymentResolver ;
42- use status_store:: InvocationStatusStore ;
20+ use std:: borrow:: Cow ;
4321use std:: collections:: { HashMap , HashSet } ;
4422use std:: future:: Future ;
4523use std:: ops:: RangeInclusive ;
4624use std:: path:: PathBuf ;
4725use std:: pin:: Pin ;
4826use std:: time:: SystemTime ;
4927use std:: { cmp, panic} ;
28+
29+ use metrics:: { counter, gauge} ;
5030use tokio:: sync:: mpsc;
5131use tokio:: task:: { AbortHandle , JoinSet } ;
5232use tracing:: { debug, trace} ;
5333use tracing:: { error, instrument} ;
5434
55- use crate :: error:: { InvokerError , SdkInvocationErrorV2 } ;
56- use crate :: metric_definitions:: {
57- INVOKER_ENQUEUE , INVOKER_INVOCATION_TASKS , TASK_OP_COMPLETED , TASK_OP_FAILED , TASK_OP_STARTED ,
58- TASK_OP_SUSPENDED ,
59- } ;
60- use error:: InvokerErrorKind ;
61- pub use input_command:: ChannelStatusReader ;
62- pub use input_command:: InvokerHandle ;
35+ use restate_core:: cancellation_watcher;
36+ use restate_errors:: warn_it;
6337use restate_invoker_api:: invocation_reader:: InvocationReader ;
64- use restate_service_client:: { AssumeRoleCacheMode , ServiceClient } ;
38+ use restate_invoker_api:: {
39+ Effect , EffectKind , EntryEnricher , InvocationErrorReport , InvocationStatusReport ,
40+ InvokeInputJournal ,
41+ } ;
42+ use restate_queue:: SegmentQueue ;
43+ use restate_service_client:: { AssumeRoleCacheMode , HttpError , ServiceClient , ServiceClientError } ;
44+ use restate_timer_queue:: TimerQueue ;
45+ use restate_types:: config:: { InvokerOptions , ServiceClientOptions } ;
6546use restate_types:: deployment:: PinnedDeployment ;
47+ use restate_types:: identifiers:: PartitionLeaderEpoch ;
48+ use restate_types:: identifiers:: { DeploymentId , InvocationId , PartitionKey , WithPartitionKey } ;
6649use restate_types:: invocation:: { InvocationEpoch , InvocationTarget } ;
50+ use restate_types:: journal:: enriched:: EnrichedRawEntry ;
51+ use restate_types:: journal:: { Completion , EntryIndex } ;
6752use restate_types:: journal_v2;
6853use restate_types:: journal_v2:: raw:: {
6954 RawCommand , RawEntry , RawEntryHeader , RawEvent , RawNotification ,
7055} ;
7156use restate_types:: journal_v2:: {
7257 CommandIndex , EntryMetadata , Event , NotificationId , TransientErrorEvent ,
7358} ;
59+ use restate_types:: live:: { Live , LiveLoad } ;
60+ use restate_types:: retries:: RetryPolicy ;
61+ use restate_types:: schema:: deployment:: DeploymentResolver ;
7462use restate_types:: schema:: invocation_target:: InvocationTargetResolver ;
7563use restate_types:: schema:: service:: ServiceMetadataResolver ;
7664use restate_types:: service_protocol:: ServiceProtocolVersion ;
7765
66+ use crate :: error:: { InvokerError , SdkInvocationErrorV2 } ;
67+ use crate :: metric_definitions:: {
68+ INVOKER_DEPLOYMENT_UNREACHABLE_ERRORS , INVOKER_ENQUEUE , INVOKER_INVOCATION_TASKS ,
69+ TASK_OP_COMPLETED , TASK_OP_FAILED , TASK_OP_STARTED , TASK_OP_SUSPENDED ,
70+ } ;
71+ use error:: InvokerErrorKind ;
72+ pub use input_command:: ChannelStatusReader ;
73+ pub use input_command:: InvokerHandle ;
74+ use input_command:: { InputCommand , InvokeCommand } ;
75+ use invocation_state_machine:: InvocationStateMachine ;
76+ use invocation_task:: InvocationTask ;
77+ use invocation_task:: { InvocationTaskOutput , InvocationTaskOutputInner } ;
78+ use metric_definitions:: { INVOKER_PENDING_TASKS , INVOKER_TASKS_IN_FLIGHT } ;
79+ use status_store:: InvocationStatusStore ;
80+
7881#[ derive( Debug , Clone , PartialEq , Eq ) ]
7982pub ( crate ) enum Notification {
8083 Completion ( Completion ) ,
@@ -1090,6 +1093,15 @@ where
10901093 . remove_invocation_with_epoch ( partition, & invocation_id, invocation_epoch)
10911094 {
10921095 debug_assert_eq ! ( invocation_epoch, ism. invocation_epoch) ;
1096+
1097+ if self . is_service_down_error ( & error. kind ) {
1098+ let deployment_id = error
1099+ . deployment_id
1100+ . map ( |id| Cow :: Owned ( id. to_string ( ) ) )
1101+ . unwrap_or_else ( || Cow :: Borrowed ( "unknown" ) ) ;
1102+ counter ! ( INVOKER_DEPLOYMENT_UNREACHABLE_ERRORS , "service" => ism. invocation_target. service_name( ) . to_string( ) , "deployment" => deployment_id) . increment ( 1 ) ;
1103+ }
1104+
10931105 self . handle_error_event ( options, partition, invocation_id, error. kind , ism)
10941106 . await ;
10951107 } else {
@@ -1098,6 +1110,22 @@ where
10981110 }
10991111 }
11001112
1113+ fn is_service_down_error ( & self , error : & InvokerErrorKind ) -> bool {
1114+ if let InvokerErrorKind :: Client ( client_err) = error {
1115+ match client_err. as_ref ( ) {
1116+ ServiceClientError :: Http ( _, HttpError :: Connect ( _) ) => true ,
1117+ ServiceClientError :: Lambda ( _, error) => {
1118+ // service down errors are those which might indicate that the service is down or
1119+ // unreachable.
1120+ error. is_service_down ( )
1121+ }
1122+ _ => false ,
1123+ }
1124+ } else {
1125+ false
1126+ }
1127+ }
1128+
11011129 #[ instrument(
11021130 level = "trace" ,
11031131 skip_all,
@@ -1436,7 +1464,7 @@ mod tests {
14361464 use restate_types:: schema:: invocation_target:: InvocationTargetMetadata ;
14371465 use restate_types:: schema:: service:: { InvocationAttemptOptions , ServiceMetadata } ;
14381466
1439- use crate :: error:: { InvokerErrorKind , SdkInvocationErrorV2 } ;
1467+ use crate :: error:: SdkInvocationErrorV2 ;
14401468 use crate :: quota:: InvokerConcurrencyQuota ;
14411469
14421470 // -- Mocks
0 commit comments