Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions datafusion/physical-expr/src/expressions/dynamic_filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ use datafusion_expr::ColumnarValue;
use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash};

/// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it.
///
/// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also
/// implement `ExecutionPlan::reset_state` to remain compatible with recursive queries and other situations where
/// the same `ExecutionPlan` is reused with different data.
#[derive(Debug)]
pub struct DynamicFilterPhysicalExpr {
/// The original children of this PhysicalExpr, if any.
Expand Down Expand Up @@ -121,6 +125,10 @@ impl DynamicFilterPhysicalExpr {
/// do not change* since those will be used to determine what columns need to read or projected
/// when evaluating the expression.
///
/// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also
/// implement `ExecutionPlan::reset_state` to remain compatible with recursive queries and other situations where
/// the same `ExecutionPlan` is reused with different data.
///
/// [`collect_columns`]: crate::utils::collect_columns
pub fn new(
children: Vec<Arc<dyn PhysicalExpr>>,
Expand Down
25 changes: 25 additions & 0 deletions datafusion/physical-plan/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,31 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>>;

/// Reset any internal state within this [`ExecutionPlan`].
///
/// This method is called when an [`ExecutionPlan`] needs to be re-executed,
/// such as in recursive queries. Unlike [`ExecutionPlan::with_new_children`], this method
/// ensures that any stateful components (e.g., [`DynamicFilterPhysicalExpr`])
/// are reset to their initial state.
///
/// The default implementation simply calls [`ExecutionPlan::with_new_children`] with the existing children,
/// effectively creating a new instance of the [`ExecutionPlan`] with the same children but without
/// necessarily resetting any internal state. Implementations that require resetting of some
/// internal state should override this method to provide the necessary logic.
///
/// This method should *not* recursively reset the state of its children: it is
/// expected to be called during a walk of the execution plan tree, so this method
/// will be (or already has been) invoked on each child separately.
///
/// Note to implementers: unlike [`ExecutionPlan::with_new_children`] this method does not accept new children as an argument,
/// thus it is expected that any cached plan properties will remain valid after the reset.
///
/// [`DynamicFilterPhysicalExpr`]: datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should also add a note to DynamicFilterPhysicalExpr saying any ExecutionPlan that uses them should also implement reset_state

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
    // Default: rebuild this node with its existing children. This creates a
    // fresh instance but does not necessarily clear internal state — nodes
    // holding per-execution state should override this method.
    let existing_children: Vec<Arc<dyn ExecutionPlan>> =
        self.children().into_iter().map(Arc::clone).collect();
    self.with_new_children(existing_children)
}

/// If supported, attempt to increase the partitioning of this `ExecutionPlan` to
/// produce `target_partitions` partitions.
///
Expand Down
12 changes: 12 additions & 0 deletions datafusion/physical-plan/src/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,18 @@ impl ExecutionPlan for CrossJoinExec {
)))
}

fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
    // Rebuild this node with a cleared build-side future and fresh metrics so
    // the join can be executed again from scratch; everything else is shared
    // with the original node.
    Ok(Arc::new(CrossJoinExec {
        left: Arc::clone(&self.left),
        right: Arc::clone(&self.right),
        schema: Arc::clone(&self.schema),
        cache: self.cache.clone(),
        // Dropping the cached left (build) side is the whole point of the reset.
        left_fut: Default::default(),
        metrics: ExecutionPlanMetricsSet::default(),
    }))
}

fn required_input_distribution(&self) -> Vec<Distribution> {
vec![
Distribution::SinglePartition,
Expand Down
20 changes: 20 additions & 0 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,26 @@ impl ExecutionPlan for HashJoinExec {
)?))
}

fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
    // Rebuild this node with the build-side future and metrics cleared so the
    // join can be re-executed (e.g. inside a recursive query); all other
    // fields are carried over unchanged.
    let fresh = HashJoinExec {
        left: Arc::clone(&self.left),
        right: Arc::clone(&self.right),
        on: self.on.clone(),
        filter: self.filter.clone(),
        join_type: self.join_type,
        join_schema: Arc::clone(&self.join_schema),
        projection: self.projection.clone(),
        column_indices: self.column_indices.clone(),
        random_state: self.random_state.clone(),
        mode: self.mode,
        null_equality: self.null_equality,
        cache: self.cache.clone(),
        // Per-execution state: a fresh build-side future and fresh metrics.
        left_fut: OnceAsync::default(),
        metrics: ExecutionPlanMetricsSet::new(),
    };
    Ok(Arc::new(fresh))
}

fn execute(
&self,
partition: usize,
Expand Down
5 changes: 2 additions & 3 deletions datafusion/physical-plan/src/recursive_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ fn assign_work_table(
}

/// Some plans will change their internal states after execution, making them unable to be executed again.
/// This function uses `ExecutionPlan::with_new_children` to fork a new plan with initial states.
/// This function uses [`ExecutionPlan::reset_state`] to reset any internal state within the plan.
///
/// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan.
/// However, if the data of the left table is derived from the work table, it will become outdated
Expand All @@ -383,8 +383,7 @@ fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPl
if plan.as_any().is::<WorkTableExec>() {
Ok(Transformed::no(plan))
} else {
let new_plan = Arc::clone(&plan)
.with_new_children(plan.children().into_iter().cloned().collect())?;
let new_plan = Arc::clone(&plan).reset_state()?;
Ok(Transformed::yes(new_plan))
}
})
Expand Down
80 changes: 58 additions & 22 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,29 @@ impl SortExec {
self
}

/// Build a fresh `DynamicFilterPhysicalExpr` over this node's sort
/// expressions, initialized to the always-true predicate `lit(true)`.
///
/// Note: this does not modify `self.filter`; callers are responsible for
/// storing the returned filter if desired.
fn create_filter(&self) -> Arc<DynamicFilterPhysicalExpr> {
    let mut filter_children = Vec::new();
    for sort_expr in self.expr.iter() {
        filter_children.push(Arc::clone(&sort_expr.expr));
    }
    Arc::new(DynamicFilterPhysicalExpr::new(filter_children, lit(true)))
}

fn cloned(&self) -> Self {
SortExec {
input: Arc::clone(&self.input),
expr: self.expr.clone(),
metrics_set: self.metrics_set.clone(),
preserve_partitioning: self.preserve_partitioning,
common_sort_prefix: self.common_sort_prefix.clone(),
fetch: self.fetch,
cache: self.cache.clone(),
filter: self.filter.clone(),
}
}

/// Modify how many rows to include in the result
///
/// If None, then all rows will be returned, in sorted order.
Expand All @@ -926,25 +949,13 @@ impl SortExec {
}
let filter = fetch.is_some().then(|| {
// If we already have a filter, keep it. Otherwise, create a new one.
self.filter.clone().unwrap_or_else(|| {
let children = self
.expr
.iter()
.map(|sort_expr| Arc::clone(&sort_expr.expr))
.collect::<Vec<_>>();
Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true)))
})
self.filter.clone().unwrap_or_else(|| self.create_filter())
});
SortExec {
input: Arc::clone(&self.input),
expr: self.expr.clone(),
metrics_set: self.metrics_set.clone(),
preserve_partitioning: self.preserve_partitioning,
common_sort_prefix: self.common_sort_prefix.clone(),
fetch,
cache,
filter,
}
let mut new_sort = self.cloned();
new_sort.fetch = fetch;
new_sort.cache = cache;
new_sort.filter = filter;
new_sort
}

