Skip to content

Commit 5293ea6

Browse files
committed
single pass
1 parent 4cc8101 commit 5293ea6

File tree

61 files changed

+1856
-1474
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

61 files changed

+1856
-1474
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,7 @@ import org.apache.spark.sql.catalyst._
3131
import org.apache.spark.sql.catalyst.analysis.resolver.{
3232
AnalyzerBridgeState,
3333
HybridAnalyzer,
34-
Resolver => OperatorResolver,
35-
ResolverExtension,
36-
ResolverGuard
34+
ResolverExtension
3735
}
3836
import org.apache.spark.sql.catalyst.catalog._
3937
import org.apache.spark.sql.catalyst.encoders.OuterScopes
@@ -297,17 +295,17 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
297295
def getRelationResolution: RelationResolution = relationResolution
298296

299297
def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
300-
if (plan.analyzed) return plan
301-
AnalysisHelper.markInAnalyzer {
302-
new HybridAnalyzer(
303-
this,
304-
new ResolverGuard(catalogManager),
305-
new OperatorResolver(
306-
catalogManager,
307-
singlePassResolverExtensions,
308-
singlePassMetadataResolverExtensions
309-
)
310-
).apply(plan, tracker)
298+
if (plan.analyzed) {
299+
plan
300+
} else {
301+
AnalysisContext.reset()
302+
try {
303+
AnalysisHelper.markInAnalyzer {
304+
HybridAnalyzer.fromLegacyAnalyzer(legacyAnalyzer = this).apply(plan, tracker)
305+
}
306+
} finally {
307+
AnalysisContext.reset()
308+
}
311309
}
312310
}
313311

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateExpressionResolver.scala

Lines changed: 7 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,6 @@
1818
package org.apache.spark.sql.catalyst.analysis.resolver
1919

