Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,23 @@ jobs:
with:
restateCommit: ${{ github.event.pull_request.head.sha || github.sha }}

sdk-typescript-protocol-v7:
name: Run SDK-Typescript integration tests with Protocol V7
permissions:
contents: read
issues: read
checks: write
pull-requests: write
actions: read
secrets: inherit
needs: docker
uses: restatedev/sdk-typescript/.github/workflows/integration.yaml@main
with:
restateCommit: ${{ github.event.pull_request.head.sha || github.sha }}
envVars: |
RESTATE_WORKER__INVOKER__experimental_features_allow_protocol_v7=true
testArtifactOutput: sdk-typescript-protocol-v7

sdk-rust:
name: Run SDK-Rust integration tests
permissions:
Expand Down
25 changes: 18 additions & 7 deletions crates/invoker-impl/src/invocation_task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ const SERVICE_PROTOCOL_VERSION_V5: HeaderValue =
const SERVICE_PROTOCOL_VERSION_V6: HeaderValue =
HeaderValue::from_static("application/vnd.restate.invocation.v6");

#[allow(clippy::declare_interior_mutable_const)]
const SERVICE_PROTOCOL_VERSION_V7: HeaderValue =
HeaderValue::from_static("application/vnd.restate.invocation.v7");

#[allow(clippy::declare_interior_mutable_const)]
const X_RESTATE_SERVER: HeaderName = HeaderName::from_static("x-restate-server");

Expand Down Expand Up @@ -150,6 +154,7 @@ pub(super) struct InvocationTask<IR, EE, DMR> {
message_size_warning: usize,
message_size_limit: Option<usize>,
retry_count_since_last_stored_entry: u32,
allow_protocol_v7: bool,

// Invoker tx/rx
invocation_reader: IR,
Expand Down Expand Up @@ -218,6 +223,7 @@ where
deployment_metadata_resolver: Live<Schemas>,
invoker_tx: mpsc::UnboundedSender<InvocationTaskOutput>,
invoker_rx: mpsc::UnboundedReceiver<Notification>,
allow_protocol_v7: bool,
action_token_bucket: Option<TokenBucket>,
) -> Self {
Self {
Expand All @@ -237,6 +243,7 @@ where
message_size_limit,
message_size_warning,
retry_count_since_last_stored_entry,
allow_protocol_v7,
action_token_bucket,
}
}
Expand Down Expand Up @@ -346,13 +353,16 @@ where
);

let chosen_service_protocol_version = shortcircuit!(
ServiceProtocolVersion::pick(&deployment.supported_protocol_versions,)
.ok_or_else(|| {
InvokerError::IncompatibleServiceEndpoint(
deployment.id,
deployment.supported_protocol_versions.clone(),
)
})
ServiceProtocolVersion::pick(
&deployment.supported_protocol_versions,
self.allow_protocol_v7
)
.ok_or_else(|| {
InvokerError::IncompatibleServiceEndpoint(
deployment.id,
deployment.supported_protocol_versions.clone(),
)
})
);

