Skip to content

Commit a03e298

Browse files
committed
[storage] Draft for Envelope v2
# Summary - Separate header, keys, and other envelope related information from the actual payload of the envelope - Ability to decode envelope header without decoding the full envelope - Support different encoding for the payload (records) (flexbuffers, or bilrost) - Delay decoding of payload until needed # TODO: - [ ] Decode v1 envelope into v2 - [ ] Use Envelope v2 in code
1 parent 9f7b7c5 commit a03e298

File tree

14 files changed

+1062
-25
lines changed

14 files changed

+1062
-25
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/encoding/src/bilrost_encodings/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
1313
mod arc_encodings;
1414
mod nonzero;
15+
mod phantom_data;
1516
mod range;
1617

1718
pub mod display_from_str;
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
2+
// All rights reserved.
3+
//
4+
// Use of this software is governed by the Business Source License
5+
// included in the LICENSE file.
6+
//
7+
// As of the Change Date specified in that file, in accordance with
8+
// the Business Source License, use of this software will be governed
9+
// by the Apache License, Version 2.0.
10+
11+
use std::marker::PhantomData;
12+
13+
use bilrost::{
14+
DecodeErrorKind,
15+
encoding::{EmptyState, ForOverwrite, Proxiable},
16+
};
17+
18+
use crate::bilrost_encodings::RestateEncoding;
19+
20+
struct PhantomDataTag;
21+
22+
impl<T> Proxiable<PhantomDataTag> for PhantomData<T> {
23+
type Proxy = ();
24+
25+
fn encode_proxy(&self) -> Self::Proxy {}
26+
27+
fn decode_proxy(&mut self, _: Self::Proxy) -> Result<(), DecodeErrorKind> {
28+
Ok(())
29+
}
30+
}
31+
32+
impl<T> ForOverwrite<RestateEncoding, PhantomData<T>> for () {
33+
fn for_overwrite() -> PhantomData<T> {
34+
PhantomData
35+
}
36+
}
37+
38+
impl<T> EmptyState<RestateEncoding, PhantomData<T>> for () {
39+
fn empty() -> PhantomData<T> {
40+
PhantomData
41+
}
42+
43+
fn is_empty(_: &PhantomData<T>) -> bool {
44+
true
45+
}
46+
47+
fn clear(_: &mut PhantomData<T>) {}
48+
}
49+
50+
bilrost::delegate_proxied_encoding!(
51+
use encoding (::bilrost::encoding::General)
52+
to encode proxied type (PhantomData<T>)
53+
using proxy tag (PhantomDataTag)
54+
with encoding (RestateEncoding)
55+
with generics (T)
56+
);

crates/invoker-api/src/effects.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,17 @@
1111
use crate::EffectKind::JournalEntryV2;
1212
use restate_types::deployment::PinnedDeployment;
1313
use restate_types::errors::InvocationError;
14-
use restate_types::identifiers::InvocationId;
14+
use restate_types::identifiers::{InvocationId, WithPartitionKey};
1515
use restate_types::invocation::InvocationEpoch;
1616
use restate_types::journal::EntryIndex;
1717
use restate_types::journal::enriched::EnrichedRawEntry;
1818
use restate_types::journal_events::raw::RawEvent;
19-
use restate_types::journal_v2;
2019
use restate_types::journal_v2::CommandIndex;
2120
use restate_types::journal_v2::raw::RawEntry;
21+
use restate_types::logs::{HasRecordKeys, Keys};
2222
use restate_types::storage::{StoredRawEntry, StoredRawEntryHeader};
2323
use restate_types::time::MillisSinceEpoch;
24+
use restate_types::{flexbuffers_storage_encode_decode, journal_v2};
2425
use std::collections::HashSet;
2526

2627
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -35,6 +36,14 @@ pub struct Effect {
3536
pub kind: EffectKind,
3637
}
3738

39+
flexbuffers_storage_encode_decode!(Effect);
40+
41+
impl HasRecordKeys for Effect {
42+
fn record_keys(&self) -> restate_types::logs::Keys {
43+
Keys::Single(self.invocation_id.partition_key())
44+
}
45+
}
46+
3847
#[derive(Debug, Clone, PartialEq, Eq)]
3948
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
4049
// todo: fix this and box the large variant (EffectKind is 320 bytes)

crates/types/src/identifiers.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -349,15 +349,26 @@ impl From<InvocationUuid> for opentelemetry::trace::SpanId {
349349
/// Services are isolated by key. This means that there cannot be two concurrent
350350
/// invocations for the same service instance (service name, key).
351351
#[derive(
352-
Eq, Hash, PartialEq, PartialOrd, Ord, Clone, Debug, serde::Serialize, serde::Deserialize,
352+
Eq,
353+
Hash,
354+
PartialEq,
355+
PartialOrd,
356+
Ord,
357+
Clone,
358+
Debug,
359+
serde::Serialize,
360+
serde::Deserialize,
361+
bilrost::Message,
353362
)]
354363
pub struct ServiceId {
355364
// TODO rename this to KeyedServiceId. This type can be used only by keyed service types (virtual objects and workflows)
356365
/// Identifies the grpc service
366+
#[bilrost(1)]
357367
pub service_name: ByteString,
358368
/// Identifies the service instance for the given service name
369+
#[bilrost(2)]
359370
pub key: ByteString,
360-
371+
#[bilrost(3)]
361372
partition_key: PartitionKey,
362373
}
363374

