Skip to content

Commit f26b319

Browse files
adriangb and Robert Ream committed
Add ExecutionPlan::reset_state
Co-authored-by: Robert Ream <[email protected]>
1 parent d376a32 commit f26b319

File tree

4 files changed

+120
-26
lines changed

4 files changed

+120
-26
lines changed

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,31 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
194194
children: Vec<Arc<dyn ExecutionPlan>>,
195195
) -> Result<Arc<dyn ExecutionPlan>>;
196196

197+
/// Reset any internal state within this [`ExecutionPlan`].
198+
///
199+
/// This method is called when an [`ExecutionPlan`] needs to be re-executed,
200+
/// such as in recursive queries. Unlike [`ExecutionPlan::with_new_children`], this method
201+
/// ensures that any stateful components (e.g., [`DynamicFilterPhysicalExpr`])
202+
/// are reset to their initial state.
203+
///
204+
/// The default implementation simply calls [`ExecutionPlan::with_new_children`] with the existing children,
205+
/// effectively creating a new instance of the [`ExecutionPlan`] with the same children but without
206+
/// necessarily resetting any internal state. Implementations that require resetting of some
207+
/// internal state should override this method to provide the necessary logic.
208+
///
209+
/// This method should *not* reset state recursively for children, as it is expected that
210+
/// it will be called from within a walk of the execution plan tree so that it will be called on each child later
211+
/// or was already called on each child.
212+
///
213+
/// Note to implementers: unlike [`ExecutionPlan::with_new_children`] this method does not accept new children as an argument,
214+
/// thus it is expected that any cached plan properties will remain valid after the reset.
215+
///
216+
/// [`DynamicFilterPhysicalExpr`]: datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr
217+
fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
218+
let children = self.children().into_iter().cloned().collect();
219+
self.with_new_children(children)
220+
}
221+
197222
/// If supported, attempt to increase the partitioning of this `ExecutionPlan` to
198223
/// produce `target_partitions` partitions.
199224
///

datafusion/physical-plan/src/recursive_query.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ fn assign_work_table(
372372
}
373373

374374
/// Some plans will change their internal states after execution, making them unable to be executed again.
375-
/// This function uses `ExecutionPlan::with_new_children` to fork a new plan with initial states.
375+
/// This function uses [`ExecutionPlan::reset_state`] to reset any internal state within the plan.
376376
///
377377
/// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan.
378378
/// However, if the data of the left table is derived from the work table, it will become outdated
@@ -383,8 +383,7 @@ fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPl
383383
if plan.as_any().is::<WorkTableExec>() {
384384
Ok(Transformed::no(plan))
385385
} else {
386-
let new_plan = Arc::clone(&plan)
387-
.with_new_children(plan.children().into_iter().cloned().collect())?;
386+
let new_plan = Arc::clone(&plan).reset_state()?;
388387
Ok(Transformed::yes(new_plan))
389388
}
390389
})

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 69 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -905,6 +905,29 @@ impl SortExec {
905905
self
906906
}
907907

908+
/// Add or reset `self.filter` to a new `DynamicFilterPhysicalExpr`.
909+
fn create_filter(&self) -> Arc<DynamicFilterPhysicalExpr> {
910+
let children = self
911+
.expr
912+
.iter()
913+
.map(|sort_expr| Arc::clone(&sort_expr.expr))
914+
.collect::<Vec<_>>();
915+
Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true)))
916+
}
917+
918+
fn cloned(&self) -> Self {
919+
SortExec {
920+
input: Arc::clone(&self.input),
921+
expr: self.expr.clone(),
922+
metrics_set: self.metrics_set.clone(),
923+
preserve_partitioning: self.preserve_partitioning,
924+
common_sort_prefix: self.common_sort_prefix.clone(),
925+
fetch: self.fetch,
926+
cache: self.cache.clone(),
927+
filter: self.filter.clone(),
928+
}
929+
}
930+
908931
/// Modify how many rows to include in the result
909932
///
910933
/// If None, then all rows will be returned, in sorted order.
@@ -926,25 +949,13 @@ impl SortExec {
926949
}
927950
let filter = fetch.is_some().then(|| {
928951
// If we already have a filter, keep it. Otherwise, create a new one.
929-
self.filter.clone().unwrap_or_else(|| {
930-
let children = self
931-
.expr
932-
.iter()
933-
.map(|sort_expr| Arc::clone(&sort_expr.expr))
934-
.collect::<Vec<_>>();
935-
Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true)))
936-
})
952+
self.filter.clone().unwrap_or_else(|| self.create_filter())
937953
});
938-
SortExec {
939-
input: Arc::clone(&self.input),
940-
expr: self.expr.clone(),
941-
metrics_set: self.metrics_set.clone(),
942-
preserve_partitioning: self.preserve_partitioning,
943-
common_sort_prefix: self.common_sort_prefix.clone(),
944-
fetch,
945-
cache,
946-
filter,
947-
}
954+
let mut new_sort = self.cloned();
955+
new_sort.fetch = fetch;
956+
new_sort.cache = cache;
957+
new_sort.filter = filter;
958+
new_sort
948959
}
949960

