Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions crates/partition-store/src/partition_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ use restate_types::partitions::{CfName, Partition};

use crate::durable_lsn_tracking::{AppliedLsnCollectorFactory, DurableLsnEventListener};
use crate::memory::MemoryBudget;
use crate::snapshots::LocalPartitionSnapshot;
use crate::snapshots::{ArchivedLsn, LocalPartitionSnapshot};

type SmartString = smartstring::SmartString<smartstring::LazyCompact>;

#[derive(Clone)]
pub struct PartitionDb {
meta: Arc<Partition>,
durable_lsn: watch::Sender<Option<Lsn>>,
archived_lsn: watch::Sender<Option<Lsn>>,
archived_lsn: watch::Sender<Option<ArchivedLsn>>,
// Note: Rust will drop the fields in the order they are declared in the struct.
// It's crucial to keep the column family and the database in this exact order.
cf: PartitionBoundCfHandle,
Expand All @@ -48,7 +48,7 @@ pub struct PartitionDb {
impl PartitionDb {
pub(crate) fn new(
meta: Arc<Partition>,
archived_lsn: watch::Sender<Option<Lsn>>,
archived_lsn: watch::Sender<Option<ArchivedLsn>>,
rocksdb: Arc<RocksDb>,
cf: Arc<BoundColumnFamily<'_>>,
) -> Self {
Expand Down Expand Up @@ -94,10 +94,10 @@ impl PartitionDb {
.await
}

pub(crate) fn note_archived_lsn(&self, lsn: Lsn) -> bool {
pub(crate) fn note_archived_lsn(&self, archived_lsn: ArchivedLsn) -> bool {
self.archived_lsn.send_if_modified(|current| {
if current.is_none_or(|c| lsn > c) {
*current = Some(lsn);
if current.as_mut().is_none_or(|c| &archived_lsn > c) {
*current = Some(archived_lsn);
true
} else {
false
Expand All @@ -106,11 +106,11 @@ impl PartitionDb {
}

/// The last (locally) known archived LSN for this partition
pub fn get_archived_lsn(&self) -> Option<Lsn> {
pub fn get_archived_lsn(&self) -> Option<ArchivedLsn> {
*self.archived_lsn.borrow()
}

pub fn watch_archived_lsn(&self) -> watch::Receiver<Option<Lsn>> {
pub fn watch_archived_lsn(&self) -> watch::Receiver<Option<ArchivedLsn>> {
self.archived_lsn.subscribe()
}

Expand Down Expand Up @@ -171,7 +171,7 @@ impl PartitionBoundCfHandle {

pub(crate) struct PartitionCell {
meta: Arc<Partition>,
archived_lsn: watch::Sender<Option<Lsn>>,
archived_lsn: watch::Sender<Option<ArchivedLsn>>,
durable_lsn: RwLock<Option<watch::Sender<Option<Lsn>>>>,
pub(crate) inner: AsyncRwLock<State>,
}
Expand Down
7 changes: 5 additions & 2 deletions crates/partition-store/src/partition_store_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use restate_types::partitions::Partition;
use crate::SnapshotError;
use crate::memory::MemoryController;
use crate::partition_db::{AllDataCf, PartitionCell, PartitionDb, RocksConfigurator};
use crate::snapshots::{LocalPartitionSnapshot, Snapshots};
use crate::snapshots::{ArchivedLsn, LocalPartitionSnapshot, Snapshots};
use crate::{BuildError, OpenError, PartitionStore, SnapshotErrorKind};

const PARTITION_CF_PREFIX: &str = "data-";
Expand Down Expand Up @@ -183,7 +183,10 @@ impl PartitionStoreManager {
self.snapshots.is_repository_configured()
}

pub async fn refresh_latest_archived_lsn(&self, partition_id: PartitionId) -> Option<Lsn> {
pub async fn refresh_latest_archived_lsn(
&self,
partition_id: PartitionId,
) -> Option<ArchivedLsn> {
let db = self.get_partition_db(partition_id).await?;
self.snapshots.refresh_latest_archived_lsn(db).await
}
Expand Down
9 changes: 4 additions & 5 deletions crates/partition-store/src/snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ use std::sync::Arc;
use crate::{PartitionDb, PartitionStore, SnapshotError, SnapshotErrorKind};

pub use self::metadata::*;
pub use self::repository::SnapshotRepository;
pub use self::repository::{ArchivedLsn, SnapshotRepository};
pub use self::snapshot_task::*;

use tokio::sync::Semaphore;
use tracing::{debug, instrument, warn};

use restate_types::config::Configuration;
use restate_types::identifiers::{PartitionId, SnapshotId};
use restate_types::logs::{Lsn, SequenceNumber};
use restate_types::logs::Lsn;

#[derive(Clone)]
pub struct Snapshots {
Expand Down Expand Up @@ -85,7 +85,7 @@ impl Snapshots {
})
}

pub async fn refresh_latest_archived_lsn(&self, db: PartitionDb) -> Option<Lsn> {
pub async fn refresh_latest_archived_lsn(&self, db: PartitionDb) -> Option<ArchivedLsn> {
let Some(repository) = &self.repository else {
return None;
};
Expand All @@ -94,10 +94,9 @@ impl Snapshots {
let archived_lsn = repository
.get_latest_archived_lsn(partition_id)
.await
.inspect(|lsn| debug!(?partition_id, "Latest archived LSN: {}", lsn))
.inspect_err(|err| warn!(?partition_id, "Unable to get latest archived LSN: {}", err))
.ok()
.unwrap_or(Lsn::INVALID);
.unwrap_or(ArchivedLsn::None);
db.note_archived_lsn(archived_lsn);
Some(archived_lsn)
}
Expand Down
58 changes: 55 additions & 3 deletions crates/partition-store/src/snapshots/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, SystemTime, SystemTimeError};

use anyhow::{Context, anyhow, bail};
use bytes::BytesMut;
Expand Down Expand Up @@ -152,6 +153,54 @@ impl LatestSnapshot {
}
}

/// The latest snapshot known to be archived for a partition, or the absence of one.
///
/// The derived `PartialOrd`/`Ord` implementations are relied upon by callers that only move
/// forward to a "greater" value. With the derive, `None` sorts below every `Snapshot` (variant
/// declaration order), and two `Snapshot`s compare by `min_applied_lsn` first, falling back to
/// `created_at` only when the LSNs are equal (field declaration order). Do not reorder the
/// variants or the fields without revisiting those comparisons.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub enum ArchivedLsn {
    /// No archived snapshot is known.
    None,
    /// An archived snapshot is known.
    Snapshot {
        // Ordering is intentional: LSN takes priority over elapsed wall clock time for comparisons
        /// The `min_applied_lsn` recorded for the snapshot.
        min_applied_lsn: Lsn,
        /// Wall clock time at which the snapshot was created.
        created_at: SystemTime,
    },
}

impl ArchivedLsn {
pub fn get_min_applied_lsn(&self) -> Lsn {
match self {
ArchivedLsn::None => Lsn::INVALID,
ArchivedLsn::Snapshot {
min_applied_lsn, ..
} => *min_applied_lsn,
}
}

pub fn get_age(&self) -> Result<Duration, SystemTimeError> {
match self {
ArchivedLsn::None => Ok(Duration::MAX),
ArchivedLsn::Snapshot { created_at, .. } => {
SystemTime::now().duration_since(*created_at)
}
}
}
}

impl From<&LatestSnapshot> for ArchivedLsn {
fn from(latest: &LatestSnapshot) -> Self {
ArchivedLsn::Snapshot {
min_applied_lsn: latest.min_applied_lsn,
created_at: latest.created_at.into(),
}
}
}

impl From<&PartitionSnapshotMetadata> for ArchivedLsn {
fn from(metadata: &PartitionSnapshotMetadata) -> Self {
ArchivedLsn::Snapshot {
min_applied_lsn: metadata.min_applied_lsn,
created_at: metadata.created_at.into(),
}
}
}

struct UniqueSnapshotKey {
lsn: Lsn,
snapshot_id: SnapshotId,
Expand Down Expand Up @@ -543,14 +592,17 @@ impl SnapshotRepository {

/// Retrieve the latest known LSN to be archived to the snapshot repository.
/// Response of `Ok(ArchivedLsn::None)` indicates no existing snapshot for the partition.
pub async fn get_latest_archived_lsn(&self, partition_id: PartitionId) -> anyhow::Result<Lsn> {
pub async fn get_latest_archived_lsn(
&self,
partition_id: PartitionId,
) -> anyhow::Result<ArchivedLsn> {
let latest_path = self.get_latest_snapshot_pointer(partition_id);

let latest = match self.object_store.get(&latest_path).await {
Ok(result) => result,
Err(object_store::Error::NotFound { .. }) => {
debug!("Latest snapshot data not found in repository");
return Ok(Lsn::INVALID);
return Ok(ArchivedLsn::None);
}
Err(e) => {
return Err(anyhow::Error::new(e).context(format!(
Expand All @@ -562,7 +614,7 @@ impl SnapshotRepository {
let latest: LatestSnapshot = serde_json::from_slice(&latest.bytes().await?)?;
debug!(partition_id = %partition_id, snapshot_id = %latest.snapshot_id, "Latest snapshot metadata: {:?}", latest);

Ok(latest.min_applied_lsn)
Ok(ArchivedLsn::from(&latest))
}

async fn get_latest_snapshot_metadata_for_update(
Expand Down
3 changes: 2 additions & 1 deletion crates/partition-store/src/snapshots/snapshot_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use super::{
SnapshotFormatVersion, SnapshotRepository,
};
use crate::PartitionStoreManager;
use crate::snapshots::ArchivedLsn;

/// Creates a partition store snapshot along with Restate snapshot metadata.
pub struct SnapshotPartitionTask {
Expand Down Expand Up @@ -91,7 +92,7 @@ impl SnapshotPartitionTask {
.get_partition_db(self.partition_id)
.await
{
db.note_archived_lsn(metadata.min_applied_lsn);
db.note_archived_lsn(ArchivedLsn::from(&metadata));
}

Ok(metadata)
Expand Down
33 changes: 25 additions & 8 deletions crates/types/src/config/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,11 +434,12 @@ impl Default for StorageOptions {
}
}

/// # Snapshot options.
/// # Snapshot options
///
/// Partition store snapshotting settings. At a minimum, set `destination` and
/// `snapshot-interval-num-records` to enable snapshotting. For a complete example, see
/// [Snapshots](https://docs.restate.dev/operate/snapshots).
/// Partition store object-store snapshotting settings. At a minimum, set `destination` to enable
/// manual snapshotting via `restatectl`. Additionally, `snapshot-interval` and
/// `snapshot-interval-num-records` can be used to configure automated periodic snapshots. For a
/// complete example, see [Snapshots](https://docs.restate.dev/operate/snapshots).
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
Expand All @@ -448,20 +449,35 @@ impl Default for StorageOptions {
pub struct SnapshotsOptions {
/// # Snapshot destination URL
///
/// Base URL for cluster snapshots. Supports `s3://` and `file://` protocol scheme.
/// Base URL for cluster snapshots. Currently only supports the `s3://` protocol scheme.
/// S3-compatible object stores must support ETag-based conditional writes.
///
/// Default: `None`
pub destination: Option<String>,

/// # Automatic snapshot creation frequency
/// # Automatic snapshot time interval
///
/// A time interval at which partition snapshots will be created. If
/// `snapshot-interval-num-records` is also set, it will be treated as an additional requirement
/// before a snapshot is taken. Use both time-based and record-based intervals to reduce the
/// number of snapshots created during times of low activity.
///
/// Snapshot intervals are calculated based on the wall clock timestamps reported by cluster
/// nodes, assuming a basic level of clock synchronization within the cluster.
///
/// This setting does not influence explicitly requested snapshots triggered using `restatectl`.
///
/// Default: `None` - automatic snapshots are disabled
#[serde(skip_serializing_if = "Option::is_none", default)]
pub snapshot_interval: Option<FriendlyDuration>,

/// # Automatic snapshot minimum records
///
/// Number of log records that trigger a snapshot to be created.
///
/// As snapshots are created asynchronously, the actual number of new records that will trigger
/// a snapshot will vary. The counter for the subsequent snapshot begins from the LSN at which
/// the previous snapshot export was initiated. Only leader Partition Processors will take
/// snapshots for a given partition.
/// the previous snapshot export was initiated.
///
/// This setting does not influence explicitly requested snapshots triggered using `restatectl`.
///
Expand All @@ -481,6 +497,7 @@ impl Default for SnapshotsOptions {
fn default() -> Self {
Self {
destination: None,
snapshot_interval: None,
snapshot_interval_num_records: None,
object_store: Default::default(),
object_store_retry_policy: Self::default_retry_policy(),
Expand Down
3 changes: 2 additions & 1 deletion crates/worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ impl Worker {
SubscriptionControllerHandle::new(ingress_kafka.create_command_sender());

let snapshots_options = &config.worker.snapshots;
if snapshots_options.snapshot_interval_num_records.is_some()
if (snapshots_options.snapshot_interval.is_some()
|| snapshots_options.snapshot_interval_num_records.is_some())
&& snapshots_options.destination.is_none()
{
return Err(BuildError::SnapshotRepository(anyhow::anyhow!(
Expand Down
9 changes: 6 additions & 3 deletions crates/worker/src/partition/leadership/durability_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::task::Poll;
use std::time::Duration;

use futures::{Stream, StreamExt};
use restate_partition_store::snapshots::ArchivedLsn;
use tokio::sync::watch;
use tokio::time::{Instant, MissedTickBehavior};
use tokio_stream::wrappers::{IntervalStream, WatchStream};
Expand All @@ -39,7 +40,7 @@ pub struct DurabilityTracker {
partition_id: PartitionId,
last_reported_durable_lsn: Lsn,
replica_set_states: PartitionReplicaSetStates,
archived_lsn_watch: WatchStream<Option<Lsn>>,
archived_lsn_watch: WatchStream<Option<ArchivedLsn>>,
check_timer: IntervalStream,
last_warning_at: Instant,
/// cache of the last archived_lsn
Expand All @@ -52,7 +53,7 @@ impl DurabilityTracker {
partition_id: PartitionId,
last_reported_durable_lsn: Option<Lsn>,
replica_set_states: PartitionReplicaSetStates,
archived_lsn_watch: watch::Receiver<Option<Lsn>>,
archived_lsn_watch: watch::Receiver<Option<ArchivedLsn>>,
check_interval: Duration,
) -> Self {
let mut check_timer =
Expand Down Expand Up @@ -166,7 +167,9 @@ impl Stream for DurabilityTracker {
self.terminated = true;
return Poll::Ready(None);
}
(Poll::Ready(Some(archived)), _) => archived.unwrap_or(Lsn::INVALID),
(Poll::Ready(Some(archived)), _) => archived
.map(|a| a.get_min_applied_lsn())
.unwrap_or(Lsn::INVALID),
(_, Poll::Ready(_)) => self.last_archived,
(Poll::Pending, Poll::Pending) => return Poll::Pending,
};
Expand Down
Loading
Loading