2020
import org.apache.spark.sql.AnalysisException
21-
import org.apache.spark.sql.catalyst.analysis.{
22-
AnsiTypeCoercion,
23-
CollationTypeCoercion,
24-
TypeCoercion
25-
}
2621
import org.apache.spark.sql.catalyst.expressions.{Expression, OuterReference, SubExprUtils}
2722
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, ListAgg}
2823
import org.apache.spark.sql.catalyst.util.toPrettySQL
@@ -41,11 +36,6 @@ class AggregateExpressionResolver(
4136

4237
private val traversals = expressionResolver.getExpressionTreeTraversals
4338

44-
protected override val ansiTransformations: CoercesExpressionTypes.Transformations =
45-
AggregateExpressionResolver.ANSI_TYPE_COERCION_TRANSFORMATIONS
46-
protected override val nonAnsiTransformations: CoercesExpressionTypes.Transformations =
47-
AggregateExpressionResolver.TYPE_COERCION_TRANSFORMATIONS
48-
4939
private val expressionResolutionContextStack =
5040
expressionResolver.getExpressionResolutionContextStack
5141
private val subqueryRegistry = operatorResolver.getSubqueryRegistry
@@ -58,6 +48,7 @@ class AggregateExpressionResolver(
5848
* resolving its children recursively and validating the resolved expression.
5949
*/
6050
override def resolve(aggregateExpression: AggregateExpression): Expression = {
51+
expressionResolutionContextStack.peek().resolvingTreeUnderAggregateExpression = true
6152
val aggregateExpressionWithChildrenResolved =
6253
withResolvedChildren(aggregateExpression, expressionResolver.resolve _)
6354
.asInstanceOf[AggregateExpression]
@@ -132,15 +123,13 @@ class AggregateExpressionResolver(
132123
throwNestedAggregateFunction(aggregateExpression)
133124
}
134125

135-
val nonDeterministicChild =
136-
aggregateExpression.aggregateFunction.children.collectFirst {
137-
case child if !child.deterministic => child
126+
aggregateExpression.aggregateFunction.children.foreach { child =>
127+
if (!child.deterministic) {
128+
throwAggregateFunctionWithNondeterministicExpression(
129+
aggregateExpression,
130+
child
131+
)
138132
}
139-
if (nonDeterministicChild.nonEmpty) {
140-
throwAggregateFunctionWithNondeterministicExpression(
141-
aggregateExpression,
142-
nonDeterministicChild.get
143-
)
144133
}
145134
}
146135

@@ -249,23 +238,3 @@ class AggregateExpressionResolver(
249238
)
250239
}
251240
}
252-
253-
object AggregateExpressionResolver {
254-
// Ordering in the list of type coercions should be in sync with the list in [[TypeCoercion]].
255-
private val TYPE_COERCION_TRANSFORMATIONS: Seq[Expression => Expression] = Seq(
256-
CollationTypeCoercion.apply,
257-
TypeCoercion.InTypeCoercion.apply,
258-
TypeCoercion.FunctionArgumentTypeCoercion.apply,
259-
TypeCoercion.IfTypeCoercion.apply,
260-
TypeCoercion.ImplicitTypeCoercion.apply
261-
)
262-
263-
// Ordering in the list of type coercions should be in sync with the list in [[AnsiTypeCoercion]].
264-
private val ANSI_TYPE_COERCION_TRANSFORMATIONS: Seq[Expression => Expression] = Seq(
265-
CollationTypeCoercion.apply,
266-
AnsiTypeCoercion.InTypeCoercion.apply,
267-
AnsiTypeCoercion.FunctionArgumentTypeCoercion.apply,
268-
AnsiTypeCoercion.IfTypeCoercion.apply,
269-
AnsiTypeCoercion.ImplicitTypeCoercion.apply
270-
)
271-
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateResolutionResult.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,15 @@ package org.apache.spark.sql.catalyst.analysis.resolver
2020
import java.util.HashSet
2121

2222
import org.apache.spark.sql.catalyst.expressions.{Alias, ExprId, NamedExpression}
23-
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
23+
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
2424

2525
/**
26-
* Stores the resulting operator, output list, grouping attributes and list of aliases from
27-
* aggregate list, obtained by resolving an [[Aggregate]] operator.
26+
* Stores the resulting operator, output list, grouping attributes, list of aliases from
27+
* aggregate list and base [[Aggregate]], obtained by resolving an [[Aggregate]] operator.
2828
*/
2929
case class AggregateResolutionResult(
3030
operator: LogicalPlan,
3131
outputList: Seq[NamedExpression],
32-
groupingAttributeIds: Option[HashSet[ExprId]],
33-
aggregateListAliases: Seq[Alias])
32+
groupingAttributeIds: HashSet[ExprId],
33+
aggregateListAliases: Seq[Alias],
34+
baseAggregate: Aggregate)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateResolver.scala

Lines changed: 41 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -17,50 +17,55 @@
1717

1818
package org.apache.spark.sql.catalyst.analysis.resolver
1919

20-
import java.util.{HashSet, LinkedHashMap}
20+
import java.util.HashSet
2121

22-
import scala.jdk.CollectionConverters._
23-
24-
import org.apache.spark.sql.catalyst.analysis.{
25-
AnalysisErrorAt,
26-
NondeterministicExpressionCollection,
27-
UnresolvedAttribute
28-
}
22+
import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt, UnresolvedAttribute}
2923
import org.apache.spark.sql.catalyst.expressions.{
3024
Alias,
25+
AliasHelper,
3126
AttributeReference,
3227
Expression,
3328
ExprId,
34-
ExprUtils,
35-
NamedExpression
29+
ExprUtils
3630
}
37-
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}
31+
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
3832