(
Expand Down Expand Up @@ -447,6 +457,7 @@ fn service_protocol_version_to_header_value(
ServiceProtocolVersion::V4 => SERVICE_PROTOCOL_VERSION_V4,
ServiceProtocolVersion::V5 => SERVICE_PROTOCOL_VERSION_V5,
ServiceProtocolVersion::V6 => SERVICE_PROTOCOL_VERSION_V6,
ServiceProtocolVersion::V7 => SERVICE_PROTOCOL_VERSION_V7,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,7 @@ where
Message::CommandAck(_) => TerminalLoopState::Failed(InvokerError::UnexpectedMessageV4(
MessageType::CommandAck,
)),
Message::Awaiting(awaiting) => self.handle_awaiting_message(awaiting),
Message::Suspension(suspension) => self.handle_suspension_message(suspension),
Message::Error(e) => self.handle_error_message(e),
Message::End(_) => TerminalLoopState::Closed,
Expand Down Expand Up @@ -955,7 +956,7 @@ where
&mut self,
suspension: proto::SuspensionMessage,
) -> TerminalLoopState<()> {
let suspension_indexes: HashSet<_> = suspension
let suspension_notification_ids: HashSet<_> = suspension
.waiting_completions
.into_iter()
.map(NotificationId::for_completion)
Expand All @@ -975,10 +976,40 @@ where
)
.collect();
// We currently don't support empty suspension_indexes set
if suspension_indexes.is_empty() {
if suspension_notification_ids.is_empty() {
return TerminalLoopState::Failed(InvokerError::EmptySuspensionMessage);
}
TerminalLoopState::SuspendedV2(suspension_indexes)
TerminalLoopState::SuspendedV2(suspension_notification_ids)
}

fn handle_awaiting_message(
&mut self,
awaiting: proto::AwaitingMessage,
) -> TerminalLoopState<()> {
#[allow(unused_variables)]
let awaiting_notification_ids: HashSet<_> = awaiting
.waiting_completions
.into_iter()
.map(NotificationId::for_completion)
.chain(
awaiting
.waiting_signals
.into_iter()
.map(SignalId::for_index)
.map(NotificationId::for_signal),
)
.chain(
awaiting
.waiting_named_signals
.into_iter()
.map(|s| SignalId::for_name(s.into()))
.map(NotificationId::for_signal),
)
.collect();

// TODO(till) fill here the business logic to deal with the awaiting message

TerminalLoopState::Continue(())
}

fn handle_error_message(&mut self, error: proto::ErrorMessage) -> TerminalLoopState<()> {
Expand Down
1 change: 1 addition & 0 deletions crates/invoker-impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ where
self.schemas.clone(),
invoker_tx,
invoker_rx,
opts.experimental_features_allow_protocol_v7(),
self.action_token_bucket.clone(),
)
.run(input_journal),
Expand Down
1 change: 1 addition & 0 deletions crates/service-protocol-v4/src/message_codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ gen_message!(
End Control = 0x0003,
CommandAck Control = 0x0004,
ProposeRunCompletion Control = 0x0005,
Awaiting Control = 0x0006,

Input Command noparse allows_ack = 0x0400,
Output Command noparse allows_ack = 0x0401,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ enum ServiceProtocolVersion {
// * StartMessage.random_seed
// * Failure.metadata
V6 = 6;
// Added:
// * AwaitingMessage
V7 = 7;
}

// --- Core frames ---
Expand Down
9 changes: 9 additions & 0 deletions crates/types/src/config/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@ pub struct InvokerOptions {
#[serde(skip_serializing_if = "std::ops::Not::not", default)]
pub disable_eager_state: bool,

#[cfg_attr(feature = "schemars", schemars(skip))]
#[serde(skip_serializing_if = "std::ops::Not::not", default)]
experimental_features_allow_protocol_v7: bool,

/// # Invocation throttling
///
/// Configures throttling for service invocations at the node level.
Expand Down Expand Up @@ -334,6 +338,10 @@ impl InvokerOptions {
self.message_size_limit.map(Into::into)
}

pub fn experimental_features_allow_protocol_v7(&self) -> bool {
self.experimental_features_allow_protocol_v7
}

#[allow(deprecated)]
pub fn print_deprecation_warnings(&self) {
if self.retry_policy.is_some() {
Expand All @@ -359,6 +367,7 @@ impl Default for InvokerOptions {
tmp_dir: None,
concurrent_invocations_limit: Some(NonZeroUsize::new(1000).expect("is non zero")),
disable_eager_state: false,
experimental_features_allow_protocol_v7: false,
invocation_throttling: None,
action_throttling: None,
}
Expand Down
14 changes: 10 additions & 4 deletions crates/types/src/service_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::ops::RangeInclusive;
pub const MIN_INFLIGHT_SERVICE_PROTOCOL_VERSION: ServiceProtocolVersion =
ServiceProtocolVersion::V1;
pub const MAX_INFLIGHT_SERVICE_PROTOCOL_VERSION: ServiceProtocolVersion =
ServiceProtocolVersion::V6;
ServiceProtocolVersion::V7;

pub const MIN_DISCOVERABLE_SERVICE_PROTOCOL_VERSION: ServiceProtocolVersion =
ServiceProtocolVersion::V5;
Expand Down Expand Up @@ -45,15 +45,21 @@ impl ServiceProtocolVersion {
/// Pick the version to use for running an invocation
pub fn pick(
deployment_supported_versions: &RangeInclusive<i32>,
enable_protocol_v7: bool,
) -> Option<ServiceProtocolVersion> {
if *deployment_supported_versions.start()
<= i32::from(MAX_INFLIGHT_SERVICE_PROTOCOL_VERSION)
let max_version = if enable_protocol_v7 {
ServiceProtocolVersion::V7.into()
} else {
ServiceProtocolVersion::V6.into()
};

if *deployment_supported_versions.start() <= max_version
&& *deployment_supported_versions.end()
>= i32::from(MIN_INFLIGHT_SERVICE_PROTOCOL_VERSION)
{
ServiceProtocolVersion::try_from(std::cmp::min(
*deployment_supported_versions.end(),
i32::from(MAX_INFLIGHT_SERVICE_PROTOCOL_VERSION),
max_version,
))
.ok()
} else {
Expand Down
17 changes: 16 additions & 1 deletion service-protocol/dev/restate/service/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ enum ServiceProtocolVersion {
// * StartMessage.random_seed
// * Failure.metadata
V6 = 6;
// Added:
// * AwaitingMessage
V7 = 7;
}

// --- Core frames ---
Expand Down Expand Up @@ -89,7 +92,7 @@ message StartMessage {
//
// These lists represent any of the notification_idx and/or notification_name the invocation is waiting on to progress.
// The runtime will resume the invocation as soon as either one of the given notification_idx or notification_name is completed.
// Between the two lists there MUST be at least one element.
// Between the 3 lists there MUST be at least one element.
message SuspensionMessage {
repeated uint32 waiting_completions = 1;
repeated uint32 waiting_signals = 2;
Expand Down Expand Up @@ -143,6 +146,18 @@ message ProposeRunCompletionMessage {
};
}

// Type: 0x0000 + 6
// Implementations MAY send this message when awaiting an invocation.
// This is not an end of stream message, it just notifies the runtime what the SDK is waiting on.
// Actual suspension is represented through SuspensionMessage.
//
// Between the 3 lists there MUST be at least one element.
message AwaitingMessage {
repeated uint32 waiting_completions = 1;
repeated uint32 waiting_signals = 2;
repeated string waiting_named_signals = 3;
}

// --- Commands and Notifications ---

// The Journal is modelled as commands and notifications.
Expand Down
Loading