From ebd491ff6d130cc7b1743e1507c246be2818ca68 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Thu, 25 Sep 2025 20:01:04 +1000 Subject: [PATCH 1/2] c --- .../plans/optimizer/set_order/expr_pullup.rs | 90 +--- .../optimizer/set_order/expr_pushdown.rs | 442 +++++++++--------- 2 files changed, 255 insertions(+), 277 deletions(-) diff --git a/crates/polars-plan/src/plans/optimizer/set_order/expr_pullup.rs b/crates/polars-plan/src/plans/optimizer/set_order/expr_pullup.rs index 91a7d3d4e69b..9baab5ed6c4c 100644 --- a/crates/polars-plan/src/plans/optimizer/set_order/expr_pullup.rs +++ b/crates/polars-plan/src/plans/optimizer/set_order/expr_pullup.rs @@ -1,81 +1,33 @@ use polars_utils::arena::Arena; -use crate::dsl::EvalVariant; -use crate::plans::{AExpr, IRAggExpr}; +use crate::plans::AExpr; +use crate::plans::set_order::expr_pushdown::{ + ExprOutputOrder, FrameOrderObserved, ObservableOrderingsResolver, +}; -/// Determine whether the output of an expression has a defined order. -/// -/// This will recursively walk through the expression and answers the question: -/// -/// > Given that the input dataframe (does not have/has) a defined ordered, does the expression -/// > have a defined output order? -#[recursive::recursive] pub fn is_output_ordered(aexpr: &AExpr, arena: &Arena, frame_ordered: bool) -> bool { - macro_rules! rec { - ($node:expr) => {{ is_output_ordered(arena.get($node), arena, frame_ordered) }}; - } - match aexpr { - // Explode creates local orders. - AExpr::Explode { .. } => true, - AExpr::Column(..) => frame_ordered, - AExpr::Literal(lv) => !lv.is_scalar(), - - // All elementwise expressions are ordered if any of its inputs are ordered. - AExpr::BinaryExpr { left, right, op: _ } => rec!(*left) || rec!(*right), - AExpr::Ternary { - predicate, - truthy, - falsy, - } => rec!(*predicate) || rec!(*truthy) || rec!(*falsy), - AExpr::Cast { - expr, - dtype: _, - options: _, - } => rec!(*expr), - - // Sorts always output in a defined ordering. - AExpr::Sort { .. } | AExpr::SortBy { .. } => true, - AExpr::Gather { - expr: _, - idx, - returns_scalar, - } => !returns_scalar && rec!(*idx), - AExpr::Filter { input, by } => rec!(*input) || rec!(*by), + use ExprOutputOrder as O; - // This aggregation is jiberish. Just be conservative. - AExpr::Agg(IRAggExpr::AggGroups(_)) => true, + match ObservableOrderingsResolver::new(if frame_ordered { + O::Independent + } else { + O::None + }) + .resolve_observable_orderings(aexpr, arena) + { + Ok(O::None) => false, + Ok(O::Independent) => true, - // Aggregations always output 1 row. - AExpr::Agg(..) | AExpr::Len => false, - - AExpr::Eval { - expr, - evaluation: _, - variant, - } => match variant { - EvalVariant::List => rec!(*expr), - EvalVariant::Cumulative { min_samples: _ } => true, - }, - AExpr::AnonymousFunction { input, options, .. } - | AExpr::Function { input, options, .. } => { - if options.flags.returns_scalar() || options.flags.is_output_unordered() { - false - } else if options.is_elementwise() { - input.iter().any(|e| rec!(e.node())) - } else if options.flags.propagates_order() { - assert_eq!(input.len(), 1); - rec!(input[0].node()) + Ok(O::Frame | O::Both) | Err(FrameOrderObserved) => { + // It is a logic error to hit this branch, as that would mean that frame ordering was + // introduced into the expression tree from a non-column node. + // + // In release mode just conservatively indicate ordered output. + if cfg!(debug_assertions) { + unreachable!() } else { true } }, - // @Performance. This is probably quite pessimistic and can be optimizes to be `false` in - // some cases. - AExpr::Window { .. } => true, - AExpr::Slice { - input, - offset: _, - length: _, - } => rec!(*input), } } diff --git a/crates/polars-plan/src/plans/optimizer/set_order/expr_pushdown.rs b/crates/polars-plan/src/plans/optimizer/set_order/expr_pushdown.rs index f3ed306ca220..8f06db14f2ce 100644 --- a/crates/polars-plan/src/plans/optimizer/set_order/expr_pushdown.rs +++ b/crates/polars-plan/src/plans/optimizer/set_order/expr_pushdown.rs @@ -92,240 +92,266 @@ pub fn adjust_for_with_columns_context( /// /// This answers the question: /// > Given that my output is (un)ordered, can my input be unordered? -#[recursive::recursive] pub fn get_frame_observing( aexpr: &AExpr, expr_arena: &Arena, ) -> Result { - macro_rules! rec { - ($expr:expr) => {{ get_frame_observing(expr_arena.get($expr), expr_arena)? }}; - } + ObservableOrderingsResolver::new(ExprOutputOrder::Frame) + .resolve_observable_orderings(aexpr, expr_arena) +} + +pub(super) struct ObservableOrderingsResolver { + column_ordering: ExprOutputOrder, +} + +impl ObservableOrderingsResolver { + pub(super) fn new(column_ordering: ExprOutputOrder) -> Self { + use ExprOutputOrder as O; - macro_rules! zip { - ($($expr:expr),*) => {{ zip([$(Ok(rec!($expr))),*])? }}; + match &column_ordering { + O::Frame | O::Independent | O::None => {}, + O::Both => panic!(), + } + + Self { column_ordering } } - use ExprOutputOrder as O; - Ok(match aexpr { - // Explode creates local orders. - // - // The following observes order: - // - // a: [[1, 2], [3]] - // b: [[3], [4, 5]] - // - // col(a).explode() * col(b).explode() - AExpr::Explode { expr, .. } => rec!(*expr) | O::Independent, - - AExpr::Column(_) => O::Frame, - AExpr::Literal(lv) if lv.is_scalar() => O::None, - AExpr::Literal(_) => O::Independent, - - AExpr::Cast { expr, .. } => rec!(*expr), - - // Elementwise can be seen as a `zip + op`. - AExpr::BinaryExpr { left, op: _, right } => zip!(*left, *right), - AExpr::Filter { input, by } => zip!(*input, *by), - AExpr::Ternary { - predicate, - truthy, - falsy, - } => zip!(*predicate, *truthy, *falsy), - - AExpr::Sort { expr, options } => { - if options.maintain_order { - rec!(*expr) | O::Independent - } else { - _ = rec!(*expr); - O::Independent - } - }, - AExpr::SortBy { - expr, - by, - sort_options, - } => { - let mut zipped = rec!(*expr); - for e in by { - zipped = zipped.zip_with(rec!(*e))?; - } - - if sort_options.maintain_order { - zipped | O::Independent - } else { - O::Independent - } - }, - - AExpr::Agg(agg) => match agg { - // Input order agnostic aggregations. - IRAggExpr::Min { input: node, .. } - | IRAggExpr::Max { input: node, .. } - | IRAggExpr::Median(node) - | IRAggExpr::NUnique(node) - | IRAggExpr::Mean(node) - | IRAggExpr::Sum(node) - | IRAggExpr::Count { input: node, .. } - | IRAggExpr::Std(node, _) - | IRAggExpr::Var(node, _) => { - // Input order is deregarded, but must not observe order. - _ = rec!(*node); - O::None + #[recursive::recursive] + pub(super) fn resolve_observable_orderings( + &self, + aexpr: &AExpr, + expr_arena: &Arena, + ) -> Result { + macro_rules! rec { + ($expr:expr) => {{ self.resolve_observable_orderings(expr_arena.get($expr), expr_arena)? }}; + } + + macro_rules! zip { + ($($expr:expr),*) => {{ zip([$(Ok(rec!($expr))),*])? }}; + } + + use ExprOutputOrder as O; + Ok(match aexpr { + // Explode creates local orders. + // + // The following observes order: + // + // a: [[1, 2], [3]] + // b: [[3], [4, 5]] + // + // col(a).explode() * col(b).explode() + AExpr::Explode { expr, .. } => rec!(*expr) | O::Independent, + + AExpr::Column(_) => self.column_ordering, + AExpr::Literal(lv) if lv.is_scalar() => O::None, + AExpr::Literal(_) => O::Independent, + + AExpr::Cast { expr, .. } => rec!(*expr), + + // Elementwise can be seen as a `zip + op`. + AExpr::BinaryExpr { left, op: _, right } => zip!(*left, *right), + AExpr::Filter { input, by } => zip!(*input, *by), + AExpr::Ternary { + predicate, + truthy, + falsy, + } => zip!(*predicate, *truthy, *falsy), + + AExpr::Sort { expr, options } => { + if options.maintain_order { + rec!(*expr) | O::Independent + } else { + _ = rec!(*expr); + O::Independent + } + }, + AExpr::SortBy { + expr, + by, + sort_options, + } => { + let mut zipped = rec!(*expr); + for e in by { + zipped = zipped.zip_with(rec!(*e))?; + } + + if sort_options.maintain_order { + zipped | O::Independent + } else { + O::Independent + } }, - IRAggExpr::Quantile { expr, quantile, .. } => { - // Input and quantile order is deregarded, but must not observe order. - _ = rec!(*expr); - _ = rec!(*quantile); - O::None + + AExpr::Agg(agg) => match agg { + // Input order agnostic aggregations. + IRAggExpr::Min { input: node, .. } + | IRAggExpr::Max { input: node, .. } + | IRAggExpr::Median(node) + | IRAggExpr::NUnique(node) + | IRAggExpr::Mean(node) + | IRAggExpr::Sum(node) + | IRAggExpr::Count { input: node, .. } + | IRAggExpr::Std(node, _) + | IRAggExpr::Var(node, _) => { + // Input order is deregarded, but must not observe order. + _ = rec!(*node); + O::None + }, + IRAggExpr::Quantile { expr, quantile, .. } => { + // Input and quantile order is deregarded, but must not observe order. + _ = rec!(*expr); + _ = rec!(*quantile); + O::None + }, + + // Input order observing aggregations. + IRAggExpr::Implode(node) | IRAggExpr::First(node) | IRAggExpr::Last(node) => { + if rec!(*node).has_frame_ordering() { + return Err(FrameOrderObserved); + } + O::None + }, + + // @NOTE: This aggregation makes very little sense. We do the most pessimistic thing + // possible here. + IRAggExpr::AggGroups(_) => return Err(FrameOrderObserved), }, - // Input order observing aggregations. - IRAggExpr::Implode(node) | IRAggExpr::First(node) | IRAggExpr::Last(node) => { - if rec!(*node).has_frame_ordering() { + AExpr::Gather { + expr, + idx, + returns_scalar, + } => { + let expr = rec!(*expr); + let idx = rec!(*idx); + + // We need to ensure that the values come in frame order. The order of the idxes is + // propagated. + if expr.has_frame_ordering() { return Err(FrameOrderObserved); } - O::None + + if *returns_scalar { O::None } else { idx } }, + AExpr::AnonymousFunction { input, options, .. } + | AExpr::Function { input, options, .. } => { + // By definition, does not observe any input so cannot observe any ordering. + if input.is_empty() { + return Ok( + if options.flags.returns_scalar() || options.flags.is_output_unordered() { + O::None + } else { + O::Independent + }, + ); + } - // @NOTE: This aggregation makes very little sense. We do the most pessimistic thing - // possible here. - IRAggExpr::AggGroups(_) => return Err(FrameOrderObserved), - }, - - AExpr::Gather { - expr, - idx, - returns_scalar, - } => { - let expr = rec!(*expr); - let idx = rec!(*idx); - - // We need to ensure that the values come in frame order. The order of the idxes is - // propagated. - if expr.has_frame_ordering() { - return Err(FrameOrderObserved); - } - - if *returns_scalar { O::None } else { idx } - }, - AExpr::AnonymousFunction { input, options, .. } - | AExpr::Function { input, options, .. } => { - // By definition, does not observe any input so cannot observe any ordering. - if input.is_empty() { - return Ok( - if options.flags.returns_scalar() || options.flags.is_output_unordered() { + // Elementwise are regarded as a `zip + function`. + if options.flags.is_elementwise() { + return zip(input.iter().map(|e| Ok(rec!(e.node())))); + } + + if options.flags.propagates_order() { + // Propagate the order of the singular input, this is for expressions like + // `drop_nulls` and `rechunk`. + assert_eq!(input.len(), 1); + match rec!(input[0].node()) { + v if !(options.flags.is_row_separable() + || options.flags.is_input_order_agnostic()) + && v.has_frame_ordering() => + { + return Err(FrameOrderObserved); + }, + v => v, + } + } else if options.flags.is_input_order_agnostic() { + // There are also expressions that are entirely input order agnostic like + // `null_count` and `unique(maintain_order=False)` + for e in input { + _ = rec!(e.node()); + } + + if options.returns_scalar() || options.flags.is_output_unordered() { O::None } else { O::Independent - }, - ); - } - - // Elementwise are regarded as a `zip + function`. - if options.flags.is_elementwise() { - return zip(input.iter().map(|e| Ok(rec!(e.node())))); - } - - if options.flags.propagates_order() { - // Propagate the order of the singular input, this is for expressions like - // `drop_nulls` and `rechunk`. - assert_eq!(input.len(), 1); - match rec!(input[0].node()) { - v if !(options.flags.is_row_separable() - || options.flags.is_input_order_agnostic()) - && v.has_frame_ordering() => - { - return Err(FrameOrderObserved); - }, - v => v, - } - } else if options.flags.is_input_order_agnostic() { - // There are also expressions that are entirely input order agnostic like - // `null_count` and `unique(maintain_order=False)` - for e in input { - _ = rec!(e.node()); - } - - if options.returns_scalar() || options.flags.is_output_unordered() { - O::None + } } else { - O::Independent + // For other expressions, we are observing frame order i.f.f. any of the inputs are + // observing frame order. + for e in input { + if rec!(e.node()).has_frame_ordering() { + return Err(FrameOrderObserved); + } + } + + if options.returns_scalar() || options.flags.is_output_unordered() { + O::None + } else { + O::Independent + } } - } else { - // For other expressions, we are observing frame order i.f.f. any of the inputs are - // observing frame order. - for e in input { - if rec!(e.node()).has_frame_ordering() { + }, + + AExpr::Eval { + expr, + evaluation: _, + variant, + } => match variant { + EvalVariant::List => rec!(*expr), + EvalVariant::Cumulative { min_samples: _ } => { + let expr = rec!(*expr); + if expr.has_frame_ordering() { return Err(FrameOrderObserved); } - } + expr + }, + }, - if options.returns_scalar() || options.flags.is_output_unordered() { - O::None - } else { - O::Independent + AExpr::Window { + function, + partition_by, + order_by, + options: _, + } => { + let input = rec!(*function); + + // @Performance. + // All of the code below might be a bit pessimistic, several window function variants + // are length preserving and/or propagate order in specific ways. + if input.has_frame_ordering() { + return Err(FrameOrderObserved); } - } - }, - - AExpr::Eval { - expr, - evaluation: _, - variant, - } => match variant { - EvalVariant::List => rec!(*expr), - EvalVariant::Cumulative { min_samples: _ } => { - let expr = rec!(*expr); - if expr.has_frame_ordering() { + for e in partition_by { + if rec!(*e).has_frame_ordering() { + return Err(FrameOrderObserved); + } + } + if let Some((e, _)) = &order_by + && rec!(*e).has_frame_ordering() + { return Err(FrameOrderObserved); } - expr + O::Independent }, - }, - - AExpr::Window { - function, - partition_by, - order_by, - options: _, - } => { - let input = rec!(*function); - - // @Performance. - // All of the code below might be a bit pessimistic, several window function variants - // are length preserving and/or propagate order in specific ways. - if input.has_frame_ordering() { - return Err(FrameOrderObserved); - } - for e in partition_by { - if rec!(*e).has_frame_ordering() { + AExpr::Slice { + input, + offset, + length, + } => { + // @NOTE + // `offset` and `length` are supposed to be scalars, they have to resolved as they + // might be order observing, but are not important for the output order. + _ = rec!(*offset); + _ = rec!(*length); + + let input = rec!(*input); + if input.has_frame_ordering() { return Err(FrameOrderObserved); } - } - if let Some((e, _)) = &order_by - && rec!(*e).has_frame_ordering() - { - return Err(FrameOrderObserved); - } - O::Independent - }, - AExpr::Slice { - input, - offset, - length, - } => { - // @NOTE - // `offset` and `length` are supposed to be scalars, they have to resolved as they - // might be order observing, but are not important for the output order. - _ = rec!(*offset); - _ = rec!(*length); - - let input = rec!(*input); - if input.has_frame_ordering() { - return Err(FrameOrderObserved); - } - input - }, - AExpr::Len => O::None, - }) + input + }, + AExpr::Len => O::None, + }) + } } From 9eb6e3100fdfa794f8a298f360489d4b92cc8004 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Thu, 25 Sep 2025 20:10:02 +1000 Subject: [PATCH 2/2] c --- .../src/plans/optimizer/set_order/expr_pushdown.rs | 2 +- .../src/plans/optimizer/set_order/ir_pushdown.rs | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/polars-plan/src/plans/optimizer/set_order/expr_pushdown.rs b/crates/polars-plan/src/plans/optimizer/set_order/expr_pushdown.rs index 8f06db14f2ce..002e882776b3 100644 --- a/crates/polars-plan/src/plans/optimizer/set_order/expr_pushdown.rs +++ b/crates/polars-plan/src/plans/optimizer/set_order/expr_pushdown.rs @@ -92,7 +92,7 @@ pub fn adjust_for_with_columns_context( /// /// This answers the question: /// > Given that my output is (un)ordered, can my input be unordered? -pub fn get_frame_observing( +pub fn get_observable_orders( aexpr: &AExpr, expr_arena: &Arena, ) -> Result { diff --git a/crates/polars-plan/src/plans/optimizer/set_order/ir_pushdown.rs b/crates/polars-plan/src/plans/optimizer/set_order/ir_pushdown.rs index a3b41645dd43..817ce655be01 100644 --- a/crates/polars-plan/src/plans/optimizer/set_order/ir_pushdown.rs +++ b/crates/polars-plan/src/plans/optimizer/set_order/ir_pushdown.rs @@ -9,7 +9,7 @@ use polars_utils::arena::{Arena, Node}; use polars_utils::idx_vec::UnitVec; use polars_utils::unique_id::UniqueId; -use super::expr_pushdown::{adjust_for_with_columns_context, get_frame_observing, zip}; +use super::expr_pushdown::{adjust_for_with_columns_context, get_observable_orders, zip}; use crate::dsl::{PartitionVariantIR, SinkTypeIR, UnionOptions}; use crate::plans::set_order::expr_pushdown::FrameOrderObserved; use crate::plans::{AExpr, IR, is_scalar_ae}; @@ -91,7 +91,7 @@ pub(super) fn pushdown_orders( let is_order_observing = sort_options.maintain_order || { adjust_for_with_columns_context(zip(by_column .iter() - .map(|e| get_frame_observing(expr_arena.get(e.node()), expr_arena)))) + .map(|e| get_observable_orders(expr_arena.get(e.node()), expr_arena)))) .is_err() }; [is_order_observing].into() @@ -120,7 +120,7 @@ pub(super) fn pushdown_orders( let expr_observing = adjust_for_with_columns_context(zip(keys .iter() .chain(aggs.iter()) - .map(|e| get_frame_observing(expr_arena.get(e.node()), expr_arena)))) + .map(|e| get_observable_orders(expr_arena.get(e.node()), expr_arena)))) .is_err(); expr_observing @@ -218,7 +218,7 @@ pub(super) fn pushdown_orders( let input = *input; let mut observing = zip(exprs .iter() - .map(|e| get_frame_observing(expr_arena.get(e.node()), expr_arena))); + .map(|e| get_observable_orders(expr_arena.get(e.node()), expr_arena))); let input_schema = ir_arena.get(input).schema(ir_arena).as_ref().clone(); ir = ir_arena.get_mut(node); @@ -244,7 +244,7 @@ pub(super) fn pushdown_orders( IR::Select { expr: exprs, .. } => { let observing = zip(exprs .iter() - .map(|e| get_frame_observing(expr_arena.get(e.node()), expr_arena))); + .map(|e| get_observable_orders(expr_arena.get(e.node()), expr_arena))); let is_order_observing = match observing { Ok(o) => o.has_frame_ordering() && !all_outputs_unordered, Err(FrameOrderObserved) => true, @@ -256,7 +256,7 @@ pub(super) fn pushdown_orders( input: _, predicate, } => { - let observing = adjust_for_with_columns_context(get_frame_observing( + let observing = adjust_for_with_columns_context(get_observable_orders( expr_arena.get(predicate.node()), expr_arena, )); @@ -294,7 +294,7 @@ pub(super) fn pushdown_orders( PartitionVariantIR::Parted { .. } => true, PartitionVariantIR::ByKey { key_exprs, .. } => { adjust_for_with_columns_context(zip(key_exprs.iter().map(|e| { - get_frame_observing(expr_arena.get(e.node()), expr_arena) + get_observable_orders(expr_arena.get(e.node()), expr_arena) }))) .is_err() },