Skip to content

Commit 18c5a6c

Browse files
authored
Only update TopK dynamic filters if the new ones are more selective (#16433)
1 parent 3f96c3f commit 18c5a6c

File tree

6 files changed

+232
-104
lines changed

6 files changed

+232
-104
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/datasource-parquet/src/opener.rs

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -409,12 +409,58 @@ impl FileOpener for ParquetOpener {
409409
.with_row_groups(row_group_indexes)
410410
.build()?;
411411

412-
let adapted = stream
413-
.map_err(|e| ArrowError::ExternalError(Box::new(e)))
414-
.map(move |maybe_batch| {
415-
maybe_batch
416-
.and_then(|b| schema_mapping.map_batch(b).map_err(Into::into))
417-
});
412+
// Create a stateful stream that can check pruning after each batch
413+
let adapted = {
414+
use futures::stream;
415+
let schema_mapping = Some(schema_mapping);
416+
let file_pruner = file_pruner;
417+
let stream = stream.map_err(|e| ArrowError::ExternalError(Box::new(e)));
418+
let files_ranges_pruned_statistics =
419+
file_metrics.files_ranges_pruned_statistics.clone();
420+
421+
stream::try_unfold(
422+
(
423+
stream,
424+
schema_mapping,
425+
file_pruner,
426+
files_ranges_pruned_statistics,
427+
),
428+
move |(
429+
mut stream,
430+
schema_mapping_opt,
431+
mut file_pruner,
432+
files_ranges_pruned_statistics,
433+
)| async move {
434+
match stream.try_next().await? {
435+
Some(batch) => {
436+
let schema_mapping = schema_mapping_opt.as_ref().unwrap();
437+
let mapped_batch = schema_mapping.map_batch(batch)?;
438+
439+
// Check if we can prune the file now
440+
if let Some(ref mut pruner) = file_pruner {
441+
if pruner.should_prune()? {
442+
// File can now be pruned based on updated dynamic filters
443+
files_ranges_pruned_statistics.add(1);
444+
// Terminate the stream early
445+
return Ok(None);
446+
}
447+
}
448+
449+
Ok(Some((
450+
mapped_batch,
451+
(
452+
stream,
453+
schema_mapping_opt,
454+
file_pruner,
455+
files_ranges_pruned_statistics,
456+
),
457+
)))
458+
}
459+
None => Ok(None),
460+
}
461+
},
462+
)
463+
};
418464

419465
Ok(adapted.boxed())
420466
}))

datafusion/physical-expr/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ hashbrown = { workspace = true }
5050
indexmap = { workspace = true }
5151
itertools = { workspace = true, features = ["use_std"] }
5252
log = { workspace = true }
53+
parking_lot = { workspace = true }
5354
paste = "^1.0"
5455
petgraph = "0.8.2"
5556

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 20 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::{
19-
any::Any,
20-
fmt::Display,
21-
hash::Hash,
22-
sync::{Arc, RwLock},
23-
};
18+
use parking_lot::RwLock;
19+
use std::{any::Any, fmt::Display, hash::Hash, sync::Arc};
2420

2521
use crate::PhysicalExpr;
2622
use arrow::datatypes::{DataType, Schema};
@@ -72,6 +68,11 @@ impl Inner {
7268
expr,
7369
}
7470
}
71+
72+
/// Clone the inner expression.
73+
fn expr(&self) -> &Arc<dyn PhysicalExpr> {
74+
&self.expr
75+
}
7576
}
7677

