Skip to content

Commit f9efba0

Browse files
adriangbRobert Ream
andauthored
Add ExecutionPlan::reset_state (#17028)
* Add ExecutionPlan::reset_state Co-authored-by: Robert Ream <[email protected]> * Update datafusion/sqllogictest/test_files/cte.slt * Add reference * fmt * add to upgrade guide * add explain plan, implement in more plans * fmt * only explain --------- Co-authored-by: Robert Ream <[email protected]>
1 parent 7bc9906 commit f9efba0

File tree

8 files changed

+190
-25
lines changed

8 files changed

+190
-25
lines changed

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ use datafusion_expr::ColumnarValue;
3232
use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash};
3333

3434
/// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it.
35+
///
36+
/// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also
37+
/// implement `ExecutionPlan::reset_state` to remain compatible with recursive queries and other situations where
38+
/// the same `ExecutionPlan` is reused with different data.
3539
#[derive(Debug)]
3640
pub struct DynamicFilterPhysicalExpr {
3741
/// The original children of this PhysicalExpr, if any.
@@ -121,6 +125,10 @@ impl DynamicFilterPhysicalExpr {
121125
/// do not change* since those will be used to determine what columns need to read or projected
122126
/// when evaluating the expression.
123127
///
128+
/// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also
129+
/// implement `ExecutionPlan::reset_state` to remain compatible with recursive queries and other situations where
130+
/// the same `ExecutionPlan` is reused with different data.
131+
///
124132
/// [`collect_columns`]: crate::utils::collect_columns
125133
pub fn new(
126134
children: Vec<Arc<dyn PhysicalExpr>>,

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,31 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
195195
children: Vec<Arc<dyn ExecutionPlan>>,
196196
) -> Result<Arc<dyn ExecutionPlan>>;
197197

198+
/// Reset any internal state within this [`ExecutionPlan`].
199+
///
200+
/// This method is called when an [`ExecutionPlan`] needs to be re-executed,
201+
/// such as in recursive queries. Unlike [`ExecutionPlan::with_new_children`], this method
202+
/// ensures that any stateful components (e.g., [`DynamicFilterPhysicalExpr`])
203+
/// are reset to their initial state.
204+
///
205+
/// The default implementation simply calls [`ExecutionPlan::with_new_children`] with the existing children,
206+
/// effectively creating a new instance of the [`ExecutionPlan`] with the same children but without
207+
/// necessarily resetting any internal state. Implementations that require resetting of some
208+
/// internal state should override this method to provide the necessary logic.
209+
///
210+
/// This method should *not* reset state recursively for children, as it is expected that
211+
/// it will be called from within a walk of the execution plan tree so that it will be called on each child later
212+
/// or was already called on each child.
213+
///
214+
/// Note to implementers: unlike [`ExecutionPlan::with_new_children`] this method does not accept new children as an argument,
215+
/// thus it is expected that any cached plan properties will remain valid after the reset.
216+
///
217+
/// [`DynamicFilterPhysicalExpr`]: datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr
218+
fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
219+
let children = self.children().into_iter().cloned().collect();
220+
self.with_new_children(children)
221+
}
222+
198223
/// If supported, attempt to increase the partitioning of this `ExecutionPlan` to
199224
/// produce `target_partitions` partitions.
200225
///

datafusion/physical-plan/src/joins/cross_join.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,18 @@ impl ExecutionPlan for CrossJoinExec {
270270
)))
271271
}
272272

273+
fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
274+
let new_exec = CrossJoinExec {
275+
left: Arc::clone(&self.left),
276+
right: Arc::clone(&self.right),
277+
schema: Arc::clone(&self.schema),
278+
left_fut: Default::default(), // reset the build side!
279+
metrics: ExecutionPlanMetricsSet::default(),
280+
cache: self.cache.clone(),
281+
};
282+
Ok(Arc::new(new_exec))
283+
}
284+
273285
fn required_input_distribution(&self) -> Vec<Distribution> {
274286
vec![
275287
Distribution::SinglePartition,

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,26 @@ impl ExecutionPlan for HashJoinExec {
774774
)?))
775775
}
776776

