From 5cd6c25c1a02455b8c8ae6ded4a36429361baded Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 14 Oct 2025 12:39:35 +0200 Subject: [PATCH] Introduce Service Protocol V7. This introduces AwaitingMessage, a notification from SDK to server of which event the sdk might be awaiting on. --- .github/workflows/ci.yml | 17 +++++++++ .../invoker-impl/src/invocation_task/mod.rs | 25 +++++++++---- .../service_protocol_runner_v4.rs | 37 +++++++++++++++++-- crates/invoker-impl/src/lib.rs | 1 + .../src/message_codec/mod.rs | 1 + .../dev/restate/service/protocol.proto | 3 ++ crates/types/src/config/worker.rs | 9 +++++ crates/types/src/service_protocol.rs | 14 +++++-- .../dev/restate/service/protocol.proto | 17 ++++++++- 9 files changed, 109 insertions(+), 15 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4bb5ffef97..62df6925b2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -166,6 +166,23 @@ jobs: with: restateCommit: ${{ github.event.pull_request.head.sha || github.sha }} + sdk-typescript-protocol-v7: + name: Run SDK-Typescript integration tests with Protocol V7 + permissions: + contents: read + issues: read + checks: write + pull-requests: write + actions: read + secrets: inherit + needs: docker + uses: restatedev/sdk-typescript/.github/workflows/integration.yaml@main + with: + restateCommit: ${{ github.event.pull_request.head.sha || github.sha }} + envVars: | + RESTATE_WORKER__INVOKER__experimental_features_allow_protocol_v7=true + testArtifactOutput: sdk-typescript-protocol-v7 + sdk-rust: name: Run SDK-Rust integration tests permissions: diff --git a/crates/invoker-impl/src/invocation_task/mod.rs b/crates/invoker-impl/src/invocation_task/mod.rs index b251afedcf..5e4cdaabe9 100644 --- a/crates/invoker-impl/src/invocation_task/mod.rs +++ b/crates/invoker-impl/src/invocation_task/mod.rs @@ -80,6 +80,10 @@ const SERVICE_PROTOCOL_VERSION_V5: HeaderValue = const SERVICE_PROTOCOL_VERSION_V6: HeaderValue = HeaderValue::from_static("application/vnd.restate.invocation.v6"); +#[allow(clippy::declare_interior_mutable_const)] +const SERVICE_PROTOCOL_VERSION_V7: HeaderValue = + HeaderValue::from_static("application/vnd.restate.invocation.v7"); + #[allow(clippy::declare_interior_mutable_const)] const X_RESTATE_SERVER: HeaderName = HeaderName::from_static("x-restate-server"); @@ -150,6 +154,7 @@ pub(super) struct InvocationTask { message_size_warning: usize, message_size_limit: Option, retry_count_since_last_stored_entry: u32, + allow_protocol_v7: bool, // Invoker tx/rx invocation_reader: IR, @@ -218,6 +223,7 @@ where deployment_metadata_resolver: Live, invoker_tx: mpsc::UnboundedSender, invoker_rx: mpsc::UnboundedReceiver, + allow_protocol_v7: bool, action_token_bucket: Option, ) -> Self { Self { @@ -237,6 +243,7 @@ where message_size_limit, message_size_warning, retry_count_since_last_stored_entry, + allow_protocol_v7, action_token_bucket, } } @@ -346,13 +353,16 @@ where ); let chosen_service_protocol_version = shortcircuit!( - ServiceProtocolVersion::pick(&deployment.supported_protocol_versions,) - .ok_or_else(|| { - InvokerError::IncompatibleServiceEndpoint( - deployment.id, - deployment.supported_protocol_versions.clone(), - ) - }) + ServiceProtocolVersion::pick( + &deployment.supported_protocol_versions, + self.allow_protocol_v7 + ) + .ok_or_else(|| { + InvokerError::IncompatibleServiceEndpoint( + deployment.id, + deployment.supported_protocol_versions.clone(), + ) + }) ); ( @@ -447,6 +457,7 @@ fn service_protocol_version_to_header_value( ServiceProtocolVersion::V4 => SERVICE_PROTOCOL_VERSION_V4, ServiceProtocolVersion::V5 => SERVICE_PROTOCOL_VERSION_V5, ServiceProtocolVersion::V6 => SERVICE_PROTOCOL_VERSION_V6, + ServiceProtocolVersion::V7 => SERVICE_PROTOCOL_VERSION_V7, } } diff --git a/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs b/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs index e359303542..b771a51209 100644 --- a/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs +++ b/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs @@ -628,6 +628,7 @@ where Message::CommandAck(_) => TerminalLoopState::Failed(InvokerError::UnexpectedMessageV4( MessageType::CommandAck, )), + Message::Awaiting(awaiting) => self.handle_awaiting_message(awaiting), Message::Suspension(suspension) => self.handle_suspension_message(suspension), Message::Error(e) => self.handle_error_message(e), Message::End(_) => TerminalLoopState::Closed, @@ -955,7 +956,7 @@ where &mut self, suspension: proto::SuspensionMessage, ) -> TerminalLoopState<()> { - let suspension_indexes: HashSet<_> = suspension + let suspension_notification_ids: HashSet<_> = suspension .waiting_completions .into_iter() .map(NotificationId::for_completion) @@ -975,10 +976,40 @@ where ) .collect(); // We currently don't support empty suspension_indexes set - if suspension_indexes.is_empty() { + if suspension_notification_ids.is_empty() { return TerminalLoopState::Failed(InvokerError::EmptySuspensionMessage); } - TerminalLoopState::SuspendedV2(suspension_indexes) + TerminalLoopState::SuspendedV2(suspension_notification_ids) + } + + fn handle_awaiting_message( + &mut self, + awaiting: proto::AwaitingMessage, + ) -> TerminalLoopState<()> { + #[allow(unused_variables)] + let awaiting_notification_ids: HashSet<_> = awaiting + .waiting_completions + .into_iter() + .map(NotificationId::for_completion) + .chain( + awaiting + .waiting_signals + .into_iter() + .map(SignalId::for_index) + .map(NotificationId::for_signal), + ) + .chain( + awaiting + .waiting_named_signals + .into_iter() + .map(|s| SignalId::for_name(s.into())) + .map(NotificationId::for_signal), + ) + .collect(); + + // TODO(till) fill here the business logic to deal with the awaiting message + + TerminalLoopState::Continue(()) } fn handle_error_message(&mut self, error: proto::ErrorMessage) -> TerminalLoopState<()> { diff --git a/crates/invoker-impl/src/lib.rs b/crates/invoker-impl/src/lib.rs index f2cd813748..afa479c6c4 100644 --- a/crates/invoker-impl/src/lib.rs +++ b/crates/invoker-impl/src/lib.rs @@ -156,6 +156,7 @@ where self.schemas.clone(), invoker_tx, invoker_rx, + opts.experimental_features_allow_protocol_v7(), self.action_token_bucket.clone(), ) .run(input_journal), diff --git a/crates/service-protocol-v4/src/message_codec/mod.rs b/crates/service-protocol-v4/src/message_codec/mod.rs index 822bca01da..129d82cf8d 100644 --- a/crates/service-protocol-v4/src/message_codec/mod.rs +++ b/crates/service-protocol-v4/src/message_codec/mod.rs @@ -333,6 +333,7 @@ gen_message!( End Control = 0x0003, CommandAck Control = 0x0004, ProposeRunCompletion Control = 0x0005, + Awaiting Control = 0x0006, Input Command noparse allows_ack = 0x0400, Output Command noparse allows_ack = 0x0401, diff --git a/crates/types/service-protocol-v3/dev/restate/service/protocol.proto b/crates/types/service-protocol-v3/dev/restate/service/protocol.proto index 7d92e2fb10..df821b3298 100644 --- a/crates/types/service-protocol-v3/dev/restate/service/protocol.proto +++ b/crates/types/service-protocol-v3/dev/restate/service/protocol.proto @@ -37,6 +37,9 @@ enum ServiceProtocolVersion { // * StartMessage.random_seed // * Failure.metadata V6 = 6; + // Added: + // * AwaitingMessage + V7 = 7; } // --- Core frames --- diff --git a/crates/types/src/config/worker.rs b/crates/types/src/config/worker.rs index 55017cb1d0..0f3d456834 100644 --- a/crates/types/src/config/worker.rs +++ b/crates/types/src/config/worker.rs @@ -288,6 +288,10 @@ pub struct InvokerOptions { #[serde(skip_serializing_if = "std::ops::Not::not", default)] pub disable_eager_state: bool, + #[cfg_attr(feature = "schemars", schemars(skip))] + #[serde(skip_serializing_if = "std::ops::Not::not", default)] + experimental_features_allow_protocol_v7: bool, + /// # Invocation throttling /// /// Configures throttling for service invocations at the node level. @@ -334,6 +338,10 @@ impl InvokerOptions { self.message_size_limit.map(Into::into) } + pub fn experimental_features_allow_protocol_v7(&self) -> bool { + self.experimental_features_allow_protocol_v7 + } + #[allow(deprecated)] pub fn print_deprecation_warnings(&self) { if self.retry_policy.is_some() { @@ -359,6 +367,7 @@ impl Default for InvokerOptions { tmp_dir: None, concurrent_invocations_limit: Some(NonZeroUsize::new(1000).expect("is non zero")), disable_eager_state: false, + experimental_features_allow_protocol_v7: false, invocation_throttling: None, action_throttling: None, } diff --git a/crates/types/src/service_protocol.rs b/crates/types/src/service_protocol.rs index 316c8eb684..18aeef261b 100644 --- a/crates/types/src/service_protocol.rs +++ b/crates/types/src/service_protocol.rs @@ -16,7 +16,7 @@ use std::ops::RangeInclusive; pub const MIN_INFLIGHT_SERVICE_PROTOCOL_VERSION: ServiceProtocolVersion = ServiceProtocolVersion::V1; pub const MAX_INFLIGHT_SERVICE_PROTOCOL_VERSION: ServiceProtocolVersion = - ServiceProtocolVersion::V6; + ServiceProtocolVersion::V7; pub const MIN_DISCOVERABLE_SERVICE_PROTOCOL_VERSION: ServiceProtocolVersion = ServiceProtocolVersion::V5; @@ -45,15 +45,21 @@ impl ServiceProtocolVersion { /// Pick the version to use for running an invocation pub fn pick( deployment_supported_versions: &RangeInclusive, + enable_protocol_v7: bool, ) -> Option { - if *deployment_supported_versions.start() - <= i32::from(MAX_INFLIGHT_SERVICE_PROTOCOL_VERSION) + let max_version = if enable_protocol_v7 { + ServiceProtocolVersion::V7.into() + } else { + ServiceProtocolVersion::V6.into() + }; + + if *deployment_supported_versions.start() <= max_version && *deployment_supported_versions.end() >= i32::from(MIN_INFLIGHT_SERVICE_PROTOCOL_VERSION) { ServiceProtocolVersion::try_from(std::cmp::min( *deployment_supported_versions.end(), - i32::from(MAX_INFLIGHT_SERVICE_PROTOCOL_VERSION), + max_version, )) .ok() } else { diff --git a/service-protocol/dev/restate/service/protocol.proto b/service-protocol/dev/restate/service/protocol.proto index d9423d6e23..89163c0d98 100644 --- a/service-protocol/dev/restate/service/protocol.proto +++ b/service-protocol/dev/restate/service/protocol.proto @@ -37,6 +37,9 @@ enum ServiceProtocolVersion { // * StartMessage.random_seed // * Failure.metadata V6 = 6; + // Added: + // * AwaitingMessage + V7 = 7; } // --- Core frames --- @@ -89,7 +92,7 @@ message StartMessage { // // These lists represent any of the notification_idx and/or notification_name the invocation is waiting on to progress. // The runtime will resume the invocation as soon as either one of the given notification_idx or notification_name is completed. -// Between the two lists there MUST be at least one element. +// Between the 3 lists there MUST be at least one element. message SuspensionMessage { repeated uint32 waiting_completions = 1; repeated uint32 waiting_signals = 2; @@ -143,6 +146,18 @@ message ProposeRunCompletionMessage { }; } +// Type: 0x0000 + 6 +// Implementations MAY send this message when awaiting an invocation. +// This is not an end of stream message, it just notifies the runtime what the SDK is waiting on. +// Actual suspension is represented through SuspensionMessage. +// +// Between the 3 lists there MUST be at least one element. +message AwaitingMessage { + repeated uint32 waiting_completions = 1; + repeated uint32 waiting_signals = 2; + repeated string waiting_named_signals = 3; +} + // --- Commands and Notifications --- // The Journal is modelled as commands and notifications.