Skip to content

Commit f915110

Browse files
committed
Fuzzer-based reproduction and a temporary fix for #16452
1 parent 2d7ae09 commit f915110

File tree

2 files changed

+45
-46
lines changed

2 files changed

+45
-46
lines changed

datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use datafusion_execution::memory_pool::{
3131
};
3232
use datafusion_expr::display_schema;
3333
use datafusion_physical_plan::spill::get_record_batch_memory_size;
34+
use itertools::Itertools;
3435
use std::time::Duration;
3536

3637
use datafusion_execution::{memory_pool::FairSpillPool, runtime_env::RuntimeEnvBuilder};
@@ -72,6 +73,44 @@ async fn sort_query_fuzzer_runner() {
7273
fuzzer.run().await.unwrap();
7374
}
7475

76+
/// Reproduce the bug with specific seeds from the
77+
/// [failing test case](https://github.com/apache/datafusion/issues/16452).
78+
#[tokio::test(flavor = "multi_thread")]
79+
async fn test_reproduce_sort_query_issue_16452() {
80+
// Seeds from the failing test case
81+
let init_seed = 10313160656544581998u64;
82+
let query_seed = 15004039071976572201u64;
83+
let config_seed_1 = 11807432710583113300u64;
84+
let config_seed_2 = 759937414670321802u64;
85+
86+
let random_seed = 1u64; // Use a fixed seed to ensure consistent behavior
87+
88+
println!("Creating test generator with same config as original runner...");
89+
let mut test_generator = SortFuzzerTestGenerator::new(
90+
2000,
91+
3,
92+
"sort_fuzz_table".to_string(),
93+
get_supported_types_columns(random_seed),
94+
false,
95+
random_seed,
96+
);
97+
98+
let mut results = vec![];
99+
100+
for config_seed in [config_seed_1, config_seed_2] {
101+
let r = test_generator
102+
.fuzzer_run(init_seed, query_seed, config_seed)
103+
.await
104+
.unwrap();
105+
106+
results.push(r);
107+
}
108+
109+
for (lhs, rhs) in results.iter().tuple_windows() {
110+
check_equality_of_batches(lhs, rhs).unwrap();
111+
}
112+
}
113+
75114
/// SortQueryFuzzer holds the runner configuration for executing sort query fuzz tests. The fuzzing details are managed inside `SortFuzzerTestGenerator`.
76115
///
77116
/// It defines:
@@ -579,6 +618,7 @@ impl SortFuzzerTestGenerator {
579618
let with_mem_limit = !query_str.contains("LIMIT") && self.set_memory_limit;
580619

581620
let ctx = self.generate_random_config(config_seed, with_mem_limit)?;
621+
582622
let df = ctx.sql(&query_str).await.unwrap();
583623
let results = df.collect().await.unwrap();
584624

datafusion/physical-plan/src/topk/mod.rs

Lines changed: 5 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
//! TopK: Combination of Sort / LIMIT
1919
2020
use arrow::{
21-
array::{Array, AsArray},
22-
compute::{interleave_record_batch, prep_null_mask_filter, FilterBuilder},
21+
array::Array,
22+
compute::interleave_record_batch,
2323
row::{RowConverter, Rows, SortField},
2424
};
2525
use datafusion_expr::{ColumnarValue, Operator};
@@ -203,7 +203,7 @@ impl TopK {
203203
let baseline = self.metrics.baseline.clone();
204204
let _timer = baseline.elapsed_compute().timer();
205205

206-
let mut sort_keys: Vec<ArrayRef> = self
206+
let sort_keys: Vec<ArrayRef> = self
207207
.expr
208208
.iter()
209209
.map(|expr| {
@@ -212,56 +212,15 @@ impl TopK {
212212
})
213213
.collect::<Result<Vec<_>>>()?;
214214

215-
let mut selected_rows = None;
216-
217-
if let Some(filter) = self.filter.as_ref() {
218-
// If a filter is provided, update it with the new rows
219-
let filter = filter.current()?;
220-
let filtered = filter.evaluate(&batch)?;
221-
let num_rows = batch.num_rows();
222-
let array = filtered.into_array(num_rows)?;
223-
let mut filter = array.as_boolean().clone();
224-
let true_count = filter.true_count();
225-
if true_count == 0 {
226-
// no rows in this batch pass the filter, so there is nothing to update
227-
return Ok(());
228-
}
229-
// only update the keys / rows if the filter does not match all rows
230-
if true_count < num_rows {
231-
// Indices in `set_indices` should be correct if filter contains nulls
232-
// So we prepare the filter here. Note this is also done in the `FilterBuilder`
233-
// so there is no overhead to do this here.
234-
if filter.nulls().is_some() {
235-
filter = prep_null_mask_filter(&filter);
236-
}
237-
238-
let filter_predicate = FilterBuilder::new(&filter);
239-
let filter_predicate = if sort_keys.len() > 1 {
240-
// Optimize filter when it has multiple sort keys
241-
filter_predicate.optimize().build()
242-
} else {
243-
filter_predicate.build()
244-
};
245-
selected_rows = Some(filter);
246-
sort_keys = sort_keys
247-
.iter()
248-
.map(|key| filter_predicate.filter(key).map_err(|x| x.into()))
249-
.collect::<Result<Vec<_>>>()?;
250-
}
251-
};
252215
// reuse existing `Rows` to avoid reallocations
253216
let rows = &mut self.scratch_rows;
254217
rows.clear();
255218
self.row_converter.append(rows, &sort_keys)?;
256219

257220
let mut batch_entry = self.heap.register_batch(batch.clone());
258221

259-
let replacements = match selected_rows {
260-
Some(filter) => {
261-
self.find_new_topk_items(filter.values().set_indices(), &mut batch_entry)
262-
}
263-
None => self.find_new_topk_items(0..sort_keys[0].len(), &mut batch_entry),
264-
};
222+
let replacements =
223+
self.find_new_topk_items(0..sort_keys[0].len(), &mut batch_entry);
265224

266225
if replacements > 0 {
267226
self.metrics.row_replacements.add(replacements);

0 commit comments

Comments
 (0)