777+
fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
778+
// Reset the left_fut to allow re-execution
779+
Ok(Arc::new(HashJoinExec {
780+
left: Arc::clone(&self.left),
781+
right: Arc::clone(&self.right),
782+
on: self.on.clone(),
783+
filter: self.filter.clone(),
784+
join_type: self.join_type,
785+
join_schema: Arc::clone(&self.join_schema),
786+
left_fut: OnceAsync::default(),
787+
random_state: self.random_state.clone(),
788+
mode: self.mode,
789+
metrics: ExecutionPlanMetricsSet::new(),
790+
projection: self.projection.clone(),
791+
column_indices: self.column_indices.clone(),
792+
null_equality: self.null_equality,
793+
cache: self.cache.clone(),
794+
}))
795+
}
796+
777797
fn execute(
778798
&self,
779799
partition: usize,

datafusion/physical-plan/src/recursive_query.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ fn assign_work_table(
372372
}
373373

374374
/// Some plans will change their internal states after execution, making them unable to be executed again.
375-
/// This function uses `ExecutionPlan::with_new_children` to fork a new plan with initial states.
375+
/// This function uses [`ExecutionPlan::reset_state`] to reset any internal state within the plan.
376376
///
377377
/// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan.
378378
/// However, if the data of the left table is derived from the work table, it will become outdated
@@ -383,8 +383,7 @@ fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPl
383383
if plan.as_any().is::<WorkTableExec>() {
384384
Ok(Transformed::no(plan))
385385
} else {
386-
let new_plan = Arc::clone(&plan)
387-
.with_new_children(plan.children().into_iter().cloned().collect())?;
386+
let new_plan = Arc::clone(&plan).reset_state()?;
388387
Ok(Transformed::yes(new_plan))
389388
}
390389
})

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 58 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -905,6 +905,29 @@ impl SortExec {
905905
self
906906
}
907907

908+
/// Add or reset `self.filter` to a new `DynamicFilterPhysicalExpr`.
909+
fn create_filter(&self) -> Arc<DynamicFilterPhysicalExpr> {
910+
let children = self
911+
.expr
912+
.iter()
913+
.map(|sort_expr| Arc::clone(&sort_expr.expr))
914+
.collect::<Vec<_>>();
915+
Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true)))
916+
}
917+
918+
fn cloned(&self) -> Self {
919+
SortExec {
920+
input: Arc::clone(&self.input),
921+
expr: self.expr.clone(),
922+
metrics_set: self.metrics_set.clone(),
923+
preserve_partitioning: self.preserve_partitioning,
924+
common_sort_prefix: self.common_sort_prefix.clone(),
925+
fetch: self.fetch,
926+
cache: self.cache.clone(),
927+
filter: self.filter.clone(),
928+
}
929+
}
930+
908931
/// Modify how many rows to include in the result
909932
///
910933
/// If None, then all rows will be returned, in sorted order.
@@ -926,25 +949,13 @@ impl SortExec {
926949
}
927950
let filter = fetch.is_some().then(|| {
928951
// If we already have a filter, keep it. Otherwise, create a new one.
929-
self.filter.clone().unwrap_or_else(|| {
930-
let children = self
931-
.expr
932-
.iter()
933-
.map(|sort_expr| Arc::clone(&sort_expr.expr))
934-
.collect::<Vec<_>>();
935-
Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true)))
936-
})
952+
self.filter.clone().unwrap_or_else(|| self.create_filter())
937953
});
938-
SortExec {
939-
input: Arc::clone(&self.input),
940-
expr: self.expr.clone(),
941-
metrics_set: self.metrics_set.clone(),
942-
preserve_partitioning: self.preserve_partitioning,
943-
common_sort_prefix: self.common_sort_prefix.clone(),
944-
fetch,
945-
cache,
946-
filter,
947-
}
954+
let mut new_sort = self.cloned();
955+
new_sort.fetch = fetch;
956+
new_sort.cache = cache;
957+
new_sort.filter = filter;
958+
new_sort
948959
}
949960