7778
impl Hash for DynamicFilterPhysicalExpr {
@@ -176,20 +177,8 @@ impl DynamicFilterPhysicalExpr {
176177
/// This will return the current expression with any children
177178
/// remapped to match calls to [`PhysicalExpr::with_new_children`].
178179
pub fn current(&self) -> Result<Arc<dyn PhysicalExpr>> {
179-
let inner = Arc::clone(
180-
&self
181-
.inner
182-
.read()
183-
.map_err(|_| {
184-
datafusion_common::DataFusionError::Execution(
185-
"Failed to acquire read lock for inner".to_string(),
186-
)
187-
})?
188-
.expr,
189-
);
190-
let inner =
191-
Self::remap_children(&self.children, self.remapped_children.as_ref(), inner)?;
192-
Ok(inner)
180+
let expr = Arc::clone(self.inner.read().expr());
181+
Self::remap_children(&self.children, self.remapped_children.as_ref(), expr)
193182
}
194183

195184
/// Update the current expression.
@@ -199,11 +188,6 @@ impl DynamicFilterPhysicalExpr {
199188
/// - When we've computed the probe side's hash table in a HashJoinExec
200189
/// - After every batch is processed if we update the TopK heap in a SortExec using a TopK approach.
201190
pub fn update(&self, new_expr: Arc<dyn PhysicalExpr>) -> Result<()> {
202-
let mut current = self.inner.write().map_err(|_| {
203-
datafusion_common::DataFusionError::Execution(
204-
"Failed to acquire write lock for inner".to_string(),
205-
)
206-
})?;
207191
// Remap the children of the new expression to match the original children
208192
// We still do this again in `current()` but doing it preventively here
209193
// reduces the work needed in some cases if `current()` is called multiple times
@@ -213,10 +197,13 @@ impl DynamicFilterPhysicalExpr {
213197
self.remapped_children.as_ref(),
214198
new_expr,
215199
)?;
216-
// Update the inner expression to the new expression.
217-
current.expr = new_expr;
218-
// Increment the generation to indicate that the expression has changed.
219-
current.generation += 1;
200+
201+
// Load the current inner, increment generation, and store the new one
202+
let mut current = self.inner.write();
203+
*current = Inner {
204+
generation: current.generation + 1,
205+
expr: new_expr,
206+
};
220207
Ok(())
221208
}
222209
}
@@ -253,10 +240,8 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
253240
{
254241
use datafusion_common::internal_err;
255242
// Check if the data type has changed.
256-
let mut data_type_lock = self
257-
.data_type
258-
.write()
259-
.expect("Failed to acquire write lock for data_type");
243+
let mut data_type_lock = self.data_type.write();
244+
260245
if let Some(existing) = &*data_type_lock {
261246
if existing != &res {
262247
// If the data type has changed, we have a bug.
@@ -278,10 +263,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
278263
{
279264
use datafusion_common::internal_err;
280265
// Check if the nullability has changed.
281-
let mut nullable_lock = self
282-
.nullable
283-
.write()
284-
.expect("Failed to acquire write lock for nullable");
266+
let mut nullable_lock = self.nullable.write();
285267
if let Some(existing) = *nullable_lock {
286268
if existing != res {
287269
// If the nullability has changed, we have a bug.
@@ -324,10 +306,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
324306

325307
fn snapshot_generation(&self) -> u64 {
326308
// Return the current generation of the expression.
327-
self.inner
328-
.read()
329-
.expect("Failed to acquire read lock for inner")
330-
.generation
309+
self.inner.read().generation
331310
}
332311
}
333312

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ use std::fmt;
2424
use std::fmt::{Debug, Formatter};
2525
use std::sync::Arc;
2626

27+
use parking_lot::RwLock;
28+
2729
use crate::common::spawn_buffered;
2830
use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
2931
use crate::expressions::PhysicalSortExpr;
@@ -39,8 +41,10 @@ use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
3941
use crate::spill::get_record_batch_memory_size;
4042
use crate::spill::in_progress_spill_file::InProgressSpillFile;
4143
use crate::spill::spill_manager::{GetSlicedSize, SpillManager};
42-
use crate::stream::{BatchSplitStream, RecordBatchStreamAdapter};
44+
use crate::stream::BatchSplitStream;
45+
use crate::stream::RecordBatchStreamAdapter;
4346
use crate::topk::TopK;
47+
use crate::topk::TopKDynamicFilters;
4448
use crate::{
4549
DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
4650
ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream,
@@ -51,7 +55,10 @@ use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
5155
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
5256
use arrow::datatypes::SchemaRef;
5357
use datafusion_common::config::SpillCompression;
54-
use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError, Result};
58+
use datafusion_common::{
59+
internal_datafusion_err, internal_err, unwrap_or_internal_err, DataFusionError,
60+
Result,
61+
};
5562
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
5663
use datafusion_execution::runtime_env::RuntimeEnv;
5764
use datafusion_execution::TaskContext;
@@ -887,8 +894,10 @@ pub struct SortExec {
887894
common_sort_prefix: Vec<PhysicalSortExpr>,
888895
/// Cache holding plan properties like equivalences, output partitioning etc.
889896
cache: PlanProperties,
890-
/// Filter matching the state of the sort for dynamic filter pushdown
891-
filter: Option<Arc<DynamicFilterPhysicalExpr>>,
897+
/// Filter matching the state of the sort for dynamic filter pushdown.
898+
/// If `fetch` is `Some`, this will also be set and a TopK operator may be used.
899+
/// If `fetch` is `None`, this will be `None`.
900+
filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
892901
}
893902

894903
impl SortExec {
@@ -934,14 +943,16 @@ impl SortExec {
934943
self
935944
}
936945

937-
/// Add or reset `self.filter` to a new `DynamicFilterPhysicalExpr`.
938-
fn create_filter(&self) -> Arc<DynamicFilterPhysicalExpr> {
946+
/// Add or reset `self.filter` to a new `TopKDynamicFilters`.
947+
fn create_filter(&self) -> Arc<RwLock<TopKDynamicFilters>> {
939948
let children = self
940949
.expr
941950
.iter()
942951
.map(|sort_expr| Arc::clone(&sort_expr.expr))
943952
.collect::<Vec<_>>();
944-
Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true)))
953+
Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
954+
DynamicFilterPhysicalExpr::new(children, lit(true)),
955+
))))
945956
}
946957

947958
fn cloned(&self) -> Self {
@@ -1080,7 +1091,7 @@ impl DisplayAs for SortExec {
10801091
Some(fetch) => {
10811092
write!(f, "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr)?;
10821093
if let Some(filter) = &self.filter {
1083-
if let Ok(current) = filter.current() {
1094+
if let Ok(current) = filter.read().expr().current() {
10841095
if !current.eq(&lit(true)) {
10851096
write!(f, ", filter=[{current}]")?;
10861097
}
@@ -1216,6 +1227,7 @@ impl ExecutionPlan for SortExec {
12161227
))),
12171228
(true, None) => Ok(input),
12181229
(false, Some(fetch)) => {
1230+
let filter = self.filter.clone();
12191231
let mut topk = TopK::try_new(
12201232
partition,
12211233
input.schema(),
@@ -1225,7 +1237,7 @@ impl ExecutionPlan for SortExec {
12251237
context.session_config().batch_size(),
12261238
context.runtime_env(),
12271239
&self.metrics_set,
1228-
self.filter.clone(),
1240+
Arc::clone(&unwrap_or_internal_err!(filter)),
12291241
)?;
12301242
Ok(Box::pin(RecordBatchStreamAdapter::new(
12311243
self.schema(),
@@ -1349,8 +1361,7 @@ impl ExecutionPlan for SortExec {
13491361

13501362
if let Some(filter) = &self.filter {
13511363
if config.optimizer.enable_dynamic_filter_pushdown {
1352-
child =
1353-
child.with_self_filter(Arc::clone(filter) as Arc<dyn PhysicalExpr>);
1364+
child = child.with_self_filter(filter.read().expr());
13541365
}
13551366
}
13561367

0 commit comments

Comments
 (0)