Skip to content

feat(transaction): RewriteFilesAction + Validation Support + Process Delete Manifests #1606

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
Draft
2 changes: 2 additions & 0 deletions crates/iceberg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,5 @@ pub mod writer;

mod delete_vector;
pub mod puffin;
/// Utility functions and modules.
pub mod util;
6 changes: 1 addition & 5 deletions crates/iceberg/src/spec/manifest/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,6 @@ impl ManifestWriter {
/// Add a delete manifest entry. This method will update the following fields of the entry:
/// - Update the entry status to `Deleted`
/// - Set the snapshot id to the current snapshot id
///
/// # TODO
/// Remove this allow later
#[allow(dead_code)]
pub(crate) fn add_delete_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
self.check_data_file(&entry.data_file)?;
entry.status = ManifestStatus::Deleted;
Expand Down Expand Up @@ -282,7 +278,7 @@ impl ManifestWriter {
Ok(())
}

/// Add an file as existing manifest entry. The original data and file sequence numbers, snapshot ID,
/// Add a file as existing manifest entry. The original data and file sequence numbers, snapshot ID,
/// which were assigned at commit, must be preserved when adding an existing entry.
pub fn add_existing_file(
&mut self,
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/spec/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub const UNASSIGNED_SNAPSHOT_ID: i64 = -1;

/// Reference to [`Snapshot`].
pub type SnapshotRef = Arc<Snapshot>;
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Hash)]
#[serde(rename_all = "lowercase")]
/// The operation field is used by some operations, like snapshot expiration, to skip processing certain snapshots.
pub enum Operation {
Expand Down
9 changes: 8 additions & 1 deletion crates/iceberg/src/transaction/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::table::Table;
use crate::transaction::snapshot::{
DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer,
};
use crate::transaction::validate::SnapshotValidator;
use crate::transaction::{ActionCommit, TransactionAction};

/// FastAppendAction is a transaction action for fast-appending data files to the table.
Expand Down Expand Up @@ -90,6 +91,10 @@ impl TransactionAction for FastAppendAction {
self.key_metadata.clone(),
self.snapshot_properties.clone(),
self.added_data_files.clone(),
vec![],
vec![],
vec![],
None,
);

// validate added files
Expand All @@ -110,6 +115,8 @@ impl TransactionAction for FastAppendAction {

/// Marker type for the fast-append snapshot operation; it is a unit struct and carries no data.
struct FastAppendOperation;

// Empty impl: nothing is overridden here, so any provided (default) items of
// `SnapshotValidator` apply to `FastAppendOperation` unchanged.
impl SnapshotValidator for FastAppendOperation {}

impl SnapshotProduceOperation for FastAppendOperation {
fn operation(&self) -> Operation {
Operation::Append
Expand All @@ -124,7 +131,7 @@ impl SnapshotProduceOperation for FastAppendOperation {

async fn existing_manifest(
&self,
snapshot_produce: &SnapshotProducer<'_>,
snapshot_produce: &mut SnapshotProducer<'_>,
) -> Result<Vec<ManifestFile>> {
let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else {
return Ok(vec![]);
Expand Down
8 changes: 8 additions & 0 deletions crates/iceberg/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@ use std::collections::HashMap;

pub use action::*;
mod append;
mod rewrite_files;
mod snapshot;
mod sort_order;
mod update_location;
mod update_properties;
mod update_statistics;
mod upgrade_format_version;
mod validate;

use std::sync::Arc;
use std::time::Duration;
Expand All @@ -78,6 +80,7 @@ use crate::spec::{
use crate::table::Table;
use crate::transaction::action::BoxedTransactionAction;
use crate::transaction::append::FastAppendAction;
use crate::transaction::rewrite_files::RewriteFilesAction;
use crate::transaction::sort_order::ReplaceSortOrderAction;
use crate::transaction::update_location::UpdateLocationAction;
use crate::transaction::update_properties::UpdatePropertiesAction;
Expand Down Expand Up @@ -153,6 +156,11 @@ impl Transaction {
ReplaceSortOrderAction::new()
}

/// Creates a [`RewriteFilesAction`] for rewriting a set of the table's data files.
///
/// Returns a freshly constructed action via `RewriteFilesAction::new()`.
pub fn rewrite_files(&self) -> RewriteFilesAction {
RewriteFilesAction::new()
}

/// Set the location of the table
pub fn update_location(&self) -> UpdateLocationAction {
UpdateLocationAction::new()
Expand Down
Loading
Loading