From 6b10cc1506692840e4b092415b20c9a3ad724946 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nikola=20Jovi=C4=87evi=C4=87?= Date: Fri, 18 Jul 2025 14:17:49 +0200 Subject: [PATCH 1/4] Initial commit. --- .../catalyst/analysis/WindowResolution.scala | 88 +++++++++++++------ 1 file changed, 60 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala index 5f69865b3532c..ba4b78cbf7512 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala @@ -107,8 +107,11 @@ object WindowResolution { * * By checking the type and configuration of [[WindowExpression.windowFunction]] it enforces the * following rules: + * - Disallows [[FrameLessOffsetWindowFunction]] (e.g. [[Lag]]) without defined ordering or + * one with a frame which is defined as something other than an offset frame (e.g. + * `ROWS BETWEEN` is logically incompatible with offset functions). * - Disallows distinct aggregate expressions in window functions. - * - Disallows use of certain aggregate functions - [[ListaAgg]], [[PercentileCont]], + * - Disallows use of certain aggregate functions - [[ListAgg]], [[PercentileCont]], * [[PercentileDisc]], [[Median]] * - Allows only window functions of following types: * - [[AggregateExpression]] (non-distinct) @@ -116,35 +119,64 @@ object WindowResolution { * - [[AggregateWindowFunction]] */ def validateResolvedWindowExpression(windowExpression: WindowExpression): Unit = { - windowExpression.windowFunction match { - case AggregateExpression(_, _, true, _, _) => + windowExpression match { + case _ @ WindowExpression( + windowFunction: FrameLessOffsetWindowFunction, + WindowSpecDefinition(_, order, frame: SpecifiedWindowFrame) + ) if order.isEmpty || !frame.isOffset => windowExpression.failAnalysis( - errorClass = "DISTINCT_WINDOW_FUNCTION_UNSUPPORTED", - messageParameters = Map("windowExpr" -> toSQLExpr(windowExpression)) - ) - case agg @ AggregateExpression(fun: ListAgg, _, _, _, _) - // listagg(...) WITHIN GROUP (ORDER BY ...) OVER (ORDER BY ...) is unsupported - if fun.orderingFilled && (windowExpression.windowSpec.orderSpec.nonEmpty || - windowExpression.windowSpec.frameSpecification != - SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing)) => - agg.failAnalysis( - errorClass = "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC", - messageParameters = Map("aggFunc" -> toSQLExpr(agg.aggregateFunction)) - ) - case agg @ AggregateExpression(_: PercentileCont | _: PercentileDisc | _: Median, _, _, _, _) - if windowExpression.windowSpec.orderSpec.nonEmpty || - windowExpression.windowSpec.frameSpecification != - SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing) => - agg.failAnalysis( - errorClass = "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC", - messageParameters = Map("aggFunc" -> toSQLExpr(agg.aggregateFunction)) - ) - case _: AggregateExpression | _: FrameLessOffsetWindowFunction | _: AggregateWindowFunction => - case other => - other.failAnalysis( - errorClass = "UNSUPPORTED_EXPR_FOR_WINDOW", - messageParameters = Map("sqlExpr" -> toSQLExpr(other)) + errorClass = "WINDOW_FUNCTION_AND_FRAME_MISMATCH", + messageParameters = Map( + "funcName" -> toSQLExpr(windowFunction), + "windowExpr" -> toSQLExpr(windowExpression) + ) ) + case _ @ WindowExpression(windowFunction, windowSpec) => + windowFunction match { + case AggregateExpression(_, _, true, _, _) => + windowExpression.failAnalysis( + errorClass = "DISTINCT_WINDOW_FUNCTION_UNSUPPORTED", + messageParameters = Map("windowExpr" -> toSQLExpr(windowExpression)) + ) + case frameLessOffsetWindowFunction: FrameLessOffsetWindowFunction + if windowSpec.orderSpec.isEmpty || + (windowExpression.windowSpec.frameSpecification.isInstanceOf[SpecifiedWindowFrame] && + !windowExpression.windowSpec.frameSpecification + .asInstanceOf[SpecifiedWindowFrame] + .isOffset) => + frameLessOffsetWindowFunction.failAnalysis( + errorClass = "WINDOW_FUNCTION_AND_FRAME_MISMATCH", + messageParameters = Map( + "funcName" -> toSQLExpr(windowFunction), + "windowExpr" -> toSQLExpr(windowExpression) + ) + ) + case aggregateExpression @ AggregateExpression(fun: ListAgg, _, _, _, _) + if fun.orderingFilled && (windowSpec.orderSpec.nonEmpty || + windowSpec.frameSpecification != + SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing)) => + aggregateExpression.failAnalysis( + errorClass = "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC", + messageParameters = Map("aggFunc" -> toSQLExpr(aggregateExpression.aggregateFunction)) + ) + case aggregateExpression @ AggregateExpression( + _: PercentileCont | _: PercentileDisc | _: Median, _, _, _, _ + ) + if windowSpec.orderSpec.nonEmpty || + windowSpec.frameSpecification != + SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing) => + aggregateExpression.failAnalysis( + errorClass = "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC", + messageParameters = Map("aggFunc" -> toSQLExpr(aggregateExpression.aggregateFunction)) + ) + case _: AggregateExpression | _: FrameLessOffsetWindowFunction | + _: AggregateWindowFunction => + case other => + other.failAnalysis( + errorClass = "UNSUPPORTED_EXPR_FOR_WINDOW", + messageParameters = Map("sqlExpr" -> toSQLExpr(other)) + ) + } } } } From 9291c091897d316d02ad7ca77fe6055de94d47ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nikola=20Jovi=C4=87evi=C4=87?= Date: Fri, 18 Jul 2025 14:37:40 +0200 Subject: [PATCH 2/4] Remove check from `checkAnalysis`. --- .../spark/sql/catalyst/analysis/CheckAnalysis.scala | 9 --------- 1 file changed, 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 25ae710eeebb1..014b8ebece046 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -437,15 +437,6 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString errorClass = "WINDOW_FUNCTION_WITHOUT_OVER_CLAUSE", messageParameters = Map("funcName" -> toSQLExpr(w))) - case w @ WindowExpression(wf: FrameLessOffsetWindowFunction, - WindowSpecDefinition(_, order, frame: SpecifiedWindowFrame)) - if order.isEmpty || !frame.isOffset => - w.failAnalysis( - errorClass = "WINDOW_FUNCTION_AND_FRAME_MISMATCH", - messageParameters = Map( - "funcName" -> toSQLExpr(wf), - "windowExpr" -> toSQLExpr(w))) - case agg @ AggregateExpression(listAgg: ListAgg, _, _, _, _) if agg.isDistinct && listAgg.needSaveOrderValue => throw QueryCompilationErrors.functionAndOrderExpressionMismatchError( From e7152b72f505d89a8b8ae4791ed41b0d93a5cbe5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nikola=20Jovi=C4=87evi=C4=87?= Date: Mon, 21 Jul 2025 13:40:06 +0200 Subject: [PATCH 3/4] Separate into two checks to reduce diff. --- .../catalyst/analysis/WindowResolution.scala | 81 ++++++++----------- 1 file changed, 33 insertions(+), 48 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala index ba4b78cbf7512..9b7b529fcf3ee 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala @@ -121,8 +121,8 @@ object WindowResolution { def validateResolvedWindowExpression(windowExpression: WindowExpression): Unit = { windowExpression match { case _ @ WindowExpression( - windowFunction: FrameLessOffsetWindowFunction, - WindowSpecDefinition(_, order, frame: SpecifiedWindowFrame) + windowFunction: FrameLessOffsetWindowFunction, + WindowSpecDefinition(_, order, frame: SpecifiedWindowFrame) ) if order.isEmpty || !frame.isOffset => windowExpression.failAnalysis( errorClass = "WINDOW_FUNCTION_AND_FRAME_MISMATCH", @@ -131,52 +131,37 @@ object WindowResolution { "windowExpr" -> toSQLExpr(windowExpression) ) ) - case _ @ WindowExpression(windowFunction, windowSpec) => - windowFunction match { - case AggregateExpression(_, _, true, _, _) => - windowExpression.failAnalysis( - errorClass = "DISTINCT_WINDOW_FUNCTION_UNSUPPORTED", - messageParameters = Map("windowExpr" -> toSQLExpr(windowExpression)) - ) - case frameLessOffsetWindowFunction: FrameLessOffsetWindowFunction - if windowSpec.orderSpec.isEmpty || - (windowExpression.windowSpec.frameSpecification.isInstanceOf[SpecifiedWindowFrame] && - !windowExpression.windowSpec.frameSpecification - .asInstanceOf[SpecifiedWindowFrame] - .isOffset) => - frameLessOffsetWindowFunction.failAnalysis( - errorClass = "WINDOW_FUNCTION_AND_FRAME_MISMATCH", - messageParameters = Map( - "funcName" -> toSQLExpr(windowFunction), - "windowExpr" -> toSQLExpr(windowExpression) - ) - ) - case aggregateExpression @ AggregateExpression(fun: ListAgg, _, _, _, _) - if fun.orderingFilled && (windowSpec.orderSpec.nonEmpty || - windowSpec.frameSpecification != - SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing)) => - aggregateExpression.failAnalysis( - errorClass = "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC", - messageParameters = Map("aggFunc" -> toSQLExpr(aggregateExpression.aggregateFunction)) - ) - case aggregateExpression @ AggregateExpression( - _: PercentileCont | _: PercentileDisc | _: Median, _, _, _, _ - ) - if windowSpec.orderSpec.nonEmpty || - windowSpec.frameSpecification != - SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing) => - aggregateExpression.failAnalysis( - errorClass = "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC", - messageParameters = Map("aggFunc" -> toSQLExpr(aggregateExpression.aggregateFunction)) - ) - case _: AggregateExpression | _: FrameLessOffsetWindowFunction | - _: AggregateWindowFunction => - case other => - other.failAnalysis( - errorClass = "UNSUPPORTED_EXPR_FOR_WINDOW", - messageParameters = Map("sqlExpr" -> toSQLExpr(other)) - ) - } + case _ => + } + windowExpression.windowFunction match { + case AggregateExpression(_, _, true, _, _) => + windowExpression.failAnalysis( + errorClass = "DISTINCT_WINDOW_FUNCTION_UNSUPPORTED", + messageParameters = Map("windowExpr" -> toSQLExpr(windowExpression)) + ) + case agg @ AggregateExpression(fun: ListAgg, _, _, _, _) + // listagg(...) WITHIN GROUP (ORDER BY ...) OVER (ORDER BY ...) is unsupported + if fun.orderingFilled && (windowExpression.windowSpec.orderSpec.nonEmpty || + windowExpression.windowSpec.frameSpecification != + SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing)) => + agg.failAnalysis( + errorClass = "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC", + messageParameters = Map("aggFunc" -> toSQLExpr(agg.aggregateFunction)) + ) + case agg @ AggregateExpression(_: PercentileCont | _: PercentileDisc | _: Median, _, _, _, _) + if windowExpression.windowSpec.orderSpec.nonEmpty || + windowExpression.windowSpec.frameSpecification != + SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing) => + agg.failAnalysis( + errorClass = "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC", + messageParameters = Map("aggFunc" -> toSQLExpr(agg.aggregateFunction)) + ) + case _: AggregateExpression | _: FrameLessOffsetWindowFunction | _: AggregateWindowFunction => + case other => + other.failAnalysis( + errorClass = "UNSUPPORTED_EXPR_FOR_WINDOW", + messageParameters = Map("sqlExpr" -> toSQLExpr(other)) + ) } } } From 5489c52cae25bdaf83d7848660eb3ec9be3a9249 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nikola=20Jovi=C4=87evi=C4=87?= Date: Mon, 21 Jul 2025 13:43:42 +0200 Subject: [PATCH 4/4] Separate checks into functions. --- .../spark/sql/catalyst/analysis/WindowResolution.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala index 9b7b529fcf3ee..58b98e7fedf20 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala @@ -119,6 +119,11 @@ object WindowResolution { * - [[AggregateWindowFunction]] */ def validateResolvedWindowExpression(windowExpression: WindowExpression): Unit = { + checkWindowFunctionAndFrameMismatch(windowExpression) + checkWindowFunction(windowExpression) + } + + def checkWindowFunctionAndFrameMismatch(windowExpression: WindowExpression): Unit = { windowExpression match { case _ @ WindowExpression( windowFunction: FrameLessOffsetWindowFunction, @@ -133,6 +138,9 @@ object WindowResolution { ) case _ => } + } + + def checkWindowFunction(windowExpression: WindowExpression): Unit = { windowExpression.windowFunction match { case AggregateExpression(_, _, true, _, _) => windowExpression.failAnalysis(