Skip to content

Commit d2a091b

Browse files
committed
Single lock
1 parent cb84f23 commit d2a091b

File tree

2 files changed

+25
-25
lines changed

2 files changed

+25
-25
lines changed

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ use std::fmt;
2424
use std::fmt::{Debug, Formatter};
2525
use std::sync::Arc;
2626

27+
use parking_lot::RwLock;
28+
2729
use crate::common::spawn_buffered;
2830
use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
2931
use crate::expressions::PhysicalSortExpr;
@@ -895,7 +897,7 @@ pub struct SortExec {
895897
/// Filter matching the state of the sort for dynamic filter pushdown.
896898
/// If `fetch` is `Some`, this will also be set and a TopK operator may be used.
897899
/// If `fetch` is `None`, this will be `None`.
898-
filter: Option<TopKDynamicFilters>,
900+
filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
899901
}
900902

901903
impl SortExec {
@@ -942,16 +944,15 @@ impl SortExec {
942944
}
943945

944946
/// Add or reset `self.filter` to a new `TopKDynamicFilters`.
945-
fn create_filter(&self) -> TopKDynamicFilters {
947+
fn create_filter(&self) -> Arc<RwLock<TopKDynamicFilters>> {
946948
let children = self
947949
.expr
948950
.iter()
949951
.map(|sort_expr| Arc::clone(&sort_expr.expr))
950952
.collect::<Vec<_>>();
951-
TopKDynamicFilters::new(Arc::new(DynamicFilterPhysicalExpr::new(
952-
children,
953-
lit(true),
954-
)))
953+
Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
954+
DynamicFilterPhysicalExpr::new(children, lit(true)),
955+
))))
955956
}
956957