crates/types/src/invocation/mod.rs

Lines changed: 89 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,25 +12,30 @@
1212
1313
pub mod client;
1414

15+
use std::borrow::Cow;
16+
use std::hash::Hash;
17+
use std::ops::Deref;
18+
use std::str::FromStr;
19+
use std::time::Duration;
20+
use std::{cmp, fmt};
21+
22+
use bytes::Bytes;
23+
use bytestring::ByteString;
24+
use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceState};
25+
use serde_with::{DisplayFromStr, FromInto, serde_as};
26+
1527
use crate::errors::InvocationError;
1628
use crate::identifiers::{
1729
DeploymentId, EntryIndex, IdempotencyId, InvocationId, PartitionKey,
1830
PartitionProcessorRpcRequestId, ServiceId, SubscriptionId, WithInvocationId, WithPartitionKey,
1931
};
2032
use crate::journal_v2::{CompletionId, GetInvocationOutputResult, Signal};
33+
use crate::logs::{HasRecordKeys, Keys};
2134
use crate::time::MillisSinceEpoch;
22-
use crate::{GenerationalNodeId, RestateVersion};
23-
24-
use bytes::Bytes;
25-
use bytestring::ByteString;
26-
use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceState};
27-
use serde_with::{DisplayFromStr, FromInto, serde_as};
28-
use std::borrow::Cow;
29-
use std::hash::Hash;
30-
use std::ops::Deref;
31-
use std::str::FromStr;
32-
use std::time::Duration;
33-
use std::{cmp, fmt};
35+
use crate::{
36+
GenerationalNodeId, RestateVersion, bilrost_storage_encode_decode,
37+
flexbuffers_storage_encode_decode,
38+
};
3439

3540
// Re-exporting opentelemetry [`TraceId`] to avoid having to import opentelemetry in all crates.
3641
pub use opentelemetry::trace::TraceId;
@@ -435,6 +440,14 @@ pub struct ServiceInvocation {
435440
pub restate_version: RestateVersion,
436441
}
437442

443+
flexbuffers_storage_encode_decode!(ServiceInvocation);
444+
445+
impl HasRecordKeys for ServiceInvocation {
446+
fn record_keys(&self) -> Keys {
447+
Keys::Single(self.invocation_id.partition_key())
448+
}
449+
}
450+
438451
#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
439452
#[serde(
440453
from = "serde_hacks::SubmitNotificationSink",
@@ -577,12 +590,20 @@ pub struct InvocationResponse {
577590
pub result: ResponseResult,
578591
}
579592

593+
flexbuffers_storage_encode_decode!(InvocationResponse);
594+
580595
impl WithInvocationId for InvocationResponse {
581596
fn invocation_id(&self) -> InvocationId {
582597
self.target.invocation_id()
583598
}
584599
}
585600