3933
/**
4034
* Resolves an [[Aggregate]] by resolving its child, aggregate expressions and grouping
4135
* expressions. Updates the [[NameScopeStack]] with its output and performs validation
4236
* related to [[Aggregate]] resolution.
4337
*/
4438
class AggregateResolver(operatorResolver: Resolver, expressionResolver: ExpressionResolver)
45-
extends TreeNodeResolver[Aggregate, LogicalPlan] {
39+
extends TreeNodeResolver[Aggregate, LogicalPlan]
40+
with AliasHelper {
4641
private val scopes = operatorResolver.getNameScopes
4742
private val lcaResolver = expressionResolver.getLcaResolver
4843

4944
/**
5045
* Resolve [[Aggregate]] operator.
5146
*
5247
* 1. Resolve the child (inline table).
53-
* 2. Resolve aggregate expressions using [[ExpressionResolver.resolveAggregateExpressions]] and
48+
* 2. Clear [[NameScope.availableAliases]]. Those are only relevant for the immediate aggregate
49+
* expressions for output prioritization to work correctly in
50+
* [[NameScope.tryResolveMultipartNameByOutput]].
51+
* 3. Resolve aggregate expressions using [[ExpressionResolver.resolveAggregateExpressions]] and
5452
* set [[NameScope.ordinalReplacementExpressions]] for grouping expressions resolution.
55-
* 3. If there's just one [[UnresolvedAttribute]] with a single-part name "ALL", expand it using
53+
* 4. If there's just one [[UnresolvedAttribute]] with a single-part name "ALL", expand it using
5654
* aggregate expressions which don't contain aggregate functions. There should not exist a
5755
* column with that name in the lower operator's output, otherwise it takes precedence.
58-
* 4. Resolve grouping expressions using [[ExpressionResolver.resolveGroupingExpressions]]. This
56+
* 5. Resolve grouping expressions using [[ExpressionResolver.resolveGroupingExpressions]]. This
5957
* includes alias references to aggregate expressions, which is done in
6058
* [[NameScope.resolveMultipartName]] and replacing [[UnresolvedOrdinals]] with corresponding
6159
* expressions from aggregate list, done in [[OrdinalResolver]].
62-
* 5. Substitute non-deterministic expressions with derived attribute references to an
63-
* artificial [[Project]] list.
60+
* 6. Remove all the unnecessary [[Alias]]es from the grouping (all the aliases) and aggregate
61+
* (keep the outermost one) expressions. This is needed to stay compatible with the
62+
* fixed-point implementation. For example:
63+
*
64+
* {{{ SELECT timestamp(col1:str) FROM VALUES('a') GROUP BY timestamp(col1:str); }}}
65+
*
66+
* Here we end up having inner [[Alias]]es in both the grouping and aggregate expressions
67+
* lists which are uncomparable because they have different expression IDs (thus we have to
68+
* strip them).
6469
*
6570
* If the resulting [[Aggregate]] contains lateral columns references, delegate the resolution of
6671
* these columns to [[LateralColumnAliasResolver.handleLcaInAggregate]]. Otherwise, validate the
@@ -73,6 +78,8 @@ class AggregateResolver(operatorResolver: Resolver, expressionResolver: Expressi
7378
val resolvedAggregate = try {
7479
val resolvedChild = operatorResolver.resolve(unresolvedAggregate.child)
7580

81+
scopes.current.availableAliases.clear()
82+
7683
val resolvedAggregateExpressions = expressionResolver.resolveAggregateExpressions(
7784
unresolvedAggregate.aggregateExpressions,
7885
unresolvedAggregate
@@ -100,21 +107,25 @@ class AggregateResolver(operatorResolver: Resolver, expressionResolver: Expressi
100107
)
101108
}
102109

103-
val partiallyResolvedAggregate = unresolvedAggregate.copy(
104-
groupingExpressions = resolvedGroupingExpressions,
105-
aggregateExpressions = resolvedAggregateExpressions.expressions,
110+
val resolvedGroupingExpressionsWithoutAliases = resolvedGroupingExpressions.map(trimAliases)
111+
val resolvedAggregateExpressionsWithoutAliases =
112+
resolvedAggregateExpressions.expressions.map(trimNonTopLevelAliases)
113+
114+
val resolvedAggregate = unresolvedAggregate.copy(
115+
groupingExpressions = resolvedGroupingExpressionsWithoutAliases,
116+
aggregateExpressions = resolvedAggregateExpressionsWithoutAliases,
106117
child = resolvedChild
107118
)
108119

109-
val resolvedAggregate = tryPullOutNondeterministic(partiallyResolvedAggregate)
110-
111120
if (resolvedAggregateExpressions.hasLateralColumnAlias) {
112121
val aggregateWithLcaResolutionResult = lcaResolver.handleLcaInAggregate(resolvedAggregate)
113122
AggregateResolutionResult(
114123
operator = aggregateWithLcaResolutionResult.resolvedOperator,
115124
outputList = aggregateWithLcaResolutionResult.outputList,
116-
groupingAttributeIds = None,
117-
aggregateListAliases = aggregateWithLcaResolutionResult.aggregateListAliases
125+
groupingAttributeIds =
126+
getGroupingAttributeIds(aggregateWithLcaResolutionResult.baseAggregate),
127+
aggregateListAliases = aggregateWithLcaResolutionResult.aggregateListAliases,
128+
baseAggregate = aggregateWithLcaResolutionResult.baseAggregate
118129
)
119130
} else {
120131
// TODO: This validation function does a post-traversal. This is discouraged in single-pass
@@ -124,8 +135,9 @@ class AggregateResolver(operatorResolver: Resolver, expressionResolver: Expressi
124135
AggregateResolutionResult(
125136
operator = resolvedAggregate,
126137
outputList = resolvedAggregate.aggregateExpressions,
127-
groupingAttributeIds = Some(getGroupingAttributeIds(resolvedAggregate)),
128-
aggregateListAliases = scopes.current.getTopAggregateExpressionAliases
138+
groupingAttributeIds = getGroupingAttributeIds(resolvedAggregate),
139+
aggregateListAliases = scopes.current.getTopAggregateExpressionAliases,
140+
baseAggregate = resolvedAggregate
129141
)
130142
}
131143
} finally {
@@ -134,8 +146,9 @@ class AggregateResolver(operatorResolver: Resolver, expressionResolver: Expressi
134146

135147
scopes.overwriteOutputAndExtendHiddenOutput(
136148
output = resolvedAggregate.outputList.map(_.toAttribute),
137-
groupingAttributeIds = resolvedAggregate.groupingAttributeIds,
138-
aggregateListAliases = resolvedAggregate.aggregateListAliases
149+
groupingAttributeIds = Some(resolvedAggregate.groupingAttributeIds),
150+
aggregateListAliases = resolvedAggregate.aggregateListAliases,
151+
baseAggregate = Some(resolvedAggregate.baseAggregate)
139152
)
140153

141154
resolvedAggregate.operator
@@ -208,53 +221,6 @@ class AggregateResolver(operatorResolver: Resolver, expressionResolver: Expressi
208221
}
209222
}
210223

211-
/**
212-
* In case there are non-deterministic expressions in either `groupingExpressions` or
213-
* `aggregateExpressions` replace them with attributes created out of corresponding
214-
* non-deterministic expression. Example:
215-
*
216-
* {{{ SELECT RAND() GROUP BY 1; }}}
217-
*
218-
* This query would have the following analyzed plan:
219-
* Aggregate(
220-
* groupingExpressions = [AttributeReference(_nonDeterministic)]
221-
* aggregateExpressions = [Alias(AttributeReference(_nonDeterministic), `rand()`)]
222-
* child = Project(
223-
* projectList = [Alias(Rand(...), `_nondeterministic`)]
224-
* child = OneRowRelation
225-
* )
226-
* )
227-
*/
228-
private def tryPullOutNondeterministic(aggregate: Aggregate): Aggregate = {
229-
val nondeterministicToAttributes: LinkedHashMap[Expression, NamedExpression] =
230-
NondeterministicExpressionCollection.getNondeterministicToAttributes(
231-
aggregate.groupingExpressions
232-
)
233-
234-
if (!nondeterministicToAttributes.isEmpty) {
235-
val newChild = Project(
236-
scopes.current.output ++ nondeterministicToAttributes.values.asScala.toSeq,
237-
aggregate.child
238-
)
239-
val resolvedAggregateExpressions = aggregate.aggregateExpressions.map { expression =>
240-
PullOutNondeterministicExpressionInExpressionTree(expression, nondeterministicToAttributes)
241-
}
242-
val resolvedGroupingExpressions = aggregate.groupingExpressions.map { expression =>
243-
PullOutNondeterministicExpressionInExpressionTree(
244-
expression,
245-
nondeterministicToAttributes
246-
)
247-
}
248-
aggregate.copy(
249-
groupingExpressions = resolvedGroupingExpressions,
250-
aggregateExpressions = resolvedAggregateExpressions,
251-
child = newChild
252-
)
253-
} else {
254-
aggregate
255-
}
256-
}
257-
258224
private def canGroupByAll(expressions: Seq[Expression]): Boolean = {
259225
val isOrderByAll = expressions match {
260226
case Seq(unresolvedAttribute: UnresolvedAttribute) =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateWithLcaResolutionResult.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,19 @@
1818
package org.apache.spark.sql.catalyst.analysis.resolver
1919

2020
import org.apache.spark.sql.catalyst.expressions.{Alias, NamedExpression}
21-
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
21+
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
2222

2323
/**
2424
* Stores the result of resolution of lateral column aliases in an [[Aggregate]].
2525
* @param resolvedOperator The resolved operator.
2626
* @param outputList The output list of the resolved operator.
2727
* @param aggregateListAliases List of aliases from aggregate list and all artificially inserted
2828
* [[Project]] nodes.
29+
* @param baseAggregate [[Aggregate]] node constructed by [[LateralColumnAliasResolver]] while
30+
* resolving lateral column references in [[Aggregate]].
2931
*/
3032
case class AggregateWithLcaResolutionResult(
3133
resolvedOperator: LogicalPlan,
3234
outputList: Seq[NamedExpression],
33-
aggregateListAliases: Seq[Alias])
35+
aggregateListAliases: Seq[Alias],
36+
baseAggregate: Aggregate)

0 commit comments

Comments (0)