From 9e28c17d410f46417a8d1aabdee3600c260bfd6e Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 22 Jun 2025 09:03:21 -0400 Subject: [PATCH 01/12] Revert "Temporarily fix bug in dynamic top-k optimization (#16465)" This reverts commit 5ca4ff02932eecdd203b1b90acaf4381c0d5cb5c. --- .../core/tests/fuzz_cases/sort_query_fuzz.rs | 38 -------------- datafusion/physical-plan/src/topk/mod.rs | 51 +++++++++++++++++-- 2 files changed, 46 insertions(+), 43 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs index 1f47412caf2a..d2d3a5e0c22f 100644 --- a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs @@ -31,7 +31,6 @@ use datafusion_execution::memory_pool::{ }; use datafusion_expr::display_schema; use datafusion_physical_plan::spill::get_record_batch_memory_size; -use itertools::Itertools; use std::time::Duration; use datafusion_execution::{memory_pool::FairSpillPool, runtime_env::RuntimeEnvBuilder}; @@ -73,43 +72,6 @@ async fn sort_query_fuzzer_runner() { fuzzer.run().await.unwrap(); } -/// Reproduce the bug with specific seeds from the -/// [failing test case](https://github.com/apache/datafusion/issues/16452). -#[tokio::test(flavor = "multi_thread")] -async fn test_reproduce_sort_query_issue_16452() { - // Seeds from the failing test case - let init_seed = 10313160656544581998u64; - let query_seed = 15004039071976572201u64; - let config_seed_1 = 11807432710583113300u64; - let config_seed_2 = 759937414670321802u64; - - let random_seed = 1u64; // Use a fixed seed to ensure consistent behavior - - let mut test_generator = SortFuzzerTestGenerator::new( - 2000, - 3, - "sort_fuzz_table".to_string(), - get_supported_types_columns(random_seed), - false, - random_seed, - ); - - let mut results = vec![]; - - for config_seed in [config_seed_1, config_seed_2] { - let r = test_generator - .fuzzer_run(init_seed, query_seed, config_seed) - .await - .unwrap(); - - results.push(r); - } - - for (lhs, rhs) in results.iter().tuple_windows() { - check_equality_of_batches(lhs, rhs).unwrap(); - } -} - /// SortQueryFuzzer holds the runner configuration for executing sort query fuzz tests. The fuzzing details are managed inside `SortFuzzerTestGenerator`. /// /// It defines: diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index 71029662f5f5..8d06fa73ce8e 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -18,8 +18,8 @@ //! TopK: Combination of Sort / LIMIT use arrow::{ - array::Array, - compute::interleave_record_batch, + array::{Array, AsArray}, + compute::{interleave_record_batch, prep_null_mask_filter, FilterBuilder}, row::{RowConverter, Rows, SortField}, }; use datafusion_expr::{ColumnarValue, Operator}; @@ -203,7 +203,7 @@ impl TopK { let baseline = self.metrics.baseline.clone(); let _timer = baseline.elapsed_compute().timer(); - let sort_keys: Vec = self + let mut sort_keys: Vec = self .expr .iter() .map(|expr| { @@ -212,6 +212,43 @@ impl TopK { }) .collect::>>()?; + let mut selected_rows = None; + + if let Some(filter) = self.filter.as_ref() { + // If a filter is provided, update it with the new rows + let filter = filter.current()?; + let filtered = filter.evaluate(&batch)?; + let num_rows = batch.num_rows(); + let array = filtered.into_array(num_rows)?; + let mut filter = array.as_boolean().clone(); + let true_count = filter.true_count(); + if true_count == 0 { + // nothing to filter, so no need to update + return Ok(()); + } + // only update the keys / rows if the filter does not match all rows + if true_count < num_rows { + // Indices in `set_indices` should be correct if filter contains nulls + // So we prepare the filter here. Note this is also done in the `FilterBuilder` + // so there is no overhead to do this here. + if filter.nulls().is_some() { + filter = prep_null_mask_filter(&filter); + } + + let filter_predicate = FilterBuilder::new(&filter); + let filter_predicate = if sort_keys.len() > 1 { + // Optimize filter when it has multiple sort keys + filter_predicate.optimize().build() + } else { + filter_predicate.build() + }; + selected_rows = Some(filter); + sort_keys = sort_keys + .iter() + .map(|key| filter_predicate.filter(key).map_err(|x| x.into())) + .collect::>>()?; + } + }; // reuse existing `Rows` to avoid reallocations let rows = &mut self.scratch_rows; rows.clear(); @@ -219,8 +256,12 @@ impl TopK { let mut batch_entry = self.heap.register_batch(batch.clone()); - let replacements = - self.find_new_topk_items(0..sort_keys[0].len(), &mut batch_entry); + let replacements = match selected_rows { + Some(filter) => { + self.find_new_topk_items(filter.values().set_indices(), &mut batch_entry) + } + None => self.find_new_topk_items(0..sort_keys[0].len(), &mut batch_entry), + }; if replacements > 0 { self.metrics.row_replacements.add(replacements); From 02b6fad4fa9ec66a902d8ecdc9f1ec3acd0b6da3 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 22 Jun 2025 09:04:21 -0400 Subject: [PATCH 02/12] restore --- .../core/tests/fuzz_cases/sort_query_fuzz.rs | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs index d2d3a5e0c22f..1f47412caf2a 100644 --- a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs @@ -31,6 +31,7 @@ use datafusion_execution::memory_pool::{ }; use datafusion_expr::display_schema; use datafusion_physical_plan::spill::get_record_batch_memory_size; +use itertools::Itertools; use std::time::Duration; use datafusion_execution::{memory_pool::FairSpillPool, runtime_env::RuntimeEnvBuilder}; @@ -72,6 +73,43 @@ async fn sort_query_fuzzer_runner() { fuzzer.run().await.unwrap(); } +/// Reproduce the bug with specific seeds from the +/// [failing test case](https://github.com/apache/datafusion/issues/16452). +#[tokio::test(flavor = "multi_thread")] +async fn test_reproduce_sort_query_issue_16452() { + // Seeds from the failing test case + let init_seed = 10313160656544581998u64; + let query_seed = 15004039071976572201u64; + let config_seed_1 = 11807432710583113300u64; + let config_seed_2 = 759937414670321802u64; + + let random_seed = 1u64; // Use a fixed seed to ensure consistent behavior + + let mut test_generator = SortFuzzerTestGenerator::new( + 2000, + 3, + "sort_fuzz_table".to_string(), + get_supported_types_columns(random_seed), + false, + random_seed, + ); + + let mut results = vec![]; + + for config_seed in [config_seed_1, config_seed_2] { + let r = test_generator + .fuzzer_run(init_seed, query_seed, config_seed) + .await + .unwrap(); + + results.push(r); + } + + for (lhs, rhs) in results.iter().tuple_windows() { + check_equality_of_batches(lhs, rhs).unwrap(); + } +} + /// SortQueryFuzzer holds the runner configuration for executing sort query fuzz tests. The fuzzing details are managed inside `SortFuzzerTestGenerator`. /// /// It defines: From 0dda78b9e4080c1edd7e4eb685371f2ee97118ce Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 29 Jun 2025 09:03:10 -0500 Subject: [PATCH 03/12] tweak test runner to allow more partitions --- .../core/tests/fuzz_cases/record_batch_generator.rs | 9 ++++++++- datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs | 9 ++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/record_batch_generator.rs b/datafusion/core/tests/fuzz_cases/record_batch_generator.rs index 4eac1482ad3f..437559add340 100644 --- a/datafusion/core/tests/fuzz_cases/record_batch_generator.rs +++ b/datafusion/core/tests/fuzz_cases/record_batch_generator.rs @@ -34,6 +34,7 @@ use arrow_schema::{ DECIMAL256_MAX_SCALE, }; use datafusion_common::{arrow_datafusion_err, DataFusionError, Result}; +use itertools::Itertools; use rand::{rng, rngs::StdRng, Rng, SeedableRng}; use test_utils::array_gen::{ BinaryArrayGenerator, BooleanArrayGenerator, DecimalArrayGenerator, @@ -283,7 +284,13 @@ impl RecordBatchGenerator { } pub fn generate(&mut self) -> Result { - let num_rows = self.rng.random_range(self.min_rows_num..=self.max_rows_num); + let range = self.min_rows_num..=self.max_rows_num; + let num_rows = if range.try_len().unwrap() > 0 { + self.rng.random_range(range) + } else { + // If the range is empty, we return self.min_rows_num + self.min_rows_num + }; let array_gen_rng = StdRng::from_seed(self.rng.random()); let mut batch_gen_rng = StdRng::from_seed(self.rng.random()); let columns = self.columns.clone(); diff --git a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs index 1f47412caf2a..7e14274db2b3 100644 --- a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs @@ -75,19 +75,18 @@ async fn sort_query_fuzzer_runner() { /// Reproduce the bug with specific seeds from the /// [failing test case](https://github.com/apache/datafusion/issues/16452). -#[tokio::test(flavor = "multi_thread")] +#[tokio::test(flavor = "multi_thread", worker_threads = 128)] async fn test_reproduce_sort_query_issue_16452() { // Seeds from the failing test case let init_seed = 10313160656544581998u64; let query_seed = 15004039071976572201u64; let config_seed_1 = 11807432710583113300u64; - let config_seed_2 = 759937414670321802u64; let random_seed = 1u64; // Use a fixed seed to ensure consistent behavior let mut test_generator = SortFuzzerTestGenerator::new( 2000, - 3, + 128, "sort_fuzz_table".to_string(), get_supported_types_columns(random_seed), false, @@ -96,7 +95,7 @@ async fn test_reproduce_sort_query_issue_16452() { let mut results = vec![]; - for config_seed in [config_seed_1, config_seed_2] { + for config_seed in [config_seed_1, config_seed_1] { let r = test_generator .fuzzer_run(init_seed, query_seed, config_seed) .await @@ -572,7 +571,7 @@ impl SortFuzzerTestGenerator { let config = SessionConfig::new() .with_target_partitions(num_partitions) - .with_batch_size(init_state.approx_batch_num_rows / 2) + .with_batch_size(std::cmp::max(init_state.approx_batch_num_rows / 2, 1)) .with_sort_spill_reservation_bytes(sort_spill_reservation_bytes) .with_sort_in_place_threshold_bytes(sort_in_place_threshold_bytes); From b990214b2093905d2ed3c4b3560c683ff7b8a174 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 29 Jun 2025 09:04:53 -0500 Subject: [PATCH 04/12] simplify test --- datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs index 7e14274db2b3..45e22af3d83e 100644 --- a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs @@ -21,7 +21,7 @@ use std::cmp::min; use std::sync::Arc; use arrow::array::RecordBatch; -use arrow_schema::SchemaRef; +use arrow_schema::{DataType, IntervalUnit, SchemaRef}; use datafusion::datasource::MemTable; use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_common::{instant::Instant, Result}; @@ -80,7 +80,7 @@ async fn test_reproduce_sort_query_issue_16452() { // Seeds from the failing test case let init_seed = 10313160656544581998u64; let query_seed = 15004039071976572201u64; - let config_seed_1 = 11807432710583113300u64; + let config_seed_1 = 1; let random_seed = 1u64; // Use a fixed seed to ensure consistent behavior @@ -88,7 +88,13 @@ async fn test_reproduce_sort_query_issue_16452() { 2000, 128, "sort_fuzz_table".to_string(), - get_supported_types_columns(random_seed), + vec![ + ColumnDescr::new("u64", DataType::UInt64), + ColumnDescr::new( + "interval_month_day_nano", + DataType::Interval(IntervalUnit::MonthDayNano), + ), + ], false, random_seed, ); From cd758b170b63de59bf6b1fc1cd291d8db01e87f2 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 29 Jun 2025 09:24:21 -0500 Subject: [PATCH 05/12] simplify test --- .../core/tests/fuzz_cases/sort_query_fuzz.rs | 56 ++++++++++++------- 1 file changed, 37 insertions(+), 19 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs index 45e22af3d83e..04db94b5d383 100644 --- a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs @@ -31,7 +31,6 @@ use datafusion_execution::memory_pool::{ }; use datafusion_expr::display_schema; use datafusion_physical_plan::spill::get_record_batch_memory_size; -use itertools::Itertools; use std::time::Duration; use datafusion_execution::{memory_pool::FairSpillPool, runtime_env::RuntimeEnvBuilder}; @@ -79,13 +78,9 @@ async fn sort_query_fuzzer_runner() { async fn test_reproduce_sort_query_issue_16452() { // Seeds from the failing test case let init_seed = 10313160656544581998u64; - let query_seed = 15004039071976572201u64; - let config_seed_1 = 1; - let random_seed = 1u64; // Use a fixed seed to ensure consistent behavior - let mut test_generator = SortFuzzerTestGenerator::new( - 2000, + 512, 128, "sort_fuzz_table".to_string(), vec![ @@ -99,19 +94,42 @@ async fn test_reproduce_sort_query_issue_16452() { random_seed, ); - let mut results = vec![]; - - for config_seed in [config_seed_1, config_seed_1] { - let r = test_generator - .fuzzer_run(init_seed, query_seed, config_seed) - .await - .unwrap(); - - results.push(r); - } - - for (lhs, rhs) in results.iter().tuple_windows() { - check_equality_of_batches(lhs, rhs).unwrap(); + test_generator.init_partitioned_staggered_batches(init_seed); + let schema = test_generator + .dataset_state + .as_ref() + .unwrap() + .schema + .clone(); + let data = test_generator + .dataset_state + .as_ref() + .unwrap() + .partitioned_staggered_batches + .clone(); + + let query = "SELECT * FROM sort_fuzz_table ORDER BY interval_month_day_nano NULLS FIRST LIMIT 1"; + let config = SessionConfig::new() + .with_target_partitions(128) + .with_batch_size(1); + let ctx = SessionContext::new_with_config(config); + let provider = Arc::new(MemTable::try_new(schema.clone(), data.clone()).unwrap()); + ctx.register_table("sort_fuzz_table", provider).unwrap(); + + let mut previous_results = None; + for _ in 0..1024 { + let r = ctx.sql(query).await.unwrap().collect().await.unwrap(); + match &mut previous_results { + None => { + // Store the first run as the expected result + previous_results = Some(r.clone()); + } + Some(prev) => { + // Check that the results are consistent with the previous run + check_equality_of_batches(prev, &r).unwrap(); + *prev = r; // Update the previous results + } + } } } From 694f455d05368e2271daedaca610127c9d4fc984 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 29 Jun 2025 09:32:35 -0500 Subject: [PATCH 06/12] simplify test --- .../core/tests/fuzz_cases/sort_query_fuzz.rs | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs index 04db94b5d383..8675b048816e 100644 --- a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs @@ -21,7 +21,8 @@ use std::cmp::min; use std::sync::Arc; use arrow::array::RecordBatch; -use arrow_schema::{DataType, IntervalUnit, SchemaRef}; +use arrow::util::pretty::pretty_format_batches; +use arrow_schema::{DataType, SchemaRef}; use datafusion::datasource::MemTable; use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_common::{instant::Instant, Result}; @@ -78,16 +79,16 @@ async fn sort_query_fuzzer_runner() { async fn test_reproduce_sort_query_issue_16452() { // Seeds from the failing test case let init_seed = 10313160656544581998u64; - let random_seed = 1u64; // Use a fixed seed to ensure consistent behavior + let random_seed = 1u64; let mut test_generator = SortFuzzerTestGenerator::new( - 512, + 256, 128, "sort_fuzz_table".to_string(), vec![ ColumnDescr::new("u64", DataType::UInt64), ColumnDescr::new( - "interval_month_day_nano", - DataType::Interval(IntervalUnit::MonthDayNano), + "u32", + DataType::UInt32, ), ], false, @@ -108,7 +109,13 @@ async fn test_reproduce_sort_query_issue_16452() { .partitioned_staggered_batches .clone(); - let query = "SELECT * FROM sort_fuzz_table ORDER BY interval_month_day_nano NULLS FIRST LIMIT 1"; + let flat_data = data + .iter() + .flat_map(|partition| partition.iter().cloned()) + .collect::>(); + println!("data: {}", pretty_format_batches(&flat_data).unwrap()); + + let query = "SELECT * FROM sort_fuzz_table ORDER BY u32 NULLS FIRST LIMIT 1"; let config = SessionConfig::new() .with_target_partitions(128) .with_batch_size(1); @@ -117,7 +124,8 @@ async fn test_reproduce_sort_query_issue_16452() { ctx.register_table("sort_fuzz_table", provider).unwrap(); let mut previous_results = None; - for _ in 0..1024 { + for iteration in 0..2056 { + println!("Iteration {iteration}"); let r = ctx.sql(query).await.unwrap().collect().await.unwrap(); match &mut previous_results { None => { From b9e1508425a987da26326c9bf193f60896894362 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 29 Jun 2025 09:35:10 -0500 Subject: [PATCH 07/12] fmt --- datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs index 8675b048816e..c6b863e3f9b9 100644 --- a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs @@ -86,10 +86,7 @@ async fn test_reproduce_sort_query_issue_16452() { "sort_fuzz_table".to_string(), vec![ ColumnDescr::new("u64", DataType::UInt64), - ColumnDescr::new( - "u32", - DataType::UInt32, - ), + ColumnDescr::new("u32", DataType::UInt32), ], false, random_seed, From 97601941f44fbbf8b0255c0cadc5b60cc248bc83 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 29 Jun 2025 10:05:21 -0500 Subject: [PATCH 08/12] simplify test even more --- .../core/tests/fuzz_cases/sort_query_fuzz.rs | 52 +++++-------------- 1 file changed, 14 insertions(+), 38 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs index c6b863e3f9b9..530a6476a27e 100644 --- a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs @@ -20,8 +20,7 @@ use std::cmp::min; use std::sync::Arc; -use arrow::array::RecordBatch; -use arrow::util::pretty::pretty_format_batches; +use arrow::array::{record_batch, RecordBatch}; use arrow_schema::{DataType, SchemaRef}; use datafusion::datasource::MemTable; use datafusion::prelude::{SessionConfig, SessionContext}; @@ -75,53 +74,30 @@ async fn sort_query_fuzzer_runner() { /// Reproduce the bug with specific seeds from the /// [failing test case](https://github.com/apache/datafusion/issues/16452). -#[tokio::test(flavor = "multi_thread", worker_threads = 128)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_reproduce_sort_query_issue_16452() { - // Seeds from the failing test case - let init_seed = 10313160656544581998u64; - let random_seed = 1u64; - let mut test_generator = SortFuzzerTestGenerator::new( - 256, - 128, - "sort_fuzz_table".to_string(), - vec![ - ColumnDescr::new("u64", DataType::UInt64), - ColumnDescr::new("u32", DataType::UInt32), - ], - false, - random_seed, - ); - - test_generator.init_partitioned_staggered_batches(init_seed); - let schema = test_generator - .dataset_state - .as_ref() - .unwrap() - .schema - .clone(); - let data = test_generator - .dataset_state - .as_ref() - .unwrap() - .partitioned_staggered_batches - .clone(); + let schema = Arc::new(arrow_schema::Schema::new(vec![ + arrow_schema::Field::new("u64", DataType::UInt64, true), + arrow_schema::Field::new("u32", DataType::UInt32, true), + ])); - let flat_data = data - .iter() - .flat_map(|partition| partition.iter().cloned()) - .collect::>(); - println!("data: {}", pretty_format_batches(&flat_data).unwrap()); + // build the data manually to reproduce the bug + let data = vec![ + vec![record_batch!(("u64", UInt64, [1]), ("u32", UInt32, [2])).unwrap()], + vec![record_batch!(("u64", UInt64, [2]), ("u32", UInt32, [2])).unwrap()], + ]; let query = "SELECT * FROM sort_fuzz_table ORDER BY u32 NULLS FIRST LIMIT 1"; let config = SessionConfig::new() - .with_target_partitions(128) + .with_target_partitions(2) .with_batch_size(1); let ctx = SessionContext::new_with_config(config); let provider = Arc::new(MemTable::try_new(schema.clone(), data.clone()).unwrap()); ctx.register_table("sort_fuzz_table", provider).unwrap(); + // Failure usually happens afer ~500 iterations, add a generous number of runs to make sure it reproduces let mut previous_results = None; - for iteration in 0..2056 { + for iteration in 0..4096 { println!("Iteration {iteration}"); let r = ctx.sql(query).await.unwrap().collect().await.unwrap(); match &mut previous_results { From da2aefb51429174bf113231461c619df07c7ef2d Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 29 Jun 2025 10:11:01 -0500 Subject: [PATCH 09/12] Revert test runner changes --- .../core/tests/fuzz_cases/record_batch_generator.rs | 9 +-------- datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs | 2 +- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/record_batch_generator.rs b/datafusion/core/tests/fuzz_cases/record_batch_generator.rs index 437559add340..4eac1482ad3f 100644 --- a/datafusion/core/tests/fuzz_cases/record_batch_generator.rs +++ b/datafusion/core/tests/fuzz_cases/record_batch_generator.rs @@ -34,7 +34,6 @@ use arrow_schema::{ DECIMAL256_MAX_SCALE, }; use datafusion_common::{arrow_datafusion_err, DataFusionError, Result}; -use itertools::Itertools; use rand::{rng, rngs::StdRng, Rng, SeedableRng}; use test_utils::array_gen::{ BinaryArrayGenerator, BooleanArrayGenerator, DecimalArrayGenerator, @@ -284,13 +283,7 @@ impl RecordBatchGenerator { } pub fn generate(&mut self) -> Result { - let range = self.min_rows_num..=self.max_rows_num; - let num_rows = if range.try_len().unwrap() > 0 { - self.rng.random_range(range) - } else { - // If the range is empty, we return self.min_rows_num - self.min_rows_num - }; + let num_rows = self.rng.random_range(self.min_rows_num..=self.max_rows_num); let array_gen_rng = StdRng::from_seed(self.rng.random()); let mut batch_gen_rng = StdRng::from_seed(self.rng.random()); let columns = self.columns.clone(); diff --git a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs index 530a6476a27e..4d9854c1570a 100644 --- a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs @@ -576,7 +576,7 @@ impl SortFuzzerTestGenerator { let config = SessionConfig::new() .with_target_partitions(num_partitions) - .with_batch_size(std::cmp::max(init_state.approx_batch_num_rows / 2, 1)) + .with_batch_size(init_state.approx_batch_num_rows / 2) .with_sort_spill_reservation_bytes(sort_spill_reservation_bytes) .with_sort_in_place_threshold_bytes(sort_in_place_threshold_bytes); From cbf5d09b65ef5b9c5a05cc8bd4df534b7eb51a5b Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 29 Jun 2025 10:21:26 -0500 Subject: [PATCH 10/12] remove NULLS FIRST --- datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs index 4d9854c1570a..c95d00340e71 100644 --- a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs @@ -87,7 +87,7 @@ async fn test_reproduce_sort_query_issue_16452() { vec![record_batch!(("u64", UInt64, [2]), ("u32", UInt32, [2])).unwrap()], ]; - let query = "SELECT * FROM sort_fuzz_table ORDER BY u32 NULLS FIRST LIMIT 1"; + let query = "SELECT * FROM sort_fuzz_table ORDER BY u32 LIMIT 1"; let config = SessionConfig::new() .with_target_partitions(2) .with_batch_size(1); From 514ab74e0d1cefd036c0148a83d1637aea1b8bda Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 1 Jul 2025 10:51:40 -0500 Subject: [PATCH 11/12] only pull out selected columns --- .../core/tests/fuzz_cases/sort_query_fuzz.rs | 54 +++---------------- 1 file changed, 8 insertions(+), 46 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs index c95d00340e71..e21ede456889 100644 --- a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs @@ -20,8 +20,8 @@ use std::cmp::min; use std::sync::Arc; -use arrow::array::{record_batch, RecordBatch}; -use arrow_schema::{DataType, SchemaRef}; +use arrow::array::RecordBatch; +use arrow_schema::SchemaRef; use datafusion::datasource::MemTable; use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_common::{instant::Instant, Result}; @@ -72,48 +72,6 @@ async fn sort_query_fuzzer_runner() { fuzzer.run().await.unwrap(); } -/// Reproduce the bug with specific seeds from the -/// [failing test case](https://github.com/apache/datafusion/issues/16452). -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn test_reproduce_sort_query_issue_16452() { - let schema = Arc::new(arrow_schema::Schema::new(vec![ - arrow_schema::Field::new("u64", DataType::UInt64, true), - arrow_schema::Field::new("u32", DataType::UInt32, true), - ])); - - // build the data manually to reproduce the bug - let data = vec![ - vec![record_batch!(("u64", UInt64, [1]), ("u32", UInt32, [2])).unwrap()], - vec![record_batch!(("u64", UInt64, [2]), ("u32", UInt32, [2])).unwrap()], - ]; - - let query = "SELECT * FROM sort_fuzz_table ORDER BY u32 LIMIT 1"; - let config = SessionConfig::new() - .with_target_partitions(2) - .with_batch_size(1); - let ctx = SessionContext::new_with_config(config); - let provider = Arc::new(MemTable::try_new(schema.clone(), data.clone()).unwrap()); - ctx.register_table("sort_fuzz_table", provider).unwrap(); - - // Failure usually happens afer ~500 iterations, add a generous number of runs to make sure it reproduces - let mut previous_results = None; - for iteration in 0..4096 { - println!("Iteration {iteration}"); - let r = ctx.sql(query).await.unwrap().collect().await.unwrap(); - match &mut previous_results { - None => { - // Store the first run as the expected result - previous_results = Some(r.clone()); - } - Some(prev) => { - // Check that the results are consistent with the previous run - check_equality_of_batches(prev, &r).unwrap(); - *prev = r; // Update the previous results - } - } - } -} - /// SortQueryFuzzer holds the runner configuration for executing sort query fuzz tests. The fuzzing details are managed inside `SortFuzzerTestGenerator`. /// /// It defines: @@ -470,7 +428,7 @@ impl SortFuzzerTestGenerator { .collect(); let mut order_by_clauses = Vec::new(); - for col in selected_columns { + for col in &selected_columns { let mut clause = col.name.clone(); if rng.random_bool(0.5) { let order = if rng.random_bool(0.5) { "ASC" } else { "DESC" }; @@ -505,7 +463,11 @@ impl SortFuzzerTestGenerator { let limit_clause = limit.map_or(String::new(), |l| format!(" LIMIT {l}")); let query = format!( - "SELECT * FROM {} ORDER BY {}{}", + "SELECT {} FROM {} ORDER BY {}{}", + selected_columns.iter() + .map(|col| col.name.clone()) + .collect::>() + .join(", "), self.table_name, order_by_clauses.join(", "), limit_clause From 423716a470145bd7ebc5f38b164b185e57d4db79 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 1 Jul 2025 10:54:23 -0500 Subject: [PATCH 12/12] fmt --- datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs index e21ede456889..4cf6609fd441 100644 --- a/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs @@ -464,7 +464,8 @@ impl SortFuzzerTestGenerator { let query = format!( "SELECT {} FROM {} ORDER BY {}{}", - selected_columns.iter() + selected_columns + .iter() .map(|col| col.name.clone()) .collect::>() .join(", "),