Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ serde_repr = { workspace = true }
serde_with = { workspace = true }
strum = { workspace = true, features = ["derive"] }
tokio = { workspace = true, optional = false, features = ["sync"] }
log = { workspace = true }
typed-builder = { workspace = true }
typetag = { workspace = true }
url = { workspace = true }
Expand Down
8 changes: 7 additions & 1 deletion crates/iceberg/src/transaction/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,13 @@ impl TransactionAction for FastAppendAction {

// Checks duplicate files
if self.check_duplicate {
snapshot_producer.validate_duplicate_files().await?;
let start = std::time::Instant::now();
let result = snapshot_producer.validate_duplicate_files().await;
log::info!(
"[FastAppendAction] validate_duplicate_files took {:?}",
start.elapsed()
);
result?;
}

snapshot_producer
Expand Down
69 changes: 63 additions & 6 deletions crates/iceberg/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ impl Transaction {
let backoff = Self::build_backoff(table_props)?;
let tx = self;

let table_ident = tx.table.identifier().clone();

log::info!("Starting transaction commit for {table_ident} with backoff {backoff:?}");
(|mut tx: Transaction| async {
let result = tx.do_commit(catalog).await;
(tx, result)
Expand All @@ -183,6 +186,11 @@ impl Transaction {
.sleep(tokio::time::sleep)
.context(tx)
.when(|e| e.retryable())
.notify(move |err, dur| {
log::warn!(
"Transaction commit for {table_ident} failed with retryable error, retrying in {dur:?}: {err}"
);
})
.await
.1
}
Expand All @@ -200,37 +208,86 @@ impl Transaction {
}

async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result<Table> {
log::info!(
"[do_commit] entering do_commit for table {:?}",
self.table.identifier()
);

log::info!("[do_commit] calling catalog.load_table...");
let refreshed = catalog.load_table(self.table.identifier()).await?;
log::info!("[do_commit] catalog.load_table returned successfully");

if self.table.metadata() != refreshed.metadata()
|| self.table.metadata_location() != refreshed.metadata_location()
{
// current base is stale, use refreshed as base and re-apply transaction actions
log::info!("[do_commit] base is stale, refreshing table");
self.table = refreshed.clone();
} else {
log::info!("[do_commit] base is up to date, no refresh needed");
}

let mut current_table = self.table.clone();
let mut existing_updates: Vec<TableUpdate> = vec![];
let mut existing_requirements: Vec<TableRequirement> = vec![];

for action in &self.actions {
log::info!("[do_commit] processing {} actions", self.actions.len());
for (i, action) in self.actions.iter().enumerate() {
log::info!(
"[do_commit] committing action {}/{}...",
i + 1,
self.actions.len()
);
let action_commit = Arc::clone(action).commit(&current_table).await?;
// apply action commit to current_table
current_table = Self::apply(
log::info!(
"[do_commit] action {}/{} commit returned, applying...",
i + 1,
self.actions.len()
);
let apply_result = Self::apply(
current_table,
action_commit,
&mut existing_updates,
&mut existing_requirements,
)?;
);
match apply_result {
Ok(table) => {
log::info!(
"[do_commit] action {}/{} applied successfully",
i + 1,
self.actions.len()
);
current_table = table;
}
Err(e) => {
log::warn!(
"[do_commit] action {}/{} apply failed: {}",
i + 1,
self.actions.len(),
e
);
return Err(e);
}
}
}

log::info!(
"[do_commit] building TableCommit with {} updates and {} requirements",
existing_updates.len(),
existing_requirements.len()
);
let table_commit = TableCommit::builder()
.ident(self.table.identifier().to_owned())
.updates(existing_updates)
.requirements(existing_requirements)
.build();

catalog.update_table(table_commit).await
log::info!("[do_commit] calling catalog.update_table...");
let result = catalog.update_table(table_commit).await;
log::info!(
"[do_commit] catalog.update_table returned with success={}",
result.is_ok()
);
result
}
}

Expand Down
Loading