950961
/// Input schema
@@ -1116,10 +1127,35 @@ impl ExecutionPlan for SortExec {
11161127
self: Arc<Self>,
11171128
children: Vec<Arc<dyn ExecutionPlan>>,
11181129
) -> Result<Arc<dyn ExecutionPlan>> {
1119-
let mut new_sort = SortExec::new(self.expr.clone(), Arc::clone(&children[0]))
1120-
.with_fetch(self.fetch)
1121-
.with_preserve_partitioning(self.preserve_partitioning);
1122-
new_sort.filter = self.filter.clone();
1130+
let mut new_sort = self.cloned();
1131+
assert!(
1132+
children.len() == 1,
1133+
"SortExec should have exactly one child"
1134+
);
1135+
new_sort.input = Arc::clone(&children[0]);
1136+
// Recompute the properties based on the new input since they may have changed
1137+
let (cache, sort_prefix) = Self::compute_properties(
1138+
&new_sort.input,
1139+
new_sort.expr.clone(),
1140+
new_sort.preserve_partitioning,
1141+
)?;
1142+
new_sort.cache = cache;
1143+
new_sort.common_sort_prefix = sort_prefix;
1144+
1145+
Ok(Arc::new(new_sort))
1146+
}
1147+
1148+
fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
1149+
let children = self.children().into_iter().cloned().collect();
1150+
let new_sort = self.with_new_children(children)?;
1151+
let mut new_sort = new_sort
1152+
.as_any()
1153+
.downcast_ref::<SortExec>()
1154+
.expect("cloned 1 lines above this line, we know the type")
1155+
.clone();
1156+
// Our dynamic filter and execution metrics are the state we need to reset.
1157+
new_sort.filter = Some(new_sort.create_filter());
1158+
new_sort.metrics_set = ExecutionPlanMetricsSet::new();
11231159

11241160
Ok(Arc::new(new_sort))
11251161
}

datafusion/sqllogictest/test_files/cte.slt

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -996,6 +996,61 @@ physical_plan
996996
08)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
997997
09)------------WorkTableExec: name=numbers
998998

999+
# Test for issue #16998: SortExec shares DynamicFilterPhysicalExpr across multiple executions
1000+
query II
1001+
with recursive r as (
1002+
select 0 as k, 0 as v
1003+
union all
1004+
(
1005+
select *
1006+
from r
1007+
order by v
1008+
limit 1
1009+
)
1010+
)
1011+
select *
1012+
from r
1013+
limit 5;
1014+
----
1015+
0 0
1016+
0 0
1017+
0 0
1018+
0 0
1019+
0 0
1020+
1021+
query TT
1022+
explain
1023+
with recursive r as (
1024+
select 0 as k, 0 as v
1025+
union all
1026+
(
1027+
select *
1028+
from r
1029+
order by v
1030+
limit 1
1031+
)
1032+
)
1033+
select *
1034+
from r
1035+
limit 5;
1036+
----
1037+
logical_plan
1038+
01)SubqueryAlias: r
1039+
02)--Limit: skip=0, fetch=5
1040+
03)----RecursiveQuery: is_distinct=false
1041+
04)------Projection: Int64(0) AS k, Int64(0) AS v
1042+
05)--------EmptyRelation
1043+
06)------Sort: r.v ASC NULLS LAST, fetch=1
1044+
07)--------Projection: r.k, r.v
1045+
08)----------TableScan: r
1046+
physical_plan
1047+
01)GlobalLimitExec: skip=0, fetch=5
1048+
02)--RecursiveQueryExec: name=r, is_distinct=false
1049+
03)----ProjectionExec: expr=[0 as k, 0 as v]
1050+
04)------PlaceholderRowExec
1051+
05)----SortExec: TopK(fetch=1), expr=[v@1 ASC NULLS LAST], preserve_partitioning=[false]
1052+
06)------WorkTableExec: name=r
1053+
9991054
statement count 0
10001055
set datafusion.execution.enable_recursive_ctes = false;
10011056

docs/source/library-user-guide/upgrading.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,16 @@ This version of DataFusion upgrades the underlying Apache Arrow implementation
142142
to version `56.0.0`. See the [release notes](https://github.com/apache/arrow-rs/releases/tag/56.0.0)
143143
for more details.
144144

145+
### Added `ExecutionPlan::reset_state`
146+
147+
In order to fix a bug in DataFusion `49.0.0` where dynamic filters (currently only generated in the precense of a query such as `ORDER BY ... LIMIT ...`)
148+
produced incorrect results in recursive queries, a new method `reset_state` has been added to the `ExecutionPlan` trait.
149+
150+
Any `ExecutionPlan` that needs to maintain internal state or references to other nodes in the execution plan tree should implement this method to reset that state.
151+
See [#17028] for more details and an example implementation for `SortExec`.
152+
153+
[#17028]: https://github.com/apache/datafusion/pull/17028
154+
145155
## DataFusion `49.0.0`
146156

147157
### `MSRV` updated to 1.85.1

0 commit comments

Comments
 (0)