950961
/// Input schema
@@ -1116,10 +1127,46 @@ impl ExecutionPlan for SortExec {
11161127
self: Arc<Self>,
11171128
children: Vec<Arc<dyn ExecutionPlan>>,
11181129
) -> Result<Arc<dyn ExecutionPlan>> {
1119-
let mut new_sort = SortExec::new(self.expr.clone(), Arc::clone(&children[0]))
1120-
.with_fetch(self.fetch)
1121-
.with_preserve_partitioning(self.preserve_partitioning);
1122-
new_sort.filter = self.filter.clone();
1130+
let mut new_sort = self.cloned();
1131+
assert!(
1132+
children.len() == 1,
1133+
"SortExec should have exactly one child"
1134+
);
1135+
new_sort.input = Arc::clone(&children[0]);
1136+
// Recompute the properties based on the new input since they may have changed.
1137+
let (cache, sort_prefix) = Self::compute_properties(
1138+
&new_sort.input,
1139+
new_sort.expr.clone(),
1140+
new_sort.preserve_partitioning,
1141+
)
1142+
.expect(concat!(
1143+
"Safety: we had already been calling `compute_properties(...).unwrap()` in `new()` ",
1144+
"and it seems to be okay",
1145+
"\n",
1146+
"We assumed that doing the same thing here directly instead ",
1147+
"of calling `new()` (as we did before this commit) is also okay but it's possible that ",
1148+
"implementations have drifted and this is no longer safe even if `new()` still works, ",
1149+
"for example if `new()` now does something different than just calling `compute_properties(...).unwrap()`",
1150+
"\n",
1151+
"This is clearly a bug, please report it!"
1152+
));
1153+
new_sort.cache = cache;
1154+
new_sort.common_sort_prefix = sort_prefix;
1155+
1156+
Ok(Arc::new(new_sort))
1157+
}
1158+
1159+
fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
1160+
let children = self.children().into_iter().cloned().collect();
1161+
let new_sort = self.with_new_children(children)?;
1162+
let mut new_sort = new_sort
1163+
.as_any()
1164+
.downcast_ref::<SortExec>()
1165+
.expect("cloned 1 lines above this line, we know the type")
1166+
.clone();
1167+
// Our dynamic filter and execution metrics are the state we need to reset.
1168+
new_sort.filter = Some(new_sort.create_filter());
1169+
new_sort.metrics_set = ExecutionPlanMetricsSet::new();
11231170

11241171
Ok(Arc::new(new_sort))
11251172
}

datafusion/sqllogictest/test_files/cte.slt

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -996,6 +996,29 @@ physical_plan
996996
08)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
997997
09)------------WorkTableExec: name=numbers
998998

999+
# Test for issue #16998: SortExec shares DynamicFilterPhysicalExpr across multiple executions
1000+
# Before the fix this query returned only 2 rows instead of the expected 5
1001+
query II
1002+
with recursive r as (
1003+
select 0 as k, 0 as v
1004+
union all
1005+
(
1006+
select *
1007+
from r
1008+
order by v
1009+
limit 1
1010+
)
1011+
)
1012+
select *
1013+
from r
1014+
limit 5;
1015+
----
1016+
0 0
1017+
0 0
1018+
0 0
1019+
0 0
1020+
0 0
1021+
9991022
statement count 0
10001023
set datafusion.execution.enable_recursive_ctes = false;
10011024

@@ -1004,4 +1027,4 @@ explain WITH RECURSIVE numbers AS (
10041027
select 1 as n
10051028
UNION ALL
10061029
select n + 1 FROM numbers WHERE N < 10
1007-
) select * from numbers;
1030+
) select * from numbers;

0 commit comments

Comments
 (0)