601+
impl HasRecordKeys for InvocationResponse {
602+
fn record_keys(&self) -> Keys {
603+
Keys::Single(self.partition_key())
604+
}
605+
}
606+
586607
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
587608
pub enum ResponseResult {
588609
Success(Bytes),
@@ -623,12 +644,20 @@ pub struct GetInvocationOutputResponse {
623644
pub result: GetInvocationOutputResult,
624645
}
625646

647+
bilrost_storage_encode_decode!(GetInvocationOutputResponse);
648+
626649
impl WithInvocationId for GetInvocationOutputResponse {
627650
fn invocation_id(&self) -> InvocationId {
628651
self.target.invocation_id()
629652
}
630653
}
631654

655+
impl HasRecordKeys for GetInvocationOutputResponse {
656+
fn record_keys(&self) -> Keys {
657+
Keys::Single(self.partition_key())
658+
}
659+
}
660+
632661
/// Definition of the sink where to send the result of a service invocation.
633662
#[derive(Debug, PartialEq, Eq, Clone, Hash, serde::Serialize, serde::Deserialize)]
634663
#[serde(
@@ -944,6 +973,14 @@ pub struct InvocationTermination {
944973
pub response_sink: Option<InvocationMutationResponseSink>,
945974
}
946975

976+
flexbuffers_storage_encode_decode!(InvocationTermination);
977+
978+
impl HasRecordKeys for InvocationTermination {
979+
fn record_keys(&self) -> crate::logs::Keys {
980+
Keys::Single(self.invocation_id.partition_key())
981+
}
982+
}
983+
947984
/// Flavor of the termination. Can be kill (hard stop) or graceful cancel.
948985
#[derive(
949986
Debug, Clone, Copy, Eq, PartialEq, serde::Serialize, serde::Deserialize, bilrost::Enumeration,
@@ -963,12 +1000,20 @@ pub struct PurgeInvocationRequest {
9631000
pub response_sink: Option<InvocationMutationResponseSink>,
9641001
}
9651002

1003+
flexbuffers_storage_encode_decode!(PurgeInvocationRequest);
1004+
9661005
impl WithInvocationId for PurgeInvocationRequest {
9671006
fn invocation_id(&self) -> InvocationId {
9681007
self.invocation_id
9691008
}
9701009
}
9711010

1011+
impl HasRecordKeys for PurgeInvocationRequest {
1012+
fn record_keys(&self) -> Keys {
1013+
Keys::Single(self.invocation_id.partition_key())
1014+
}
1015+
}
1016+
9721017
/// Message to resume an invocation.
9731018
#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
9741019
pub struct ResumeInvocationRequest {
@@ -979,12 +1024,20 @@ pub struct ResumeInvocationRequest {
9791024
pub response_sink: Option<InvocationMutationResponseSink>,
9801025
}
9811026

1027+
flexbuffers_storage_encode_decode!(ResumeInvocationRequest);
1028+
9821029
impl WithInvocationId for ResumeInvocationRequest {
9831030
fn invocation_id(&self) -> InvocationId {
9841031
self.invocation_id
9851032
}
9861033
}
9871034

1035+
impl HasRecordKeys for ResumeInvocationRequest {
1036+
fn record_keys(&self) -> Keys {
1037+
Keys::Single(self.invocation_id.partition_key())
1038+
}
1039+
}
1040+
9881041
/// Message to restart an invocation.
9891042
#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
9901043
pub struct RestartAsNewInvocationRequest {
@@ -1001,12 +1054,20 @@ pub struct RestartAsNewInvocationRequest {
10011054
pub response_sink: Option<InvocationMutationResponseSink>,
10021055
}
10031056

1057+
flexbuffers_storage_encode_decode!(RestartAsNewInvocationRequest);
1058+
10041059
impl WithInvocationId for RestartAsNewInvocationRequest {
10051060
fn invocation_id(&self) -> InvocationId {
10061061
self.invocation_id
10071062
}
10081063
}
10091064

1065+
impl HasRecordKeys for RestartAsNewInvocationRequest {
1066+
fn record_keys(&self) -> Keys {
1067+
Keys::Single(self.invocation_id.partition_key())
1068+
}
1069+
}
1070+
10101071
// We use this struct instead of SpanContext as it is serialisable and allows us to use TraceStateDef
10111072
#[serde_as]
10121073
#[derive(Debug, PartialEq, Eq, Clone, serde::Serialize, serde::Deserialize)]
@@ -1320,25 +1381,41 @@ pub struct AttachInvocationRequest {
13201381
pub response_sink: ServiceInvocationResponseSink,
13211382
}
13221383

1384+
flexbuffers_storage_encode_decode!(AttachInvocationRequest);
1385+
13231386
impl WithPartitionKey for AttachInvocationRequest {
13241387
fn partition_key(&self) -> PartitionKey {
13251388
self.invocation_query.partition_key()
13261389
}
13271390
}
13281391

1392+
impl HasRecordKeys for AttachInvocationRequest {
1393+
fn record_keys(&self) -> Keys {
1394+
Keys::Single(self.partition_key())
1395+
}
1396+
}
1397+
13291398
/// Represents a request to notify a signal to an invocation
13301399
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
13311400
pub struct NotifySignalRequest {
13321401
pub invocation_id: InvocationId,
13331402
pub signal: Signal,
13341403
}
13351404

1405+
flexbuffers_storage_encode_decode!(NotifySignalRequest);
1406+
13361407
impl WithInvocationId for NotifySignalRequest {
13371408
fn invocation_id(&self) -> InvocationId {
13381409
self.invocation_id
13391410
}
13401411
}
13411412

1413+
impl HasRecordKeys for NotifySignalRequest {
1414+
fn record_keys(&self) -> Keys {
1415+
Keys::Single(self.partition_key())
1416+
}
1417+
}
1418+
13421419
/// The invocation epoch represents the restarts count of the invocation, as seen from the Partition processor.
13431420
pub type InvocationEpoch = u32;
13441421

crates/types/src/message.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
//! This module defines types used for the internal messaging between Restate components.
1212
13-
use crate::identifiers::PartitionId;
13+
use crate::{bilrost_storage_encode_decode, identifiers::PartitionId};
1414

1515
/// Wrapper that extends a message with its target peer to which the message should be sent.
1616
pub type PartitionTarget<Msg> = (PartitionId, Msg);
@@ -29,3 +29,11 @@ pub enum AckKind {
2929
last_known_seq_number: MessageIndex,
3030
},
3131
}
32+
33+
#[derive(Debug, Clone, Copy, bilrost::Message)]
34+
pub struct TruncateOutboxRequest {
35+
#[bilrost(1)]
36+
pub index: MessageIndex,
37+
}
38+
39+
bilrost_storage_encode_decode!(TruncateOutboxRequest);

0 commit comments

Comments
 (0)