From f5428a8208e1f7f7e9b0fdcc97de3b398ef187f7 Mon Sep 17 00:00:00 2001 From: Dov Alperin Date: Wed, 3 Sep 2025 17:56:44 -0400 Subject: [PATCH 1/2] Persist: only use PartialBatch CompactionInput when really required If we are replacing a single batch with a single run, we can just replace the whole thing. No need to take the (slightly) more expensive partial batch replacement codepath. --- src/persist-client/src/internal/compact.rs | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/persist-client/src/internal/compact.rs b/src/persist-client/src/internal/compact.rs index a7dabec40c099..40a0f5261ac56 100644 --- a/src/persist-client/src/internal/compact.rs +++ b/src/persist-client/src/internal/compact.rs @@ -695,10 +695,21 @@ where .collect::<BTreeSet<_>>(); match batch_ids.iter().exactly_one().ok() { Some(batch_id) => { - CompactionInput::PartialBatch( - *batch_id, - run_ids - ) + // We are compacting runs from exactly one batch. Decide whether this + // chunk represents the entire batch or only a subset of its runs. + + let full_run_count = req.inputs + .iter() + .find(|x| x.id == *batch_id) + .map(|b| b.batch.run_meta.len()) + .expect("trying to replace with an ID not present in input!"); + + let covers_whole_batch = run_ids.len() == full_run_count; + if !covers_whole_batch { + CompactionInput::PartialBatch(*batch_id, run_ids) + } else { + input_id_range(batch_ids) + } } None => input_id_range(batch_ids), } From 91597c5ca0d9a085c92c7e39178229388a61bb3f Mon Sep 17 00:00:00 2001 From: Dov Alperin Date: Mon, 22 Sep 2025 00:08:37 -0400 Subject: [PATCH 2/2] Persist: eagerly compact empty batches during incremental compaction The limits nightly test seems to exhibit a pathological case wherein there are no updates, but the frontier moves forward frequently, leading to _lots_ of empty batches. In theory we should handle this fine, but we don't (I'm actively investigating this). 
Anyways, this seems like a good balance to strike between the old compaction behavior and incremental compaction, while avoiding some of the empty batch downsides. --- src/persist-client/src/internal/compact.rs | 25 +++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/src/persist-client/src/internal/compact.rs b/src/persist-client/src/internal/compact.rs index a7dabec40c099..5bd541844a738 100644 --- a/src/persist-client/src/internal/compact.rs +++ b/src/persist-client/src/internal/compact.rs @@ -667,6 +667,21 @@ where ); let total_chunked_runs = chunked_runs.len(); + // Special-case: incremental compaction with exactly one real run but many empty + // batches (each with zero runs). Without this, we only rewrite the single batch + // containing the run and leave all the adjacent empty batches intact, causing + // unnecessary metadata bloat. If we detect this case, treat all the empty batches + // as part of the compaction input so they get removed. This preserves correctness + // because empty batches contribute no updates. + // + // This matters for the pathological case where there are no updates, but many + // empty batches due to frequent frontier advancements. + let single_run_all_inputs = incremental_enabled && ordered_runs.len() == 1; + + let all_input_batch_ids: BTreeSet<_> = req.inputs.iter().map(|x| x.id).collect(); + + let all_descs: Vec<_> = req.inputs.iter().map(|x| &x.batch.desc).collect(); + for (applied, (runs, run_chunk_max_memory_usage)) in chunked_runs.into_iter().enumerate() { @@ -685,10 +700,18 @@ where let mut run_cfg = cfg.clone(); run_cfg.batch.batch_builder_max_outstanding_parts = 1 + extra_outstanding_parts; - let (batch_ids, descriptions): (BTreeSet<_>, Vec<_>) = runs.iter() + + let (mut batch_ids, mut descriptions): (BTreeSet<_>, Vec<_>) = runs.iter() .map(|(run_id, desc, _, _)| (run_id.0, *desc)) .unzip(); + if single_run_all_inputs { + // Extend to include all (empty) batch ids and their descriptions. 
+ // (Descriptions of empties are needed below to compute the merged desc.) + batch_ids.extend(all_input_batch_ids.iter().copied()); + descriptions.extend(all_descs.iter().copied()); + } + let input = if incremental_enabled { let run_ids = runs.iter() .map(|(run_id, _, _, _)| run_id.1.expect("run_id should be present"))