957958
fn cloned(&self) -> Self {
@@ -1090,7 +1091,7 @@ impl DisplayAs for SortExec {
10901091
Some(fetch) => {
10911092
write!(f, "SortExec: TopK(fetch={fetch}), expr=[{}], preserve_partitioning=[{preserve_partitioning}]", self.expr)?;
10921093
if let Some(filter) = &self.filter {
1093-
if let Ok(current) = filter.expr().current() {
1094+
if let Ok(current) = filter.read().expr().current() {
10941095
if !current.eq(&lit(true)) {
10951096
write!(f, ", filter=[{current}]")?;
10961097
}
@@ -1236,7 +1237,7 @@ impl ExecutionPlan for SortExec {
12361237
context.session_config().batch_size(),
12371238
context.runtime_env(),
12381239
&self.metrics_set,
1239-
unwrap_or_internal_err!(filter),
1240+
Arc::clone(&unwrap_or_internal_err!(filter)),
12401241
)?;
12411242
Ok(Box::pin(RecordBatchStreamAdapter::new(
12421243
self.schema(),
@@ -1360,7 +1361,7 @@ impl ExecutionPlan for SortExec {
13601361

13611362
if let Some(filter) = &self.filter {
13621363
if config.optimizer.enable_dynamic_filter_pushdown {
1363-
child = child.with_self_filter(filter.expr());
1364+
child = child.with_self_filter(filter.read().expr());
13641365
}
13651366
}
13661367

datafusion/physical-plan/src/topk/mod.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ pub struct TopK {
122122
/// Common sort prefix between the input and the sort expressions to allow early exit optimization
123123
common_sort_prefix: Arc<[PhysicalSortExpr]>,
124124
/// Filter matching the state of the `TopK` heap used for dynamic filter pushdown
125-
filter: TopKDynamicFilters,
125+
filter: Arc<RwLock<TopKDynamicFilters>>,
126126
/// If true, indicates that all rows of subsequent batches are guaranteed
127127
/// to be greater (by byte order, after row conversion) than the top K,
128128
/// which means the top K won't change and the computation can be finished early.
@@ -134,7 +134,7 @@ pub struct TopKDynamicFilters {
134134
/// The current *global* threshold for the dynamic filter.
135135
/// This is shared across all partitions and is updated by any of them.
136136
/// Stored as row bytes for efficient comparison.
137-
threshold_row: Arc<RwLock<Option<Vec<u8>>>>,
137+
threshold_row: Option<Vec<u8>>,
138138
/// The expression used to evaluate the dynamic filter
139139
/// Only updated when lock held for the duration of the update
140140
expr: Arc<DynamicFilterPhysicalExpr>,
@@ -144,7 +144,7 @@ impl TopKDynamicFilters {
144144
/// Create a new `TopKDynamicFilters` with the given expression
145145
pub fn new(expr: Arc<DynamicFilterPhysicalExpr>) -> Self {
146146
Self {
147-
threshold_row: Arc::new(RwLock::new(None)),
147+
threshold_row: None,
148148
expr,
149149
}
150150
}
@@ -186,7 +186,7 @@ impl TopK {
186186
batch_size: usize,
187187
runtime: Arc<RuntimeEnv>,
188188
metrics: &ExecutionPlanMetricsSet,
189-
filter: TopKDynamicFilters,
189+
filter: Arc<RwLock<TopKDynamicFilters>>,
190190
) -> Result<Self> {
191191
let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
192192
.register(&runtime.memory_pool);
@@ -241,7 +241,7 @@ impl TopK {
241241
let mut selected_rows = None;
242242

243243
// If a filter is provided, update it with the new rows
244-
let filter = self.filter.expr.current()?;
244+
let filter = self.filter.read().expr.current()?;
245245
let filtered = filter.evaluate(&batch)?;
246246
let num_rows = batch.num_rows();
247247
let array = filtered.into_array(num_rows)?;
@@ -360,8 +360,8 @@ impl TopK {
360360
// currently set in the filter with a read only lock
361361
let needs_update = self
362362
.filter
363-
.threshold_row
364363
.read()
364+
.threshold_row
365365
.as_ref()
366366
.map(|current_row| {
367367
// new < current means new threshold is more selective
@@ -380,34 +380,34 @@ impl TopK {
380380

381381
// update the threshold. Since there was a lock gap, we must check if it is still the best
382382
// may have changed while we were building the expression without the lock
383-
let mut current_threshold = self.filter.threshold_row.write();
384-
let old_threshold = current_threshold.take();
383+
let mut filter = self.filter.write();
384+
let old_threshold = filter.threshold_row.take();
385385

386386
// Update filter if we successfully updated the threshold
387387
// (or if there was no previous threshold and we're the first)
388388
match old_threshold {
389389
Some(old_threshold) => {
390390
// new threshold is still better than the old one
391391
if new_threshold.as_slice() < old_threshold.as_slice() {
392-
*current_threshold = Some(new_threshold);
392+
filter.threshold_row = Some(new_threshold);
393393
} else {
394394
// some other thread updated the threshold to a better
395395
// one while we were building so there is no need to
396396
// update the filter
397-
*current_threshold = Some(old_threshold);
397+
filter.threshold_row = Some(old_threshold);
398398
return Ok(());
399399
}
400400
}
401401
None => {
402402
// No previous threshold, so we can set the new one
403-
*current_threshold = Some(new_threshold);
403+
filter.threshold_row = Some(new_threshold);
404404
}
405405
};
406406

407407
// Update the filter expression
408408
if let Some(pred) = predicate {
409409
if !pred.eq(&lit(true)) {
410-
self.filter.expr.update(pred)?;
410+
filter.expr.update(pred)?;
411411
}
412412
}
413413

@@ -1141,10 +1141,9 @@ mod tests {
11411141
2,
11421142
runtime,
11431143
&metrics,
1144-
TopKDynamicFilters::new(Arc::new(DynamicFilterPhysicalExpr::new(
1145-
vec![],
1146-
lit(true),
1147-
))),
1144+
Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
1145+
DynamicFilterPhysicalExpr::new(vec![], lit(true)),
1146+
)))),
11481147
)?;
11491148

11501149
// Create the first batch with two columns:

0 commit comments

Comments
 (0)