diff --git a/Cargo.lock b/Cargo.lock
index 9564ca69ea2a..9828c4c87bdf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2457,6 +2457,7 @@ dependencies = [
  "insta",
  "itertools 0.14.0",
  "log",
+ "parking_lot",
  "paste",
  "petgraph 0.8.2",
  "rand 0.9.2",
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 709fcc5c1fc2..694b14f6a5c2 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -409,12 +409,58 @@ impl FileOpener for ParquetOpener {
                 .with_row_groups(row_group_indexes)
                 .build()?;
 
-            let adapted = stream
-                .map_err(|e| ArrowError::ExternalError(Box::new(e)))
-                .map(move |maybe_batch| {
-                    maybe_batch
-                        .and_then(|b| schema_mapping.map_batch(b).map_err(Into::into))
-                });
+            // Create a stateful stream that can check pruning after each batch
+            let adapted = {
+                use futures::stream;
+                let schema_mapping = Some(schema_mapping);
+                let file_pruner = file_pruner;
+                let stream = stream.map_err(|e| ArrowError::ExternalError(Box::new(e)));
+                let files_ranges_pruned_statistics =
+                    file_metrics.files_ranges_pruned_statistics.clone();
+
+                stream::try_unfold(
+                    (
+                        stream,
+                        schema_mapping,
+                        file_pruner,
+                        files_ranges_pruned_statistics,
+                    ),
+                    move |(
+                        mut stream,
+                        schema_mapping_opt,
+                        mut file_pruner,
+                        files_ranges_pruned_statistics,
+                    )| async move {
+                        match stream.try_next().await? {
+                            Some(batch) => {
+                                let schema_mapping = schema_mapping_opt.as_ref().unwrap();
+                                let mapped_batch = schema_mapping.map_batch(batch)?;
+
+                                // Check if we can prune the file now
+                                if let Some(ref mut pruner) = file_pruner {
+                                    if pruner.should_prune()? {
+                                        // File can now be pruned based on updated dynamic filters
+                                        files_ranges_pruned_statistics.add(1);
+                                        // Terminate the stream early
+                                        return Ok(None);
+                                    }
+                                }
+
+                                Ok(Some((
+                                    mapped_batch,
+                                    (
+                                        stream,
+                                        schema_mapping_opt,
+                                        file_pruner,
+                                        files_ranges_pruned_statistics,
+                                    ),
+                                )))
+                            }
+                            None => Ok(None),
+                        }
+                    },
+                )
+            };
 
             Ok(adapted.boxed())
         }))
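For readers unfamiliar with `futures::stream::try_unfold`: it builds a stream from a seed state plus an async closure, and returning `Ok(None)` from that closure ends the stream, which is what gives the opener a hook to stop reading once the file can be pruned. Below is a minimal, self-contained sketch of that early-termination pattern; the counter-based cutoff is a stand-in for the `FilePruner::should_prune` check and is not DataFusion code.

```rust
use futures::{stream, TryStreamExt};

/// Wrap an inner stream so it terminates as soon as a "pruning" condition fires.
async fn early_terminating_stream() -> Result<Vec<i32>, std::io::Error> {
    let inner = stream::iter((0..10).map(Ok::<i32, std::io::Error>));

    let adapted = stream::try_unfold((inner, 0usize), |(mut inner, seen)| async move {
        match inner.try_next().await? {
            Some(item) => {
                // Pretend the dynamic filter became selective after three items.
                if seen >= 3 {
                    // Returning Ok(None) ends the stream early, dropping the rest.
                    return Ok(None);
                }
                Ok(Some((item, (inner, seen + 1))))
            }
            None => Ok(None),
        }
    });

    adapted.try_collect().await
}

fn main() {
    let out = futures::executor::block_on(early_terminating_stream()).unwrap();
    assert_eq!(out, vec![0, 1, 2]);
}
```

The seed tuple plays the same role as the `(stream, schema_mapping, file_pruner, metrics)` state threaded through the patch above: whatever the closure needs across polls travels inside the unfold state.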
diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml
index 881969ef32ad..c0eaa3e20528 100644
--- a/datafusion/physical-expr/Cargo.toml
+++ b/datafusion/physical-expr/Cargo.toml
@@ -50,6 +50,7 @@ hashbrown = { workspace = true }
 indexmap = { workspace = true }
 itertools = { workspace = true, features = ["use_std"] }
 log = { workspace = true }
+parking_lot = { workspace = true }
 paste = "^1.0"
 petgraph = "0.8.2"
diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
index d4b3180a6fc6..eeb0c6e8028f 100644
--- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs
+++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
@@ -15,12 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{
-    any::Any,
-    fmt::Display,
-    hash::Hash,
-    sync::{Arc, RwLock},
-};
+use parking_lot::RwLock;
+use std::{any::Any, fmt::Display, hash::Hash, sync::Arc};
 
 use crate::PhysicalExpr;
 use arrow::datatypes::{DataType, Schema};
@@ -72,6 +68,11 @@ impl Inner {
             expr,
         }
     }
+
+    /// Get a reference to the inner expression.
+    fn expr(&self) -> &Arc<dyn PhysicalExpr> {
+        &self.expr
+    }
 }
 
 impl Hash for DynamicFilterPhysicalExpr {
@@ -176,20 +177,8 @@ impl DynamicFilterPhysicalExpr {
     /// This will return the current expression with any children
     /// remapped to match calls to [`PhysicalExpr::with_new_children`].
     pub fn current(&self) -> Result<Arc<dyn PhysicalExpr>> {
-        let inner = Arc::clone(
-            &self
-                .inner
-                .read()
-                .map_err(|_| {
-                    datafusion_common::DataFusionError::Execution(
-                        "Failed to acquire read lock for inner".to_string(),
-                    )
-                })?
-                .expr,
-        );
-        let inner =
-            Self::remap_children(&self.children, self.remapped_children.as_ref(), inner)?;
-        Ok(inner)
+        let expr = Arc::clone(self.inner.read().expr());
+        Self::remap_children(&self.children, self.remapped_children.as_ref(), expr)
     }
 
     /// Update the current expression.
@@ -199,11 +188,6 @@ impl DynamicFilterPhysicalExpr {
     /// - When we've computed the probe side's hash table in a HashJoinExec
     /// - After every batch is processed if we update the TopK heap in a SortExec using a TopK approach.
     pub fn update(&self, new_expr: Arc<dyn PhysicalExpr>) -> Result<()> {
-        let mut current = self.inner.write().map_err(|_| {
-            datafusion_common::DataFusionError::Execution(
-                "Failed to acquire write lock for inner".to_string(),
-            )
-        })?;
         // Remap the children of the new expression to match the original children
         // We still do this again in `current()` but doing it preventively here
         // reduces the work needed in some cases if `current()` is called multiple times
@@ -213,10 +197,13 @@ impl DynamicFilterPhysicalExpr {
             self.remapped_children.as_ref(),
             new_expr,
         )?;
-        // Update the inner expression to the new expression.
-        current.expr = new_expr;
-        // Increment the generation to indicate that the expression has changed.
-        current.generation += 1;
+
+        // Load the current inner, increment the generation, and store the new expression
+        let mut current = self.inner.write();
+        *current = Inner {
+            generation: current.generation + 1,
+            expr: new_expr,
+        };
         Ok(())
     }
 }
@@ -253,10 +240,8 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
         {
             use datafusion_common::internal_err;
             // Check if the data type has changed.
-            let mut data_type_lock = self
-                .data_type
-                .write()
-                .expect("Failed to acquire write lock for data_type");
+            let mut data_type_lock = self.data_type.write();
+
             if let Some(existing) = &*data_type_lock {
                 if existing != &res {
                     // If the data type has changed, we have a bug.
@@ -278,10 +263,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
         {
             use datafusion_common::internal_err;
             // Check if the nullability has changed.
-            let mut nullable_lock = self
-                .nullable
-                .write()
-                .expect("Failed to acquire write lock for nullable");
+            let mut nullable_lock = self.nullable.write();
             if let Some(existing) = *nullable_lock {
                 if existing != res {
                     // If the nullability has changed, we have a bug.
@@ -324,10 +306,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
 
     fn snapshot_generation(&self) -> u64 {
         // Return the current generation of the expression.
-        self.inner
-            .read()
-            .expect("Failed to acquire read lock for inner")
-            .generation
+        self.inner.read().generation
     }
 }
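The switch from `std::sync::RwLock` to `parking_lot::RwLock` above is what removes all of the `map_err`/`expect` plumbing: `parking_lot` locks do not poison, so `read()` and `write()` return guards directly rather than a `Result`. A small sketch contrasting the two call sites, using plain `u64` state rather than the actual `Inner` type:

```rust
use std::sync::Arc;

/// With std::sync::RwLock every access returns a Result, because a thread
/// panicking while holding the lock poisons it for all later users.
fn read_std(counter: &Arc<std::sync::RwLock<u64>>) -> Result<u64, String> {
    let guard = counter
        .read()
        .map_err(|_| "Failed to acquire read lock".to_string())?;
    Ok(*guard)
}

/// parking_lot::RwLock has no poisoning, so the guard comes back directly
/// and the error handling disappears at every call site.
fn read_parking_lot(counter: &Arc<parking_lot::RwLock<u64>>) -> u64 {
    *counter.read()
}

fn main() {
    let std_counter = Arc::new(std::sync::RwLock::new(1u64));
    let pl_counter = Arc::new(parking_lot::RwLock::new(2u64));
    assert_eq!(read_std(&std_counter).unwrap(), 1);
    assert_eq!(read_parking_lot(&pl_counter), 2);
}
```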
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 0af7d8d9d57a..c026e430971a 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -24,6 +24,8 @@ use std::fmt;
 use std::fmt::{Debug, Formatter};
 use std::sync::Arc;
 
+use parking_lot::RwLock;
+
 use crate::common::spawn_buffered;
 use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
 use crate::expressions::PhysicalSortExpr;
@@ -39,8 +41,10 @@ use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
 use crate::spill::get_record_batch_memory_size;
 use crate::spill::in_progress_spill_file::InProgressSpillFile;
 use crate::spill::spill_manager::{GetSlicedSize, SpillManager};
-use crate::stream::{BatchSplitStream, RecordBatchStreamAdapter};
+use crate::stream::BatchSplitStream;
+use crate::stream::RecordBatchStreamAdapter;
 use crate::topk::TopK;
+use crate::topk::TopKDynamicFilters;
 use crate::{
     DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
     ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
@@ -51,7 +55,10 @@ use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
 use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
 use arrow::datatypes::SchemaRef;
 use datafusion_common::config::SpillCompression;
-use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError, Result};
+use datafusion_common::{
+    internal_datafusion_err, internal_err, unwrap_or_internal_err, DataFusionError,
+    Result,
+};
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use datafusion_execution::runtime_env::RuntimeEnv;
 use datafusion_execution::TaskContext;
@@ -887,8 +894,10 @@ pub struct SortExec {
     common_sort_prefix: Vec<PhysicalSortExpr>,
     /// Cache holding plan properties like equivalences, output partitioning etc.
     cache: PlanProperties,
-    /// Filter matching the state of the sort for dynamic filter pushdown
-    filter: Option<Arc<DynamicFilterPhysicalExpr>>,
+    /// Filter matching the state of the sort for dynamic filter pushdown.
+    /// If `fetch` is `Some`, this will also be set and a TopK operator may be used.
+    /// If `fetch` is `None`, this will be `None`.
+    filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
 }
 
 impl SortExec {
@@ -934,14 +943,16 @@ impl SortExec {
         self
     }
 
-    /// Add or reset `self.filter` to a new `DynamicFilterPhysicalExpr`.
-    fn create_filter(&self) -> Arc<DynamicFilterPhysicalExpr> {
+    /// Add or reset `self.filter` to a new `TopKDynamicFilters`.
+    fn create_filter(&self) -> Arc<RwLock<TopKDynamicFilters>> {
         let children = self
             .expr
             .iter()
             .map(|sort_expr| Arc::clone(&sort_expr.expr))
             .collect::<Vec<_>>();
-        Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true)))
+        Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
+            DynamicFilterPhysicalExpr::new(children, lit(true)),
+        ))))
     }
 
     fn cloned(&self) -> Self {
@@ -1080,7 +1091,7 @@ impl DisplayAs for SortExec {
             Some(fetch) => {
                 write!(f, "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr)?;
                 if let Some(filter) = &self.filter {
-                    if let Ok(current) = filter.current() {
+                    if let Ok(current) = filter.read().expr().current() {
                         if !current.eq(&lit(true)) {
                             write!(f, ", filter=[{current}]")?;
                         }
@@ -1216,6 +1227,7 @@ impl ExecutionPlan for SortExec {
             ))),
             (true, None) => Ok(input),
             (false, Some(fetch)) => {
+                let filter = self.filter.clone();
                 let mut topk = TopK::try_new(
                     partition,
                     input.schema(),
@@ -1225,7 +1237,7 @@ impl ExecutionPlan for SortExec {
                     context.session_config().batch_size(),
                     context.runtime_env(),
                     &self.metrics_set,
-                    self.filter.clone(),
+                    Arc::clone(&unwrap_or_internal_err!(filter)),
                 )?;
                 Ok(Box::pin(RecordBatchStreamAdapter::new(
                     self.schema(),
@@ -1349,8 +1361,7 @@ impl ExecutionPlan for SortExec {
 
         if let Some(filter) = &self.filter {
             if config.optimizer.enable_dynamic_filter_pushdown {
-                child =
-                    child.with_self_filter(Arc::clone(filter) as Arc<dyn PhysicalExpr>);
+                child = child.with_self_filter(filter.read().expr());
             }
         }
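The type change above means a single `TopKDynamicFilters` value is shared by every partition's `TopK` instance: `SortExec` creates it once in `create_filter`, each partition clones the `Arc`, and all of them read and update the same threshold. A minimal sketch of that sharing pattern, assuming a simplified stand-in struct with just a threshold field (not the real `TopKDynamicFilters`):

```rust
use std::sync::Arc;
use std::thread;

use parking_lot::RwLock;

/// Simplified stand-in for TopKDynamicFilters: only the shared threshold.
#[derive(Debug, Default)]
struct SharedThreshold {
    threshold_row: Option<Vec<u8>>,
}

fn main() {
    // Created once by the operator (analogous to SortExec::create_filter).
    let filter = Arc::new(RwLock::new(SharedThreshold::default()));

    // Each partition gets a clone of the same Arc, so an update made by one
    // partition is immediately visible to the others.
    let handles: Vec<_> = (0u8..4)
        .map(|partition| {
            let filter = Arc::clone(&filter);
            thread::spawn(move || {
                let candidate = vec![partition];
                let mut guard = filter.write();
                // Keep the smallest (most selective) threshold seen so far.
                if guard
                    .threshold_row
                    .as_ref()
                    .map(|cur| candidate < *cur)
                    .unwrap_or(true)
                {
                    guard.threshold_row = Some(candidate);
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    assert_eq!(filter.read().threshold_row.clone(), Some(vec![0u8]));
}
```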
diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs
index 8d06fa73ce8e..6c0479a108e8 100644
--- a/datafusion/physical-plan/src/topk/mod.rs
+++ b/datafusion/physical-plan/src/topk/mod.rs
@@ -44,6 +44,7 @@ use datafusion_physical_expr::{
     PhysicalExpr,
 };
 use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
+use parking_lot::RwLock;
 
 /// Global TopK
 ///
@@ -121,13 +122,38 @@ pub struct TopK {
     /// Common sort prefix between the input and the sort expressions to allow early exit optimization
     common_sort_prefix: Arc<[PhysicalSortExpr]>,
     /// Filter matching the state of the `TopK` heap used for dynamic filter pushdown
-    filter: Option<Arc<DynamicFilterPhysicalExpr>>,
+    filter: Arc<RwLock<TopKDynamicFilters>>,
     /// If true, indicates that all rows of subsequent batches are guaranteed
     /// to be greater (by byte order, after row conversion) than the top K,
     /// which means the top K won't change and the computation can be finished early.
     pub(crate) finished: bool,
 }
 
+#[derive(Debug, Clone)]
+pub struct TopKDynamicFilters {
+    /// The current *global* threshold for the dynamic filter.
+    /// This is shared across all partitions and is updated by any of them.
+    /// Stored as row bytes for efficient comparison.
+    threshold_row: Option<Vec<u8>>,
+    /// The expression used to evaluate the dynamic filter.
+    /// Only updated while the lock is held for the duration of the update.
+    expr: Arc<DynamicFilterPhysicalExpr>,
+}
+
+impl TopKDynamicFilters {
+    /// Create a new `TopKDynamicFilters` with the given expression
+    pub fn new(expr: Arc<DynamicFilterPhysicalExpr>) -> Self {
+        Self {
+            threshold_row: None,
+            expr,
+        }
+    }
+
+    pub fn expr(&self) -> Arc<DynamicFilterPhysicalExpr> {
+        Arc::clone(&self.expr)
+    }
+}
+
 // Guesstimate for memory allocation: estimated number of bytes used per row in the RowConverter
 const ESTIMATED_BYTES_PER_ROW: usize = 20;
 
@@ -160,7 +186,7 @@ impl TopK {
         batch_size: usize,
         runtime: Arc<RuntimeEnv>,
         metrics: &ExecutionPlanMetricsSet,
-        filter: Option<Arc<DynamicFilterPhysicalExpr>>,
+        filter: Arc<RwLock<TopKDynamicFilters>>,
     ) -> Result<Self> {
         let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
             .register(&runtime.memory_pool);
@@ -214,41 +240,39 @@
 
         let mut selected_rows = None;
 
-        if let Some(filter) = self.filter.as_ref() {
-            // If a filter is provided, update it with the new rows
-            let filter = filter.current()?;
-            let filtered = filter.evaluate(&batch)?;
-            let num_rows = batch.num_rows();
-            let array = filtered.into_array(num_rows)?;
-            let mut filter = array.as_boolean().clone();
-            let true_count = filter.true_count();
-            if true_count == 0 {
-                // nothing to filter, so no need to update
-                return Ok(());
+        // If a filter is provided, update it with the new rows
+        let filter = self.filter.read().expr.current()?;
+        let filtered = filter.evaluate(&batch)?;
+        let num_rows = batch.num_rows();
+        let array = filtered.into_array(num_rows)?;
+        let mut filter = array.as_boolean().clone();
+        let true_count = filter.true_count();
+        if true_count == 0 {
+            // nothing to filter, so no need to update
+            return Ok(());
+        }
+        // only update the keys / rows if the filter does not match all rows
+        if true_count < num_rows {
+            // Indices in `set_indices` should be correct if filter contains nulls
+            // So we prepare the filter here. Note this is also done in the `FilterBuilder`
+            // so there is no overhead to do this here.
+            if filter.nulls().is_some() {
+                filter = prep_null_mask_filter(&filter);
             }
-            // only update the keys / rows if the filter does not match all rows
-            if true_count < num_rows {
-                // Indices in `set_indices` should be correct if filter contains nulls
-                // So we prepare the filter here. Note this is also done in the `FilterBuilder`
-                // so there is no overhead to do this here.
-                if filter.nulls().is_some() {
-                    filter = prep_null_mask_filter(&filter);
-                }
-                let filter_predicate = FilterBuilder::new(&filter);
-                let filter_predicate = if sort_keys.len() > 1 {
-                    // Optimize filter when it has multiple sort keys
-                    filter_predicate.optimize().build()
-                } else {
-                    filter_predicate.build()
-                };
-                selected_rows = Some(filter);
-                sort_keys = sort_keys
-                    .iter()
-                    .map(|key| filter_predicate.filter(key).map_err(|x| x.into()))
-                    .collect::<Result<Vec<_>>>()?;
-            }
-        };
+            let filter_predicate = FilterBuilder::new(&filter);
+            let filter_predicate = if sort_keys.len() > 1 {
+                // Optimize filter when it has multiple sort keys
+                filter_predicate.optimize().build()
+            } else {
+                filter_predicate.build()
+            };
+            selected_rows = Some(filter);
+            sort_keys = sort_keys
+                .iter()
+                .map(|key| filter_predicate.filter(key).map_err(|x| x.into()))
+                .collect::<Result<Vec<_>>>()?;
+        }
         // reuse existing `Rows` to avoid reallocations
         let rows = &mut self.scratch_rows;
        rows.clear();
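As background for the hunk above: arrow's `FilterBuilder` turns a boolean mask into a reusable `FilterPredicate`, and `optimize()` pays off when the same mask is applied to several sort-key columns. A standalone sketch of that usage, independent of the TopK code (array contents are made up, and the import path assumes the `arrow::compute` re-export of the filter kernel):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, BooleanArray, Int32Array, StringArray};
use arrow::compute::FilterBuilder;
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    // A boolean mask, e.g. produced by evaluating a predicate against a batch.
    let mask = BooleanArray::from(vec![true, false, true]);

    // Build the predicate once; `optimize` precomputes a selection strategy
    // so applying it to multiple columns is cheap.
    let predicate = FilterBuilder::new(&mask).optimize().build();

    let sort_keys: Vec<ArrayRef> = vec![
        Arc::new(Int32Array::from(vec![1, 2, 3])),
        Arc::new(StringArray::from(vec!["a", "b", "c"])),
    ];

    // Apply the same predicate to every sort-key column.
    let filtered: Vec<ArrayRef> = sort_keys
        .iter()
        .map(|key| predicate.filter(key.as_ref()))
        .collect::<Result<_, _>>()?;

    assert_eq!(filtered[0].len(), 2);
    Ok(())
}
```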
@@ -319,19 +343,89 @@ impl TopK {
     /// (a > 2 OR (a = 2 AND b < 3))
     /// ```
     fn update_filter(&mut self) -> Result<()> {
-        let Some(filter) = &self.filter else {
+        // If the heap doesn't have k elements yet, we can't create thresholds
+        let Some(max_row) = self.heap.max() else {
             return Ok(());
         };
-        let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else {
+
+        let new_threshold_row = &max_row.row;
+
+        // Fast path: check if the current value in topk is better than what is
+        // currently set in the filter, using only a read lock
+        let needs_update = self
+            .filter
+            .read()
+            .threshold_row
+            .as_ref()
+            .map(|current_row| {
+                // new < current means the new threshold is more selective
+                new_threshold_row < current_row
+            })
+            .unwrap_or(true); // No current threshold, so we need to set one
+
+        // exit early if the current values are better
+        if !needs_update {
             return Ok(());
+        }
+
+        // Extract scalar values BEFORE acquiring the lock to reduce the critical section
+        let thresholds = match self.heap.get_threshold_values(&self.expr)? {
+            Some(t) => t,
+            None => return Ok(()),
+        };
+
+        // Build the filter expression OUTSIDE any synchronization
+        let predicate = Self::build_filter_expression(&self.expr, thresholds)?;
+        let new_threshold = new_threshold_row.to_vec();
+
+        // Update the threshold. Since there was a lock gap, we must check that it is
+        // still the best: it may have changed while we were building the expression
+        // without holding the lock.
+        let mut filter = self.filter.write();
+        let old_threshold = filter.threshold_row.take();
+
+        // Update the filter if we successfully updated the threshold
+        // (or if there was no previous threshold and we're the first)
+        match old_threshold {
+            Some(old_threshold) => {
+                // the new threshold is still better than the old one
+                if new_threshold.as_slice() < old_threshold.as_slice() {
+                    filter.threshold_row = Some(new_threshold);
+                } else {
+                    // some other thread updated the threshold to a better
+                    // one while we were building, so there is no need to
+                    // update the filter
+                    filter.threshold_row = Some(old_threshold);
+                    return Ok(());
+                }
+            }
+            None => {
+                // No previous threshold, so we can set the new one
+                filter.threshold_row = Some(new_threshold);
+            }
         };
 
+        // Update the filter expression
+        if let Some(pred) = predicate {
+            if !pred.eq(&lit(true)) {
+                filter.expr.update(pred)?;
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Build the filter expression with the given thresholds.
+    /// This is now called outside of any locks to reduce critical section time.
+    fn build_filter_expression(
+        sort_exprs: &[PhysicalSortExpr],
+        thresholds: Vec<ScalarValue>,
+    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
         // Create filter expressions for each threshold
         let mut filters: Vec<Arc<dyn PhysicalExpr>> = Vec::with_capacity(thresholds.len());
 
         let mut prev_sort_expr: Option<Arc<dyn PhysicalExpr>> = None;
-        for (sort_expr, value) in self.expr.iter().zip(thresholds.iter()) {
+        for (sort_expr, value) in sort_exprs.iter().zip(thresholds.iter()) {
             // Create the appropriate operator based on sort order
             let op = if sort_expr.options.descending {
                 // For descending sort, we want col > threshold (exclude smaller values)
@@ -403,13 +497,7 @@ impl TopK {
             .into_iter()
             .reduce(|a, b| Arc::new(BinaryExpr::new(a, Operator::Or, b)));
 
-        if let Some(predicate) = dynamic_predicate {
-            if !predicate.eq(&lit(true)) {
-                filter.update(predicate)?;
-            }
-        }
-
-        Ok(())
+        Ok(dynamic_predicate)
     }
 
     /// If input ordering shares a common sort prefix with the TopK, and if the TopK's heap is full,
@@ -1053,7 +1141,9 @@ mod tests {
             2,
             runtime,
             &metrics,
-            None,
+            Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
+                DynamicFilterPhysicalExpr::new(vec![], lit(true)),
+            )))),
         )?;
 
         // Create the first batch with two columns:
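The `update_filter` rewrite above is a double-checked pattern: a cheap read-lock comparison of the threshold row bytes, expensive expression building outside any lock, then a re-check under the write lock because another partition may have installed a better threshold in the gap. A compact sketch of that pattern in isolation; the `build_expensive_predicate` closure and the `String` predicate are placeholders for `build_filter_expression` and the physical expression:

```rust
use parking_lot::RwLock;

struct Shared {
    threshold_row: Option<Vec<u8>>,
    predicate: String, // stands in for the dynamic filter expression
}

/// Try to install `candidate` as the new threshold, doing the expensive work
/// outside the write lock and re-validating before committing.
fn try_update(shared: &RwLock<Shared>, candidate: Vec<u8>) {
    // Fast path: read lock only, bail out if we cannot improve the threshold.
    let needs_update = shared
        .read()
        .threshold_row
        .as_ref()
        .map(|current| candidate < *current)
        .unwrap_or(true);
    if !needs_update {
        return;
    }

    // Expensive work happens with no lock held (placeholder for building the
    // filter expression from the heap's threshold values).
    let build_expensive_predicate = |row: &[u8]| format!("row <= {row:?}");
    let predicate = build_expensive_predicate(&candidate);

    // Slow path: re-check under the write lock, since another thread may have
    // installed a better (smaller) threshold while we were building.
    let mut guard = shared.write();
    let still_best = guard
        .threshold_row
        .as_ref()
        .map(|current| candidate < *current)
        .unwrap_or(true);
    if still_best {
        guard.threshold_row = Some(candidate);
        guard.predicate = predicate;
    }
}

fn main() {
    let shared = RwLock::new(Shared {
        threshold_row: None,
        predicate: String::new(),
    });
    try_update(&shared, vec![5]);
    try_update(&shared, vec![3]); // improves the threshold
    try_update(&shared, vec![9]); // rejected on the fast path
    assert_eq!(shared.read().threshold_row.clone(), Some(vec![3u8]));
}
```

The trade-off is the same as in the patch: a stale-but-correct filter is acceptable, so the lock is never held while building expressions, and a losing race simply leaves the better threshold in place.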