Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ recursive_protection = [
"datafusion-common/recursive_protection",
"datafusion-expr/recursive_protection",
"datafusion-optimizer/recursive_protection",
"datafusion-physical-expr/recursive_protection",
"datafusion-physical-optimizer/recursive_protection",
"datafusion-sql/recursive_protection",
"sqlparser/recursive-protection",
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2092,6 +2092,7 @@ impl Normalizeable for Expr {
}

impl NormalizeEq for Expr {
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn normalize_eq(&self, other: &Self) -> bool {
match (self, other) {
(
Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/logical_plan/invariants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub fn assert_executable_invariants(plan: &LogicalPlan) -> Result<()> {
///
/// Refer to [`UserDefinedLogicalNode::check_invariants`](super::UserDefinedLogicalNode)
/// for more details of user-provided extension node invariants.
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn assert_valid_extension_nodes(plan: &LogicalPlan, check: InvariantLevel) -> Result<()> {
plan.apply_with_subqueries(|plan: &LogicalPlan| {
if let LogicalPlan::Extension(Extension { node }) = plan {
Expand Down Expand Up @@ -372,6 +373,7 @@ fn check_aggregation_in_scalar_subquery(
Ok(())
}

#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn strip_inner_query(inner_plan: &LogicalPlan) -> &LogicalPlan {
match inner_plan {
LogicalPlan::Projection(projection) => {
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2278,6 +2278,7 @@ impl Filter {
Self::try_new_internal(predicate, input)
}

#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn is_allowed_filter_type(data_type: &DataType) -> bool {
match data_type {
// Interpret NULL as a missing boolean value.
Expand Down
3 changes: 3 additions & 0 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,7 @@ pub fn split_conjunction(expr: &Expr) -> Vec<&Expr> {
split_conjunction_impl(expr, vec![])
}

#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn split_conjunction_impl<'a>(expr: &'a Expr, mut exprs: Vec<&'a Expr>) -> Vec<&'a Expr> {
match expr {
Expr::BinaryExpr(BinaryExpr {
Expand Down Expand Up @@ -1051,6 +1052,7 @@ pub fn split_binary_owned(expr: Expr, op: Operator) -> Vec<Expr> {
split_binary_owned_impl(expr, op, vec![])
}

#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn split_binary_owned_impl(
expr: Expr,
operator: Operator,
Expand Down Expand Up @@ -1078,6 +1080,7 @@ pub fn split_binary(expr: &Expr, op: Operator) -> Vec<&Expr> {
split_binary_impl(expr, op, vec![])
}

#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn split_binary_impl<'a>(
expr: &'a Expr,
operator: Operator,
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/decorrelate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ fn can_pullup_over_aggregation(expr: &Expr) -> bool {
}
}

#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn collect_local_correlated_cols(
plan: &LogicalPlan,
all_cols_map: &HashMap<LogicalPlan, BTreeSet<Column>>,
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/decorrelate_predicate_subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ impl OptimizerRule for DecorrelatePredicateSubquery {
true
}

#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn rewrite(
&self,
plan: LogicalPlan,
Expand Down
4 changes: 4 additions & 0 deletions datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ fn rewrite_children(
/// Assumes can_flatten_join_inputs has returned true and thus the plan can be
/// flattened. Adds all leaf inputs to `all_inputs` and join_keys to
/// possible_join_keys
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn flatten_join_inputs(
plan: LogicalPlan,
possible_join_keys: &mut JoinKeySet,
Expand Down Expand Up @@ -264,6 +265,7 @@ fn flatten_join_inputs(
/// `flatten_join_inputs`
///
/// Must stay in sync with `flatten_join_inputs`
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn can_flatten_join_inputs(plan: &LogicalPlan) -> bool {
// can only flatten inner / cross joins
match plan {
Expand Down Expand Up @@ -368,6 +370,7 @@ fn find_inner_join(
}

/// Extract join keys from a WHERE clause
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn extract_possible_join_keys(expr: &Expr, join_keys: &mut JoinKeySet) {
if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
match op {
Expand Down Expand Up @@ -399,6 +402,7 @@ fn extract_possible_join_keys(expr: &Expr, join_keys: &mut JoinKeySet) {
/// # Returns
/// * `Some()` when there are few remaining predicates in filter_expr
/// * `None` otherwise
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn remove_join_expressions(expr: Expr, join_keys: &JoinKeySet) -> Option<Expr> {
match expr {
Expr::BinaryExpr(BinaryExpr {
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/eliminate_group_by_constant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ impl OptimizerRule for EliminateGroupByConstant {
///
/// Helper function intended to be used only within this rule; it relies heavily
/// on the result of `SimplifyExpressions`.
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn is_constant_expression(expr: &Expr) -> bool {
match expr {
Expr::Alias(e) => is_constant_expression(&e.expr),
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/eliminate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ impl OptimizerRule for EliminateLimit {
true
}

#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn rewrite(
&self,
plan: LogicalPlan,
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/eliminate_outer_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ pub fn eliminate_outer(
/// For IS NOT NULL/NOT expr, always returns false for NULL input;
/// extracts columns from these exprs.
/// For all other exprs, falls through.
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn extract_non_nullable_columns(
expr: &Expr,
non_nullable_cols: &mut Vec<Column>,
Expand Down
2 changes: 2 additions & 0 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ fn extract_or_clauses_for_join<'a>(
/// Otherwise, return None.
///
/// For other clause, apply the rule above to extract clause.
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn extract_or_clause(expr: &Expr, schema_columns: &HashSet<Column>) -> Option<Expr> {
let mut predicate = None;

Expand Down Expand Up @@ -764,6 +765,7 @@ impl OptimizerRule for PushDownFilter {
true
}

#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn rewrite(
&self,
plan: LogicalPlan,
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/push_down_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ impl OptimizerRule for PushDownLimit {
true
}

#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn rewrite(
&self,
plan: LogicalPlan,
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/scalar_subquery_to_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl OptimizerRule for ScalarSubqueryToJoin {
true
}

#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn rewrite(
&self,
plan: LogicalPlan,
Expand Down
4 changes: 4 additions & 0 deletions datafusion/optimizer/src/simplify_expressions/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub static POWS_OF_TEN: [i128; 38] = [

/// Returns true if `needle` is found in a chain of `search_op`
/// expressions, such as: (A AND B) AND C
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn expr_contains_inner(expr: &Expr, needle: &Expr, search_op: Operator) -> bool {
match expr {
Expr::BinaryExpr(BinaryExpr { left, op, right }) if *op == search_op => {
Expand All @@ -86,6 +87,7 @@ pub fn expr_contains(expr: &Expr, needle: &Expr, search_op: Operator) -> bool {
/// expressions. Such as: A ^ (A ^ (B ^ A))
pub fn delete_xor_in_complex_expr(expr: &Expr, needle: &Expr, is_left: bool) -> Expr {
/// Deletes recursively 'needles' in a chain of xor expressions
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn recursive_delete_xor_in_expr(
expr: &Expr,
needle: &Expr,
Expand Down Expand Up @@ -266,6 +268,7 @@ pub fn as_bool_lit(expr: &Expr) -> Result<Option<bool>> {
/// For Between, not (A between B and C) ===> (A not between B and C)
/// not (A not between B and C) ===> (A between B and C)
/// For others, use Not clause
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
pub fn negate_clause(expr: Expr) -> Expr {
match expr {
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
Expand Down Expand Up @@ -335,6 +338,7 @@ pub fn negate_clause(expr: Expr) -> Expr {
/// For Negative:
/// ~(~A) ===> A
/// For others, use Negative clause
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
pub fn distribute_negation(expr: Expr) -> Expr {
match expr {
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ workspace = true
[lib]
name = "datafusion_physical_expr"

[features]
recursive_protection = ["dep:recursive"]

[dependencies]
ahash = { workspace = true }
arrow = { workspace = true }
Expand All @@ -52,6 +55,7 @@ itertools = { workspace = true, features = ["use_std"] }
log = { workspace = true }
paste = "^1.0"
petgraph = "0.8.2"
recursive = { workspace = true, optional = true }

[dev-dependencies]
arrow = { workspace = true, features = ["test_utils"] }
Expand Down
3 changes: 3 additions & 0 deletions datafusion/physical-expr/src/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ impl PhysicalExpr for BinaryExpr {
self
}

#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
BinaryTypeCoercer::new(
&self.left.data_type(input_schema)?,
Expand All @@ -356,6 +357,7 @@ impl PhysicalExpr for BinaryExpr {
Ok(self.left.nullable(input_schema)? || self.right.nullable(input_schema)?)
}

#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
use arrow::compute::kernels::numeric::*;

Expand Down Expand Up @@ -648,6 +650,7 @@ impl PhysicalExpr for BinaryExpr {
}
}

#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
fn write_child(
f: &mut std::fmt::Formatter,
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-expr/src/intervals/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use datafusion_expr::Operator;
/// We do not support every type of [`Operator`]s either. Over time, this check
/// will relax as more types of `PhysicalExpr`s and `Operator`s are supported.
/// Currently, [`CastExpr`], [`NegativeExpr`], [`BinaryExpr`], [`Column`] and [`Literal`] are supported.
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
pub fn check_support(expr: &Arc<dyn PhysicalExpr>, schema: &SchemaRef) -> bool {
let expr_any = expr.as_any();
if let Some(binary_expr) = expr_any.downcast_ref::<BinaryExpr>() {
Expand Down
2 changes: 2 additions & 0 deletions datafusion/physical-expr/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ use datafusion_expr::{
/// * `e` - The logical expression
/// * `input_dfschema` - The DataFusion schema for the input, used to resolve `Column` references
/// to qualified or unqualified fields by name.
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
pub fn create_physical_expr(
e: &Expr,
input_dfschema: &DFSchema,
Expand Down Expand Up @@ -385,6 +386,7 @@ pub fn create_physical_expr(
}

/// Create a vector of physical expressions from a vector of logical expressions
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
pub fn create_physical_exprs<'a, I>(
exprs: I,
input_dfschema: &DFSchema,
Expand Down
2 changes: 2 additions & 0 deletions datafusion/physical-optimizer/src/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,7 @@ pub fn reorder_join_keys_to_inputs(
}

/// Reorder the current join keys based on either the left partition or the right partition
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn reorder_current_join_keys(
join_keys: JoinKeyPairs,
left_partition: Option<&Partitioning>,
Expand Down Expand Up @@ -1011,6 +1012,7 @@ fn remove_dist_changing_operators(
/// " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
/// " DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
/// ```
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
pub fn replace_order_preserving_variants(
mut context: DistributionContext,
) -> Result<DistributionContext> {
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-optimizer/src/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ enum ParentPredicateStates {
Supported,
}

#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn push_down_filters(
node: Arc<dyn ExecutionPlan>,
parent_predicates: Vec<Arc<dyn PhysicalExpr>>,
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-optimizer/src/limit_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ pub fn pushdown_limit_helper(
}

/// Pushes down the limit through the plan.
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
pub(crate) fn pushdown_limits(
pushdown_plan: Arc<dyn ExecutionPlan>,
global_state: GlobalRequirements,
Expand Down
3 changes: 3 additions & 0 deletions datafusion/substrait/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ async-recursion = "1.0"
async-trait = { workspace = true }
chrono = { workspace = true }
datafusion = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
object_store = { workspace = true }
pbjson-types = { workspace = true }
prost = { workspace = true }
recursive = { workspace = true, optional = true }
substrait = { version = "0.57", features = ["serde"] }
url = { workspace = true }
tokio = { workspace = true, features = ["fs"] }
Expand All @@ -54,6 +56,7 @@ insta = { workspace = true }
default = ["physical"]
physical = ["datafusion/parquet"]
protoc = ["substrait/protoc"]
recursive_protection = ["dep:recursive", "datafusion/recursive_protection"]

[package.metadata.docs.rs]
# Use default features ("physical") for docs, plus "protoc". "protoc" is needed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ fn arg_list_to_binary_op_tree(op: Operator, mut args: Vec<Expr>) -> Result<Expr>
///
/// `take_len` represents the number of elements to take from `args` before returning.
/// We use `take_len` to avoid recursively building a `Take<Take<Take<...>>>` type.
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
fn arg_list_to_binary_op_tree_inner(
op: Operator,
args: &mut Drain<Expr>,
Expand Down
Loading