diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index 612a5db601..79106095d7 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -87,3 +87,5 @@ pub mod writer;
 
 mod delete_vector;
 pub mod puffin;
+/// Utility functions and modules.
+pub mod util;
diff --git a/crates/iceberg/src/spec/manifest/writer.rs b/crates/iceberg/src/spec/manifest/writer.rs
index 39945a513c..3fdaf84222 100644
--- a/crates/iceberg/src/spec/manifest/writer.rs
+++ b/crates/iceberg/src/spec/manifest/writer.rs
@@ -236,10 +236,6 @@ impl ManifestWriter {
     /// Add a delete manifest entry. This method will update following status of the entry:
     /// - Update the entry status to `Deleted`
     /// - Set the snapshot id to the current snapshot id
-    ///
-    /// # TODO
-    /// Remove this allow later
-    #[allow(dead_code)]
     pub(crate) fn add_delete_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
         self.check_data_file(&entry.data_file)?;
         entry.status = ManifestStatus::Deleted;
@@ -282,7 +278,7 @@ impl ManifestWriter {
         Ok(())
     }
 
-    /// Add an file as existing manifest entry. The original data and file sequence numbers, snapshot ID,
+    /// Add a file as existing manifest entry. The original data and file sequence numbers, snapshot ID,
     /// which were assigned at commit, must be preserved when adding an existing entry.
     pub fn add_existing_file(
         &mut self,
diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs
index 04a9e15b34..a5f36f4244 100644
--- a/crates/iceberg/src/spec/snapshot.rs
+++ b/crates/iceberg/src/spec/snapshot.rs
@@ -39,7 +39,7 @@ pub const UNASSIGNED_SNAPSHOT_ID: i64 = -1;
 /// Reference to [`Snapshot`].
 pub type SnapshotRef = Arc<Snapshot>;
 
-#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Hash)]
 #[serde(rename_all = "lowercase")]
 /// The operation field is used by some operations, like snapshot expiration, to skip processing certain snapshots.
 pub enum Operation {
diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs
index f248543df2..3a5faafe4a 100644
--- a/crates/iceberg/src/transaction/append.rs
+++ b/crates/iceberg/src/transaction/append.rs
@@ -27,6 +27,7 @@ use crate::table::Table;
 use crate::transaction::snapshot::{
     DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer,
 };
+use crate::transaction::validate::SnapshotValidator;
 use crate::transaction::{ActionCommit, TransactionAction};
 
 /// FastAppendAction is a transaction action for fast append data files to the table.
@@ -90,6 +91,9 @@ impl TransactionAction for FastAppendAction {
             self.key_metadata.clone(),
             self.snapshot_properties.clone(),
             self.added_data_files.clone(),
+            vec![],
+            vec![],
+            vec![],
         );
 
         // validate added files
@@ -110,6 +114,8 @@ impl TransactionAction for FastAppendAction {
 
 struct FastAppendOperation;
 
+impl SnapshotValidator for FastAppendOperation {}
+
 impl SnapshotProduceOperation for FastAppendOperation {
     fn operation(&self) -> Operation {
         Operation::Append
@@ -124,7 +130,7 @@ impl SnapshotProduceOperation for FastAppendOperation {
     async fn existing_manifest(
         &self,
-        snapshot_produce: &SnapshotProducer<'_>,
+        snapshot_produce: &mut SnapshotProducer<'_>,
     ) -> Result<Vec<ManifestFile>> {
         let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else {
             return Ok(vec![]);
diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs
index 06549a95c5..a81a7bab98 100644
--- a/crates/iceberg/src/transaction/mod.rs
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -56,12 +56,14 @@ use std::collections::HashMap;
 pub use action::*;
 
 mod append;
+mod rewrite_files;
 mod snapshot;
 mod sort_order;
 mod update_location;
 mod update_properties;
 mod update_statistics;
 mod upgrade_format_version;
+mod validate;
 
 use std::sync::Arc;
 use std::time::Duration;
@@ -78,6 +80,7 @@ use crate::spec::{
 use crate::table::Table;
 use crate::transaction::action::BoxedTransactionAction;
 use crate::transaction::append::FastAppendAction;
+use crate::transaction::rewrite_files::RewriteFilesAction;
 use crate::transaction::sort_order::ReplaceSortOrderAction;
 use crate::transaction::update_location::UpdateLocationAction;
 use crate::transaction::update_properties::UpdatePropertiesAction;
@@ -153,6 +156,11 @@ impl Transaction {
         ReplaceSortOrderAction::new()
     }
 
+    /// Rewrite a set of data files in the table.
+    pub fn rewrite_files(&self) -> RewriteFilesAction {
+        RewriteFilesAction::new()
+    }
+
     /// Set the location of table
     pub fn update_location(&self) -> UpdateLocationAction {
         UpdateLocationAction::new()
diff --git a/crates/iceberg/src/transaction/rewrite_files.rs b/crates/iceberg/src/transaction/rewrite_files.rs
new file mode 100644
index 0000000000..b9c69d04e9
--- /dev/null
+++ b/crates/iceberg/src/transaction/rewrite_files.rs
@@ -0,0 +1,375 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use uuid::Uuid;
+
+use super::snapshot::{DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer};
+use super::{ActionCommit, TransactionAction};
+use crate::error::{Error, ErrorKind, Result};
+use crate::spec::{
+    DataContentType, DataFile, ManifestContentType, ManifestEntry, ManifestEntryRef, ManifestFile,
+    ManifestStatus, Operation,
+};
+use crate::table::Table;
+use crate::transaction::validate::SnapshotValidator;
+
+/// Transaction action for rewriting files.
+pub struct RewriteFilesAction {
+    commit_uuid: Option<Uuid>,
+    key_metadata: Option<Vec<u8>>,
+    snapshot_properties: HashMap<String, String>,
+    added_data_files: Vec<DataFile>,
+    added_delete_files: Vec<DataFile>,
+    deleted_data_files: Vec<DataFile>,
+    deleted_delete_files: Vec<DataFile>,
+    data_sequence_number: Option<i64>,
+    starting_snapshot_id: Option<i64>,
+}
+
+pub struct RewriteFilesOperation {
+    added_data_files: Vec<DataFile>,
+    added_delete_files: Vec<DataFile>,
+    deleted_data_files: Vec<DataFile>,
+    deleted_delete_files: Vec<DataFile>,
+    starting_snapshot_id: Option<i64>,
+    data_sequence_number: Option<i64>,
+}
+
+impl RewriteFilesAction {
+    pub fn new() -> Self {
+        Self {
+            commit_uuid: None,
+            key_metadata: None,
+            snapshot_properties: Default::default(),
+            added_data_files: vec![],
+            added_delete_files: vec![],
+            deleted_data_files: vec![],
+            deleted_delete_files: vec![],
+            data_sequence_number: None,
+            starting_snapshot_id: None,
+        }
+    }
+
+    /// Add data files to be added to the snapshot.
+    pub fn add_data_files(
+        mut self,
+        data_files: impl IntoIterator<Item = DataFile>,
+    ) -> Result<Self> {
+        for data_file in data_files {
+            match data_file.content {
+                DataContentType::Data => self.added_data_files.push(data_file),
+                DataContentType::PositionDeletes | DataContentType::EqualityDeletes => {
+                    self.added_delete_files.push(data_file)
+                }
+            }
+        }
+        Ok(self)
+    }
+
+    /// Add data files to be deleted from the table.
+    pub fn delete_data_files(
+        mut self,
+        data_files: impl IntoIterator<Item = DataFile>,
+    ) -> Result<Self> {
+        for data_file in data_files {
+            match data_file.content {
+                DataContentType::Data => self.deleted_data_files.push(data_file),
+                DataContentType::PositionDeletes | DataContentType::EqualityDeletes => {
+                    self.deleted_delete_files.push(data_file)
+                }
+            }
+        }
+
+        Ok(self)
+    }
+
+    /// Set commit UUID for the snapshot.
+    pub fn set_commit_uuid(mut self, commit_uuid: Uuid) -> Self {
+        self.commit_uuid = Some(commit_uuid);
+        self
+    }
+
+    /// Set key metadata for manifest files.
+    pub fn set_key_metadata(mut self, key_metadata: Vec<u8>) -> Self {
+        self.key_metadata = Some(key_metadata);
+        self
+    }
+
+    /// Set snapshot summary properties.
+    pub fn set_snapshot_properties(mut self, snapshot_properties: HashMap<String, String>) -> Self {
+        self.snapshot_properties = snapshot_properties;
+        self
+    }
+
+    /// Set the data sequence number for this rewrite operation.
+    /// The number will be used for all new data files that are added in this rewrite.
+    pub fn set_data_sequence_number(mut self, sequence_number: i64) -> Self {
+        self.data_sequence_number = Some(sequence_number);
+        self
+    }
+
+    /// Set the snapshot ID used in any reads for this operation.
+    pub fn set_starting_snapshot_id(mut self, snapshot_id: i64) -> Self {
+        self.starting_snapshot_id = Some(snapshot_id);
+        self
+    }
+}
+
+impl Default for RewriteFilesAction {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+#[async_trait]
+impl TransactionAction for RewriteFilesAction {
+    async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
+        let snapshot_producer = SnapshotProducer::new(
+            table,
+            self.commit_uuid.unwrap_or_else(Uuid::now_v7),
+            self.key_metadata.clone(),
+            self.snapshot_properties.clone(),
+            self.added_data_files.clone(),
+            self.added_delete_files.clone(),
+            self.deleted_data_files.clone(),
+            self.deleted_delete_files.clone(),
+        );
+
+        let rewrite_operation = RewriteFilesOperation {
+            added_data_files: self.added_data_files.clone(),
+            added_delete_files: self.added_delete_files.clone(),
+            deleted_data_files: self.deleted_data_files.clone(),
+            deleted_delete_files: self.deleted_delete_files.clone(),
+            starting_snapshot_id: self.starting_snapshot_id,
+            data_sequence_number: self.data_sequence_number,
+        };
+
+        // todo should be able to configure to use the merge manifest process
+        snapshot_producer
+            .commit(rewrite_operation, DefaultManifestProcess)
+            .await
+    }
+}
+
+fn copy_with_deleted_status(entry: &ManifestEntryRef) -> Result<ManifestEntry> {
+    // todo should we fail on missing properties or should we ignore them when they don't exist?
+    let builder = ManifestEntry::builder()
+        .status(ManifestStatus::Deleted)
+        .snapshot_id(entry.snapshot_id().ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Missing snapshot_id for entry with file path: {}",
+                    entry.file_path()
+                ),
+            )
+        })?)
+        .sequence_number(entry.sequence_number().ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Missing sequence_number for entry with file path: {}",
+                    entry.file_path()
+                ),
+            )
+        })?)
+        // todo copy file seq no as well
+        .data_file(entry.data_file().clone());
+
+    Ok(builder.build())
+}
+
+impl SnapshotValidator for RewriteFilesOperation {
+    async fn validate(&self, base: &Table, parent_snapshot_id: Option<i64>) -> Result<()> {
+        // Validate replaced and added files
+        if self.deleted_data_files.is_empty() && self.deleted_delete_files.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Files to delete cannot be empty",
+            ));
+        }
+        if self.deleted_data_files.is_empty() && !self.added_data_files.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Data files to add must be empty because there's no data file to be rewritten",
+            ));
+        }
+        if self.deleted_delete_files.is_empty() && !self.added_delete_files.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Delete files to add must be empty because there's no delete file to be rewritten",
+            ));
+        }
+
+        // todo add use_starting_seq_number to determine if we want to use data_sequence_number
+        // If there are replaced data files, there cannot be any new row-level deletes for those data files
+        if !self.deleted_data_files.is_empty() {
+            self.validate_no_new_deletes_for_data_files(
+                base,
+                self.starting_snapshot_id,
+                parent_snapshot_id,
+                &self.deleted_data_files,
+                self.data_sequence_number.is_some(),
+            )
+            .await?;
+        }
+
+        Ok(())
+    }
+}
+
+impl SnapshotProduceOperation for RewriteFilesOperation {
+    fn operation(&self) -> Operation {
+        Operation::Replace
+    }
+
+    async fn delete_entries(
+        &self,
+        snapshot_producer: &SnapshotProducer<'_>,
+    ) -> Result<Vec<ManifestEntry>> {
+        // Find entries that are associated with deleted files
+        let snapshot = snapshot_producer.table.metadata().current_snapshot();
+
+        if let Some(snapshot) = snapshot {
+            let manifest_list = snapshot
+                .load_manifest_list(
+                    snapshot_producer.table.file_io(),
+                    snapshot_producer.table.metadata(),
+                )
+                .await?;
+
+            let mut delete_entries = Vec::new();
+
+            for manifest_file in manifest_list.entries() {
+                let manifest = manifest_file
+                    .load_manifest(snapshot_producer.table.file_io())
+                    .await?;
+
+                for entry in manifest.entries() {
+                    match entry.content_type() {
+                        DataContentType::Data => {
+                            if snapshot_producer
+                                .deleted_data_files
+                                .iter()
+                                .any(|f| f.file_path == entry.data_file().file_path)
+                            {
+                                delete_entries.push(copy_with_deleted_status(entry)?)
+                            }
+                        }
+                        DataContentType::PositionDeletes | DataContentType::EqualityDeletes => {
+                            if snapshot_producer
+                                .deleted_delete_files
+                                .iter()
+                                .any(|f| f.file_path == entry.data_file().file_path)
+                            {
+                                delete_entries.push(copy_with_deleted_status(entry)?)
+                            }
+                        }
+                    }
+                }
+            }
+
+            Ok(delete_entries)
+        } else {
+            Ok(vec![])
+        }
+    }
+
+    async fn existing_manifest(
+        &self,
+        snapshot_producer: &mut SnapshotProducer<'_>,
+    ) -> Result<Vec<ManifestFile>> {
+        let Some(snapshot) = snapshot_producer.table.metadata().current_snapshot() else {
+            return Ok(vec![]);
+        };
+
+        let manifest_list = snapshot
+            .load_manifest_list(
+                snapshot_producer.table.file_io(),
+                snapshot_producer.table.metadata(),
+            )
+            .await?;
+
+        let mut existing_files = Vec::new();
+
+        for manifest_file in manifest_list.entries() {
+            let manifest = manifest_file
+                .load_manifest(snapshot_producer.table.file_io())
+                .await?;
+
+            // Find files to delete from the current manifest entries
+            let found_files_to_delete: HashSet<_> = manifest
+                .entries()
+                .iter()
+                .filter_map(|entry| {
+                    match entry.content_type() {
+                        DataContentType::Data => {
+                            if snapshot_producer
+                                .deleted_data_files
+                                .iter()
+                                .any(|f| f.file_path == entry.data_file().file_path)
+                            {
+                                return Some(entry.data_file().file_path().to_string());
+                            }
+                        }
+                        DataContentType::EqualityDeletes | DataContentType::PositionDeletes => {
+                            if snapshot_producer
+                                .deleted_delete_files
+                                .iter()
+                                .any(|f| f.file_path == entry.data_file().file_path)
+                            {
+                                return Some(entry.data_file().file_path().to_string());
+                            }
+                        }
+                    }
+                    None
+                })
+                .collect();
+
+            if found_files_to_delete.is_empty()
+                && (manifest_file.has_added_files() || manifest_file.has_existing_files())
+            {
+                // All files from the existing manifest entries are still valid
+                existing_files.push(manifest_file.clone());
+            } else {
+                // Some files are deleted already
+                // Rewrite the manifest file and exclude the deleted data files
+                let mut manifest_writer = snapshot_producer.new_manifest_writer(
+                    ManifestContentType::Data,
+                    manifest_file.partition_spec_id,
+                )?;
+
+                manifest
+                    .entries()
+                    .iter()
+                    .filter(|entry| {
+                        entry.status() != ManifestStatus::Deleted
+                            && !found_files_to_delete.contains(entry.data_file().file_path())
+                    })
+                    .try_for_each(|entry| manifest_writer.add_entry((**entry).clone()))?;
+
+                existing_files.push(manifest_writer.write_manifest_file().await?);
+            }
+        }
+
+        Ok(existing_files)
+    }
+}
diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs
index 48dc2b5b90..f4cfa7ff87 100644
--- a/crates/iceberg/src/transaction/snapshot.rs
+++ b/crates/iceberg/src/transaction/snapshot.rs
@@ -17,34 +17,34 @@
 use std::collections::{HashMap, HashSet};
 use std::future::Future;
-use std::ops::RangeFrom;
+use std::ops::{Deref, RangeFrom};
 
 use uuid::Uuid;
 
 use crate::error::Result;
 use crate::spec::{
-    DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry,
-    ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation,
-    PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT,
-    Snapshot, SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType,
-    Summary, update_snapshot_summaries,
+    DataContentType, DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType,
+    ManifestEntry, ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder,
+    Operation, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT,
+    PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT, Snapshot, SnapshotReference, SnapshotRetention,
+    SnapshotSummaryCollector, Struct, StructType, Summary, update_snapshot_summaries,
 };
 use crate::table::Table;
 use crate::transaction::ActionCommit;
+use crate::transaction::validate::SnapshotValidator;
 use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
 
 const META_ROOT_PATH: &str = "metadata";
 
-pub(crate) trait SnapshotProduceOperation: Send + Sync {
+pub(crate) trait SnapshotProduceOperation: Send + Sync + SnapshotValidator {
     fn operation(&self) -> Operation;
 
-    #[allow(unused)]
     fn delete_entries(
         &self,
         snapshot_produce: &SnapshotProducer,
     ) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
 
     fn existing_manifest(
         &self,
-        snapshot_produce: &SnapshotProducer<'_>,
+        snapshot_produce: &mut SnapshotProducer<'_>,
     ) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
 }
@@ -75,6 +75,9 @@ pub(crate) struct SnapshotProducer<'a> {
     key_metadata: Option<Vec<u8>>,
     snapshot_properties: HashMap<String, String>,
     added_data_files: Vec<DataFile>,
+    added_delete_files: Vec<DataFile>,
+    pub deleted_data_files: Vec<DataFile>,
+    pub deleted_delete_files: Vec<DataFile>,
     // A counter used to generate unique manifest file names.
     // It starts from 0 and increments for each new manifest file.
     // Note: This counter is limited to the range of (0..u64::MAX).
@@ -82,12 +85,17 @@ impl<'a> SnapshotProducer<'a> {
+    // todo add a builder for this to fix the clippy warning
+    #[allow(clippy::too_many_arguments)]
     pub(crate) fn new(
         table: &'a Table,
         commit_uuid: Uuid,
         key_metadata: Option<Vec<u8>>,
         snapshot_properties: HashMap<String, String>,
         added_data_files: Vec<DataFile>,
+        added_delete_files: Vec<DataFile>,
+        deleted_data_files: Vec<DataFile>,
+        deleted_delete_files: Vec<DataFile>,
     ) -> Self {
         Self {
             table,
@@ -96,13 +104,16 @@ impl<'a> SnapshotProducer<'a> {
             key_metadata,
             snapshot_properties,
             added_data_files,
+            added_delete_files,
+            deleted_data_files,
+            deleted_delete_files,
             manifest_counter: (0..),
         }
     }
 
     pub(crate) fn validate_added_data_files(&self, added_data_files: &[DataFile]) -> Result<()> {
         for data_file in added_data_files {
-            if data_file.content_type() != crate::spec::DataContentType::Data {
+            if data_file.content_type() != DataContentType::Data {
                 return Err(Error::new(
                     ErrorKind::DataInvalid,
                     "Only data content type is allowed for fast append",
@@ -186,7 +197,11 @@ impl<'a> SnapshotProducer<'a> {
         snapshot_id
     }
 
-    fn new_manifest_writer(&mut self, content: ManifestContentType) -> Result<ManifestWriter> {
+    pub fn new_manifest_writer(
+        &mut self,
+        content: ManifestContentType,
+        spec_id: i32,
+    ) -> Result<ManifestWriter> {
         let new_manifest_path = format!(
             "{}/{}/{}-m{}.{}",
             self.table.metadata().location(),
@@ -203,8 +218,12 @@ impl<'a> SnapshotProducer<'a> {
             self.table.metadata().current_schema().clone(),
             self.table
                 .metadata()
-                .default_partition_spec()
-                .as_ref()
+                .partition_spec_by_id(spec_id)
+                .ok_or(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Partition spec with id: {spec_id} is not found!"),
+                ))?
+                .deref()
                 .clone(),
         );
         if self.table.metadata().format_version() == FormatVersion::V1 {
@@ -249,8 +268,15 @@ impl<'a> SnapshotProducer<'a> {
     }
 
     // Write manifest file for added data files and return the ManifestFile for ManifestList.
-    async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
-        let added_data_files = std::mem::take(&mut self.added_data_files);
+    async fn write_added_manifest(
+        &mut self,
+        content_type: ManifestContentType,
+    ) -> Result<ManifestFile> {
+        let added_data_files = match content_type {
+            ManifestContentType::Data => std::mem::take(&mut self.added_data_files),
+            ManifestContentType::Deletes => std::mem::take(&mut self.added_delete_files),
+        };
+
         if added_data_files.is_empty() {
             return Err(Error::new(
                 ErrorKind::PreconditionFailed,
@@ -272,13 +298,69 @@ impl<'a> SnapshotProducer<'a> {
                     builder.build()
                 }
             });
-        let mut writer = self.new_manifest_writer(ManifestContentType::Data)?;
+        let mut writer = self.new_manifest_writer(
+            content_type,
+            self.table.metadata().default_partition_spec_id(),
+        )?;
         for entry in manifest_entries {
             writer.add_entry(entry)?;
         }
         writer.write_manifest_file().await
     }
 
+    async fn write_deleted_manifest(
+        &mut self,
+        deleted_entries: Vec<ManifestEntry>,
+    ) -> Result<Vec<ManifestFile>> {
+        if deleted_entries.is_empty() {
+            Ok(Vec::new())
+        } else {
+            // Initialize partition groups
+            let mut partition_groups = HashMap::new();
+            for entry in deleted_entries {
+                partition_groups
+                    .entry(entry.data_file().partition_spec_id)
+                    .or_insert_with(Vec::new)
+                    .push(entry);
+            }
+
+            // Write manifest files for each spec-entries pair
+            let mut deleted_manifests = Vec::new();
+            for (spec_id, entries) in partition_groups {
+                let mut data_manifest_writer: Option<ManifestWriter> = None;
+                let mut delete_manifest_writer: Option<ManifestWriter> = None;
+                for entry in entries {
+                    match entry.data_file().content_type() {
+                        DataContentType::Data => data_manifest_writer
+                            .get_or_insert(
+                                self.new_manifest_writer(ManifestContentType::Data, spec_id)?,
+                            )
+                            .add_entry(entry)?,
+                        DataContentType::PositionDeletes | DataContentType::EqualityDeletes => {
+                            delete_manifest_writer
+                                .get_or_insert(
+                                    self.new_manifest_writer(
+                                        ManifestContentType::Deletes,
+                                        spec_id,
+                                    )?,
+                                )
+                                .add_delete_entry(entry)?
+                        }
+                    }
+                }
+
+                if let Some(writer) = data_manifest_writer {
+                    deleted_manifests.push(writer.write_manifest_file().await?);
+                };
+                if let Some(writer) = delete_manifest_writer {
+                    deleted_manifests.push(writer.write_manifest_file().await?);
+                };
+            }
+
+            Ok(deleted_manifests)
+        }
+    }
+
     async fn manifest_file<OP: SnapshotProduceOperation, MP: ManifestProcess>(
         &mut self,
         snapshot_produce_operation: &OP,
@@ -301,12 +383,20 @@ impl<'a> SnapshotProducer<'a> {
 
         // Process added entries.
         if !self.added_data_files.is_empty() {
-            let added_manifest = self.write_added_manifest().await?;
+            let added_manifest = self.write_added_manifest(ManifestContentType::Data).await?;
+            manifest_files.push(added_manifest);
+        }
+        if !self.added_delete_files.is_empty() {
+            let added_manifest = self
+                .write_added_manifest(ManifestContentType::Deletes)
+                .await?;
             manifest_files.push(added_manifest);
         }
 
-        // # TODO
-        // Support process delete entries.
+        let delete_manifests = self
+            .write_deleted_manifest(snapshot_produce_operation.delete_entries(self).await?)
+            .await?;
+        manifest_files.extend(delete_manifests);
 
         let manifest_files = manifest_process.process_manifests(self, manifest_files);
         Ok(manifest_files)
     }
@@ -381,6 +471,11 @@ impl<'a> SnapshotProducer<'a> {
         snapshot_produce_operation: OP,
         process: MP,
     ) -> Result<ActionCommit> {
+        // Validate to avoid conflicts
+        snapshot_produce_operation
+            .validate(self.table, self.table.metadata().current_snapshot_id)
+            .await?;
+
         let new_manifests = self
             .manifest_file(&snapshot_produce_operation, &process)
             .await?;
diff --git a/crates/iceberg/src/transaction/validate.rs b/crates/iceberg/src/transaction/validate.rs
new file mode 100644
index 0000000000..28e9083c23
--- /dev/null
+++ b/crates/iceberg/src/transaction/validate.rs
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashSet;
+use std::sync::Arc;
+
+use futures::SinkExt;
+use futures::future::try_join_all;
+use once_cell::sync::Lazy;
+
+use crate::delete_file_index::DeleteFileIndex;
+use crate::error::Result;
+use crate::scan::DeleteFileContext;
+use crate::spec::{
+    DataContentType, DataFile, FormatVersion, INITIAL_SEQUENCE_NUMBER, ManifestContentType,
+    ManifestFile, Operation, SnapshotRef,
+};
+use crate::table::Table;
+use crate::util::snapshot::ancestors_between;
+use crate::{Error, ErrorKind};
+
+static VALIDATE_ADDED_DELETE_FILES_OPERATIONS: Lazy<HashSet<Operation>> =
+    Lazy::new(|| HashSet::from([Operation::Overwrite, Operation::Delete]));
+
+/// A trait for validating snapshots in an Iceberg table.
+///
+/// This trait provides methods to validate snapshots and their history,
+/// ensuring data integrity and consistency across table operations.
+pub(crate) trait SnapshotValidator {
+    /// Validates a snapshot against a table.
+    ///
+    /// # Arguments
+    ///
+    /// * `base` - The base table to validate against
+    /// * `parent_snapshot_id` - The ID of the parent snapshot, if any. This is usually
+    ///   the latest snapshot of the base table, unless it's a non-main branch
+    ///   (note: writing to branches is not currently supported)
+    ///
+    /// # Returns
+    ///
+    /// A `Result` indicating success or an error if validation fails
+    async fn validate(&self, _base: &Table, _parent_snapshot_id: Option<i64>) -> Result<()> {
+        Ok(())
+    }
+
+    /// Retrieves the history of snapshots between two points with matching operations and content type.
+    ///
+    /// # Arguments
+    ///
+    /// * `base` - The base table to retrieve history from
+    /// * `from_snapshot_id` - The starting snapshot ID (exclusive), or None to start from the beginning
+    /// * `to_snapshot_id` - The ending snapshot ID (inclusive)
+    /// * `matching_operations` - Set of operations to match when collecting snapshots
+    /// * `manifest_content_type` - The content type of manifests to collect
+    ///
+    /// # Returns
+    ///
+    /// A tuple containing:
+    /// * A vector of manifest files matching the criteria
+    /// * A set of snapshot IDs that were collected
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the history between the snapshots cannot be determined
+    async fn validation_history(
+        &self,
+        base: &Table,
+        from_snapshot_id: Option<i64>,
+        to_snapshot_id: i64,
+        matching_operations: &HashSet<Operation>,
+        manifest_content_type: ManifestContentType,
+    ) -> Result<(Vec<ManifestFile>, HashSet<i64>)> {
+        let mut manifests: Vec<ManifestFile> = vec![];
+        let mut new_snapshots = HashSet::new();
+        let mut last_snapshot: Option<SnapshotRef> = None;
+
+        let snapshots = ancestors_between(
+            &Arc::new(base.metadata().clone()),
+            to_snapshot_id,
+            from_snapshot_id,
+        );
+
+        for current_snapshot in snapshots {
+            last_snapshot = Some(current_snapshot.clone());
+
+            // Find all snapshots with the matching operations
+            // and their manifest files with the matching content type
+            if matching_operations.contains(&current_snapshot.summary().operation) {
+                new_snapshots.insert(current_snapshot.snapshot_id());
+                current_snapshot
+                    .load_manifest_list(base.file_io(), base.metadata())
+                    .await?
+                    .entries()
+                    .iter()
+                    .for_each(|manifest| {
+                        if manifest.content == manifest_content_type
+                            && manifest.added_snapshot_id == current_snapshot.snapshot_id()
+                        {
+                            manifests.push(manifest.clone());
+                        }
+                    });
+            }
+        }
+
+        if last_snapshot.is_some()
+            && last_snapshot.clone().unwrap().parent_snapshot_id() != from_snapshot_id
+        {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot determine history between starting snapshot {} and the last known ancestor {}",
+                    from_snapshot_id.unwrap_or(-1),
+                    last_snapshot.unwrap().snapshot_id()
+                ),
+            ));
+        }
+
+        Ok((manifests, new_snapshots))
+    }
+
+    /// Validates that there are no new delete files for the given data files.
+    ///
+    /// # Arguments
+    ///
+    /// * `base` - The base table to validate against
+    /// * `from_snapshot_id` - The starting snapshot ID (exclusive), or None to start from the beginning
+    /// * `to_snapshot_id` - The ending snapshot ID (inclusive), or None if there is no current table state
+    /// * `data_files` - The data files to check for conflicting delete files
+    /// * `ignore_equality_deletes` - Whether to ignore equality deletes and only check for positional deletes
+    ///
+    /// # Returns
+    ///
+    /// A `Result` indicating success or an error if validation fails
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if new delete files are found for any of the data files
+    async fn validate_no_new_deletes_for_data_files(
+        &self,
+        base: &Table,
+        from_snapshot_id: Option<i64>,
+        to_snapshot_id: Option<i64>,
+        data_files: &[DataFile],
+        ignore_equality_deletes: bool,
+    ) -> Result<()> {
+        // If there is no current table state, no files have been added. Delete files only
+        // exist in format v2 and later, so there is nothing to validate for v1 tables.
+        if to_snapshot_id.is_none() || base.metadata().format_version() == FormatVersion::V1 {
+            return Ok(());
+        }
+        let to_snapshot_id = to_snapshot_id.unwrap();
+
+        // Get matching delete files that have been added since from_snapshot_id
+        let (delete_manifests, _) = self
+            .validation_history(
+                base,
+                from_snapshot_id,
+                to_snapshot_id,
+                &VALIDATE_ADDED_DELETE_FILES_OPERATIONS,
+                ManifestContentType::Deletes,
+            )
+            .await?;
+
+        // Build delete file index
+        let (delete_file_index, mut delete_file_tx) = DeleteFileIndex::new();
+        let manifests = try_join_all(
+            delete_manifests
+                .iter()
+                .map(|f| f.load_manifest(base.file_io()))
+                .collect::<Vec<_>>(),
+        )
+        .await?;
+        let manifest_entries = manifests.iter().flat_map(|manifest| manifest.entries());
+        for entry in manifest_entries {
+            let delete_file_ctx = DeleteFileContext {
+                manifest_entry: entry.clone(),
+                partition_spec_id: entry.data_file().partition_spec_id,
+            };
+            delete_file_tx.send(delete_file_ctx).await?;
+        }
+
+        // Get starting seq num from starting snapshot if available
+        let starting_sequence_number = if let Some(from_snapshot_id) = from_snapshot_id {
+            match base.metadata().snapshots.get(&from_snapshot_id) {
+                Some(snapshot) => snapshot.sequence_number(),
+                None => INITIAL_SEQUENCE_NUMBER,
+            }
+        } else {
+            INITIAL_SEQUENCE_NUMBER
+        };
+
+        // Validate if there are deletes using delete file index
+        for data_file in data_files {
+            let delete_files = delete_file_index
+                .get_deletes_for_data_file(data_file, Some(starting_sequence_number))
+                .await;
+
+            if ignore_equality_deletes {
+                if delete_files
+                    .iter()
+                    .any(|delete_file| delete_file.file_type == DataContentType::PositionDeletes)
+                {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Cannot commit, found new positional delete for added data file: {}",
+                            data_file.file_path
+                        ),
+                    ));
+                }
+            } else if !delete_files.is_empty() {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Cannot commit, found new delete for added data file: {}",
+                        data_file.file_path
+                    ),
+                ));
+            }
+        }
+
+        Ok(())
+    }
+}
diff --git a/crates/iceberg/src/util/mod.rs b/crates/iceberg/src/util/mod.rs
new file mode 100644
index 0000000000..b614c981ec
--- /dev/null
+++ b/crates/iceberg/src/util/mod.rs
@@ -0,0 +1,19 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Utilities for working with snapshots.
+pub mod snapshot;
diff --git a/crates/iceberg/src/util/snapshot.rs b/crates/iceberg/src/util/snapshot.rs
new file mode 100644
index 0000000000..62aa6769ec
--- /dev/null
+++ b/crates/iceberg/src/util/snapshot.rs
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::spec::{SnapshotRef, TableMetadataRef};
+
+struct Ancestors {
+    next: Option<SnapshotRef>,
+    get_snapshot: Box<dyn Fn(i64) -> Option<SnapshotRef> + Send>,
+}
+
+impl Iterator for Ancestors {
+    type Item = SnapshotRef;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let snapshot = self.next.take()?;
+        let result = snapshot.clone();
+        self.next = snapshot
+            .parent_snapshot_id()
+            .and_then(|id| (self.get_snapshot)(id));
+        Some(result)
+    }
+}
+
+/// Iterate starting from `snapshot` (inclusive) to the root snapshot.
+pub fn ancestors_of(
+    table_metadata: &TableMetadataRef,
+    snapshot: i64,
+) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
+    if let Some(snapshot) = table_metadata.snapshot_by_id(snapshot) {
+        let table_metadata = table_metadata.clone();
+        Box::new(Ancestors {
+            next: Some(snapshot.clone()),
+            get_snapshot: Box::new(move |id| table_metadata.snapshot_by_id(id).cloned()),
+        })
+    } else {
+        Box::new(std::iter::empty())
+    }
+}
+
+/// Iterate starting from `latest_snapshot_id` (inclusive) to `oldest_snapshot_id` (exclusive).
+pub fn ancestors_between(
+    table_metadata: &TableMetadataRef,
+    latest_snapshot_id: i64,
+    oldest_snapshot_id: Option<i64>,
+) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
+    let Some(oldest_snapshot_id) = oldest_snapshot_id else {
+        return Box::new(ancestors_of(table_metadata, latest_snapshot_id));
+    };
+
+    if latest_snapshot_id == oldest_snapshot_id {
+        return Box::new(std::iter::empty());
+    }
+
+    Box::new(
+        ancestors_of(table_metadata, latest_snapshot_id)
+            .take_while(move |snapshot| snapshot.snapshot_id() != oldest_snapshot_id),
+    )
+}
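For context, a minimal usage sketch of the `rewrite_files` action added by this patch. Everything outside the diff is an assumption: the pre-written `DataFile`s, the catalog handle, and the `apply`/`commit` plumbing, which is assumed to follow the same `TransactionAction` flow as the existing `FastAppendAction`.

```rust
// Sketch only; not part of the patch. `table`, `catalog`, `replaced`, and
// `compacted` are assumed to exist, and method names outside this diff
// (Transaction::new, apply, commit) may differ from the final API.
use iceberg::Catalog;
use iceberg::spec::DataFile;
use iceberg::table::Table;
use iceberg::transaction::Transaction;

async fn rewrite(
    table: Table,
    catalog: &dyn Catalog,
    replaced: Vec<DataFile>,  // data files being rewritten away
    compacted: Vec<DataFile>, // the replacement files produced by compaction
) -> iceberg::Result<()> {
    let tx = Transaction::new(&table);

    // Files are routed into data/delete buckets by content type; commit-time
    // validation rejects empty rewrites and conflicting new row-level deletes.
    let action = tx
        .rewrite_files()
        .delete_data_files(replaced)?
        .add_data_files(compacted)?;

    // Hypothetical commit plumbing, mirroring other transaction actions.
    let tx = action.apply(tx)?;
    tx.commit(catalog).await?;
    Ok(())
}
```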