Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
4bf7a29
Merge pull request #223 from JanKaul/more-context-for-unimplemented
JanKaul Aug 28, 2025
8b32c4e
Merge pull request #223 from JanKaul/more-context-for-unimplemented
JanKaul Aug 1, 2025
8ada662
use separate manifest files for delete files
Aug 5, 2025
34fd726
append manifest only if there are data or delete files
Aug 5, 2025
e07ec6a
calculate the delete splits
Aug 5, 2025
d0e6588
fix metadata
Aug 5, 2025
ef2694f
track number of written manifests
Aug 6, 2025
55631ba
fix selected_manifest
Aug 6, 2025
101ccf3
fix remaining selected manifest
Aug 6, 2025
9b351f7
update docs
Aug 6, 2025
923b8ac
refactor appends
Aug 6, 2025
9650d89
Revert "provide more context for unimplemented macro"
JanKaul Aug 1, 2025
7a15d14
save disk space
Aug 6, 2025
356b4c3
Merge pull request #224 from JanKaul/revert-223-more-context-for-unim…
JanKaul Aug 1, 2025
fe916da
fix clippy warnings
JanKaul Aug 11, 2025
29953c3
fix: make version-hint.text compatible with other readers
gruuya Aug 7, 2025
7b75213
Merge pull request #228 from JanKaul/fix-equalitly-deletes
JanKaul Aug 6, 2025
ff908db
cargo fmt
JanKaul Aug 11, 2025
787b8d3
Merge pull request #234 from splitgraph/fix-version-hint-content
JanKaul Aug 11, 2025
f3060cc
fix: conflict when projecting a field not present in equality deletes
gruuya Aug 11, 2025
8109b67
Merge pull request #240 from JanKaul/fix-clippy
JanKaul Aug 12, 2025
352bb68
fix: update last-updated-ms field when performing a table update
gruuya Aug 20, 2025
fdb47cc
Merge pull request #239 from gruuya/eq-del-projection-conflict
JanKaul Aug 12, 2025
16814ca
fix: use correct number of splits for deletes
gruuya Aug 26, 2025
645f003
Merge pull request #244 from gruuya/last-updated-ms-fix
JanKaul Aug 20, 2025
b4ae858
AppendSequenceGroups operation
SergeiPatiakin Aug 26, 2025
7de9c67
Merge pull request #245 from gruuya/fix-n-splits
JanKaul Aug 27, 2025
69cf64c
docs: default compression level is 3
satlank Aug 27, 2025
a5c4d09
Merge pull request #251 from splitgraph/new-dsn-2
JanKaul Aug 27, 2025
851d21d
Merge pull request #250 from satlank/fixDoc
JanKaul Aug 27, 2025
4062222
fix version hint test
JanKaul Aug 28, 2025
97fa490
feat: add snapshot expiration functionality and maintenance operations
ForeverAngry Aug 29, 2025
ef36b0f
fix: update expire snapshots implementation to work with current iceb…
ForeverAngry Sep 27, 2025
e4dc34c
Merge upstream changes from origin/main
ForeverAngry Sep 27, 2025
c0ac126
refactor: update snapshot expiration implementation and remove legacy…
ForeverAngry Sep 29, 2025
ebb99ff
refactor: remove legacy snapshot expiration implementation and update…
ForeverAngry Oct 10, 2025
400552e
refactor: simplify snapshot expiration implementation and remove buil…
ForeverAngry Nov 25, 2025
7597885
Merge remote-tracking branch 'origin/main' into pr-262
ForeverAngry Nov 25, 2025
77e7a59
fix clippy warnings
JanKaul Nov 29, 2025
c73281a
move tests to end
JanKaul Nov 29, 2025
5801b71
cargo fmt
JanKaul Nov 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ It provides an Iceberg integration for the [Datafusion](https://arrow.apache.org
| Equality deletes | :white_check_mark: |
| Positional deletes | |

### Table Maintenance

| Feature | Status |
| --- | --- |
| Expire snapshots | :white_check_mark: |
| Orphan file cleanup | :white_check_mark: |

### Iceberg Views

| Feature | Status |
Expand Down Expand Up @@ -62,6 +69,8 @@ It provides an Iceberg integration for the [Datafusion](https://arrow.apache.org

Check out the [datafusion examples](datafusion_iceberg/examples).

### Basic Table Operations

```rust
use datafusion::{arrow::array::Int64Array, prelude::SessionContext};
use datafusion_iceberg::DataFusionTable;
Expand Down
14 changes: 14 additions & 0 deletions iceberg-rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//! * Time travel and snapshot isolation
//! * View and materialized view support
//! * Multiple catalog implementations (REST, AWS Glue, File-based)
//! * Table maintenance operations (snapshot expiration, orphan file cleanup)
//!
//! # Components
//!
Expand Down Expand Up @@ -43,6 +44,19 @@
//! .update_schema(new_schema)
//! .commit()
//! .await?;
//!
//! // Expire old snapshots for maintenance
//! table
//! .new_transaction(None)
//! .expire_snapshots(
//! Some(chrono::Utc::now().timestamp_millis() - 30 * 24 * 60 * 60 * 1000),
//! Some(10),
//! true,
//! true,
//! false,
//! )
//! .commit()
//! .await?;
//! # Ok(())
//! # }
//! ```
Expand Down
51 changes: 50 additions & 1 deletion iceberg-rust/src/table/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ pub(crate) static REPLACE_INDEX: usize = 4;
pub(crate) static OVERWRITE_INDEX: usize = 5;
pub(crate) static UPDATE_PROPERTIES_INDEX: usize = 6;
pub(crate) static SET_SNAPSHOT_REF_INDEX: usize = 7;
pub(crate) static EXPIRE_SNAPSHOTS_INDEX: usize = 8;

pub(crate) static NUM_OPERATIONS: usize = 8;
pub(crate) static NUM_OPERATIONS: usize = 9;

/// A transaction that can perform multiple operations on a table atomically
///
Expand Down Expand Up @@ -395,6 +396,54 @@ impl<'table> TableTransaction<'table> {
self.operations[SET_SNAPSHOT_REF_INDEX] = Some(Operation::SetSnapshotRef(entry));
self
}

/// Expire snapshots based on the provided configuration
///
/// This operation expires snapshots according to the retention policies specified.
/// It can expire snapshots older than a certain timestamp, retain only the most recent N snapshots,
/// and optionally clean up orphaned data files.
///
/// At least one of `older_than` or `retain_last` must be provided; the
/// operation fails when the transaction is committed if both are `None`.
///
/// # Arguments
/// * `older_than` - Optional timestamp (ms since Unix epoch) to expire snapshots older than this time
/// * `retain_last` - Optional number of most recent snapshots to keep, regardless of timestamp
/// * `clean_orphan_files` - Whether to clean up data files that are no longer referenced
/// * `retain_ref_snapshots` - Whether to preserve snapshots that are referenced by branches/tags
/// * `dry_run` - Whether to perform a dry run without actually deleting anything
///
/// # Returns
/// * `Self` - The transaction builder for method chaining
///
/// # Examples
/// ```ignore
/// let result = table.new_transaction(None)
///     .expire_snapshots(
///         Some(chrono::Utc::now().timestamp_millis() - 7 * 24 * 60 * 60 * 1000),
///         Some(5),
///         true,
///         true,
///         false
///     )
///     .commit()
///     .await?;
/// ```
pub fn expire_snapshots(
    mut self,
    older_than: Option<i64>,
    retain_last: Option<usize>,
    clean_orphan_files: bool,
    retain_ref_snapshots: bool,
    dry_run: bool,
) -> Self {
    // Queue (or replace) the expiration in its fixed operation slot.
    self.operations[EXPIRE_SNAPSHOTS_INDEX] = Some(Operation::ExpireSnapshots {
        older_than,
        retain_last,
        _clean_orphan_files: clean_orphan_files,
        retain_ref_snapshots,
        dry_run,
    });
    self
}

/// Commits all operations in this transaction atomically
///
/// This method executes all operations in the transaction and updates the table
Expand Down
262 changes: 253 additions & 9 deletions iceberg-rust/src/table/transaction/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,21 @@ pub enum Operation {
files_to_overwrite: HashMap<String, Vec<String>>,
additional_summary: Option<HashMap<String, String>>,
}, // /// Remove or replace rows in existing data files
// NewRowDelta,
// /// Delete files in the table and commit
// NewDelete,
// /// Expire snapshots in the table
// ExpireSnapshots,
// /// Manage snapshots in the table
// ManageSnapshots,
// /// Read and write table data and metadata files
// IO,
// NewRowDelta,
// /// Delete files in the table and commit
// NewDelete,
/// Expire snapshots in the table
ExpireSnapshots {
older_than: Option<i64>,
retain_last: Option<usize>,
_clean_orphan_files: bool,
retain_ref_snapshots: bool,
dry_run: bool,
},
// /// Manage snapshots in the table
// ManageSnapshots,
// /// Read and write table data and metadata files
// IO,
}

impl Operation {
Expand Down Expand Up @@ -843,6 +849,105 @@ impl Operation {
debug!("Executing SetDefaultSpec operation: spec_id={}", spec_id);
Ok((None, vec![TableUpdate::SetDefaultSpec { spec_id }]))
}
Operation::ExpireSnapshots {
older_than,
retain_last,
_clean_orphan_files: _,
retain_ref_snapshots,
dry_run,
} => {
debug!("Executing ExpireSnapshots operation");

// Validate parameters
if older_than.is_none() && retain_last.is_none() {
return Err(Error::InvalidFormat(
"Must specify either older_than or retain_last for snapshot expiration"
.into(),
));
}

// Get all snapshots sorted by timestamp (newest first)
let mut all_snapshots: Vec<_> = table_metadata.snapshots.values().collect();
all_snapshots.sort_by(|a, b| b.timestamp_ms().cmp(a.timestamp_ms()));

// Get current snapshot ID to ensure we never expire it
let current_snapshot_id = table_metadata.current_snapshot_id;

// Get snapshot IDs referenced by branches/tags if we should preserve them
let ref_snapshot_ids = if retain_ref_snapshots {
let mut referenced_ids = std::collections::HashSet::new();
for snapshot_ref in table_metadata.refs.values() {
referenced_ids.insert(snapshot_ref.snapshot_id);
}
referenced_ids
} else {
std::collections::HashSet::new()
};

let mut snapshots_to_expire = Vec::new();

// Apply retention logic
for (index, snapshot) in all_snapshots.iter().enumerate() {
let snapshot_id = *snapshot.snapshot_id();
let mut should_retain = false;

// Never expire the current snapshot
if Some(snapshot_id) == current_snapshot_id
|| ref_snapshot_ids.contains(&snapshot_id)
{
should_retain = true;
}
// Keep the most recent N snapshots if retain_last is specified
else if let Some(retain_count) = retain_last {
if index < retain_count {
should_retain = true;
}
}

// Apply older_than filter only if not already marked for retention
if !should_retain {
if let Some(threshold) = older_than {
if *snapshot.timestamp_ms() >= threshold {
should_retain = true;
}
}
}

if !should_retain {
snapshots_to_expire.push(snapshot_id);
}
}

// If dry run, return without making changes
if dry_run {
debug!(
"Dry run: would expire {} snapshots: {:?}",
snapshots_to_expire.len(),
snapshots_to_expire
);
return Ok((None, vec![]));
}

// If no snapshots to expire, return early
if snapshots_to_expire.is_empty() {
debug!("No snapshots to expire");
return Ok((None, vec![]));
}

debug!(
"Expiring {} snapshots: {:?}",
snapshots_to_expire.len(),
snapshots_to_expire
);

// Return the RemoveSnapshots update
Ok((
None,
vec![TableUpdate::RemoveSnapshots {
snapshot_ids: snapshots_to_expire,
}],
))
}
}
}
}
Expand Down Expand Up @@ -1035,3 +1140,142 @@ pub fn compute_n_splits(
x => x.ilog2() + 1,
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use futures::executor::block_on;
    use iceberg_rust_spec::spec::schema::SchemaBuilder;
    use iceberg_rust_spec::spec::table_metadata::TableMetadataBuilder;
    use iceberg_rust_spec::spec::types::{PrimitiveType, StructField, Type};
    use object_store::memory::InMemory;

    /// Builds a minimal in-memory `TableMetadata` fixture for expiration tests.
    ///
    /// * `snapshot_defs` - `(snapshot_id, timestamp_ms)` pairs; sequence numbers
    ///   are assigned in order of appearance (1-based).
    /// * `current_snapshot` - optional id recorded as the current snapshot.
    /// * `refs` - `(ref_name, snapshot_id)` pairs registered as branch/tag refs.
    fn sample_metadata(
        snapshot_defs: &[(i64, i64)],
        current_snapshot: Option<i64>,
        refs: &[(&str, i64)],
    ) -> TableMetadata {
        // Single-column schema; its contents are irrelevant to expiration logic.
        let schema = SchemaBuilder::default()
            .with_schema_id(0)
            .with_struct_field(StructField {
                id: 1,
                name: "id".to_string(),
                required: true,
                field_type: Type::Primitive(PrimitiveType::Long),
                doc: None,
            })
            .build()
            .unwrap();

        // One Append snapshot per definition; manifest paths are synthetic and
        // never read by the expiration operation.
        let snapshots = snapshot_defs
            .iter()
            .enumerate()
            .map(|(idx, (snapshot_id, timestamp))| {
                let snapshot = SnapshotBuilder::default()
                    .with_snapshot_id(*snapshot_id)
                    .with_sequence_number((idx + 1) as i64)
                    .with_timestamp_ms(*timestamp)
                    .with_manifest_list(format!("manifest-{snapshot_id}.avro"))
                    .with_summary(Summary {
                        operation: SnapshotOperation::Append,
                        other: HashMap::new(),
                    })
                    .with_schema_id(0)
                    .build()
                    .unwrap();
                (*snapshot_id, snapshot)
            })
            .collect::<HashMap<_, _>>();

        // Named refs with default retention pointing at the given snapshots.
        let refs = refs
            .iter()
            .map(|(name, snapshot_id)| {
                (
                    (*name).to_string(),
                    SnapshotReference {
                        snapshot_id: *snapshot_id,
                        retention: SnapshotRetention::default(),
                    },
                )
            })
            .collect::<HashMap<_, _>>();

        TableMetadataBuilder::default()
            .location("s3://tests/table".to_owned())
            .current_schema_id(0)
            .schemas(HashMap::from_iter(vec![(0, schema)]))
            .snapshots(snapshots)
            .current_snapshot_id(current_snapshot)
            .last_sequence_number(snapshot_defs.len() as i64)
            .refs(refs)
            .build()
            .unwrap()
    }

    /// Runs an `ExpireSnapshots` operation against `metadata` and returns the
    /// resulting table updates. The in-memory object store is required by the
    /// `execute` signature but is not touched by this operation.
    fn execute_operation(
        metadata: &TableMetadata,
        older_than: Option<i64>,
        retain_last: Option<usize>,
        retain_refs: bool,
        dry_run: bool,
    ) -> Result<Vec<TableUpdate>, Error> {
        let op = Operation::ExpireSnapshots {
            older_than,
            retain_last,
            _clean_orphan_files: false,
            retain_ref_snapshots: retain_refs,
            dry_run,
        };
        let store = Arc::new(InMemory::new());
        // The future performs no real I/O here, so a simple block_on suffices.
        block_on(op.execute(metadata, store)).map(|(_, updates)| updates)
    }

    /// Flattens all `RemoveSnapshots` updates into a single list of ids.
    fn collect_snapshot_ids(updates: &[TableUpdate]) -> Vec<i64> {
        updates
            .iter()
            .flat_map(|update| match update {
                TableUpdate::RemoveSnapshots { snapshot_ids } => snapshot_ids.clone(),
                _ => Vec::new(),
            })
            .collect()
    }

    // Expiration with neither `older_than` nor `retain_last` must be rejected.
    #[test]
    fn snapshot_expiration_requires_policy() {
        let metadata = sample_metadata(&[(1, 1_000)], Some(1), &[]);
        let result = execute_operation(&metadata, None, None, true, false);
        assert!(matches!(result, Err(Error::InvalidFormat(_))));
    }

    // `retain_last = 2` protects the two newest snapshots (3 and 4); the
    // remaining ones fall below the 2_500 ms threshold and are expired.
    #[test]
    fn snapshot_expiration_applies_time_and_count_filters() {
        let metadata = sample_metadata(
            &[(1, 1_000), (2, 2_000), (3, 3_000), (4, 4_000)],
            Some(4),
            &[],
        );
        let updates = execute_operation(&metadata, Some(2_500), Some(2), true, false).unwrap();
        let mut expired = collect_snapshot_ids(&updates);
        expired.sort();
        assert_eq!(expired, vec![1, 2]);
    }

    // The current snapshot and snapshots targeted by refs are never expired,
    // even when they are older than the threshold.
    #[test]
    fn snapshot_expiration_preserves_current_and_refs() {
        let metadata = sample_metadata(
            &[(10, 1_000), (20, 2_000), (30, 3_000)],
            Some(30),
            &[("branch", 20)],
        );
        let updates = execute_operation(&metadata, Some(1_500), None, true, false).unwrap();
        // Snapshot 10 is the only candidate because 20 is referenced and 30 is current.
        assert_eq!(collect_snapshot_ids(&updates), vec![10]);
    }

    // A dry run identifies candidates but emits no updates at all.
    #[test]
    fn snapshot_expiration_supports_dry_run() {
        let metadata = sample_metadata(&[(1, 1_000), (2, 900)], Some(1), &[]);
        let updates = execute_operation(&metadata, Some(950), None, true, true).unwrap();
        assert!(updates.is_empty());
    }
}