/// Input schema
Expand Down Expand Up @@ -1116,10 +1127,35 @@ impl ExecutionPlan for SortExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut new_sort = SortExec::new(self.expr.clone(), Arc::clone(&children[0]))
.with_fetch(self.fetch)
.with_preserve_partitioning(self.preserve_partitioning);
new_sort.filter = self.filter.clone();
let mut new_sort = self.cloned();
assert!(
children.len() == 1,
"SortExec should have exactly one child"
);
new_sort.input = Arc::clone(&children[0]);
// Recompute the properties based on the new input since they may have changed
let (cache, sort_prefix) = Self::compute_properties(
&new_sort.input,
new_sort.expr.clone(),
new_sort.preserve_partitioning,
)?;
new_sort.cache = cache;
new_sort.common_sort_prefix = sort_prefix;

Ok(Arc::new(new_sort))
}

fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
    // Rebuild this node via `with_new_children` with its current children so
    // that the cached plan properties are recomputed from the input.
    let children = self.children().into_iter().cloned().collect();
    let new_sort = self.with_new_children(children)?;
    let mut new_sort = new_sort
        .as_any()
        .downcast_ref::<SortExec>()
        // `with_new_children` on a SortExec always produces a SortExec, so
        // this downcast cannot fail.
        .expect("SortExec::with_new_children always returns a SortExec")
        .clone();
    // The dynamic filter and execution metrics are the per-execution state
    // that must be reset so this plan can be executed again (e.g. within a
    // recursive CTE).
    new_sort.filter = Some(new_sort.create_filter());
    new_sort.metrics_set = ExecutionPlanMetricsSet::new();

    Ok(Arc::new(new_sort))
}
Expand Down
55 changes: 55 additions & 0 deletions datafusion/sqllogictest/test_files/cte.slt
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,61 @@ physical_plan
08)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)------------WorkTableExec: name=numbers

# Test for issue #16998: SortExec shares DynamicFilterPhysicalExpr across multiple executions
query II
with recursive r as (
select 0 as k, 0 as v
union all
(
select *
from r
order by v
limit 1
)
)
select *
from r
limit 5;
----
0 0
0 0
0 0
0 0
0 0

query TT
explain
with recursive r as (
select 0 as k, 0 as v
union all
(
select *
from r
order by v
limit 1
)
)
select *
from r
limit 5;
----
logical_plan
01)SubqueryAlias: r
02)--Limit: skip=0, fetch=5
03)----RecursiveQuery: is_distinct=false
04)------Projection: Int64(0) AS k, Int64(0) AS v
05)--------EmptyRelation
06)------Sort: r.v ASC NULLS LAST, fetch=1
07)--------Projection: r.k, r.v
08)----------TableScan: r
physical_plan
01)GlobalLimitExec: skip=0, fetch=5
02)--RecursiveQueryExec: name=r, is_distinct=false
03)----ProjectionExec: expr=[0 as k, 0 as v]
04)------PlaceholderRowExec
05)----SortExec: TopK(fetch=1), expr=[v@1 ASC NULLS LAST], preserve_partitioning=[false]
06)------WorkTableExec: name=r

statement count 0
set datafusion.execution.enable_recursive_ctes = false;

Expand Down
10 changes: 10 additions & 0 deletions docs/source/library-user-guide/upgrading.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ This version of DataFusion upgrades the underlying Apache Arrow implementation
to version `56.0.0`. See the [release notes](https://github.com/apache/arrow-rs/releases/tag/56.0.0)
for more details.

### Added `ExecutionPlan::reset_state`

In order to fix a bug in DataFusion `49.0.0` where dynamic filters (currently only generated in the presence of a query such as `ORDER BY ... LIMIT ...`)
produced incorrect results in recursive queries, a new method `reset_state` has been added to the `ExecutionPlan` trait.

Any `ExecutionPlan` that needs to maintain internal state or references to other nodes in the execution plan tree should implement this method to reset that state.
See [#17028] for more details and an example implementation for `SortExec`.

[#17028]: https://github.com/apache/datafusion/pull/17028

## DataFusion `49.0.0`

### `MSRV` updated to 1.85.1
Expand Down