From bd559401622c711ce82b061d11abb21606df52b4 Mon Sep 17 00:00:00 2001 From: mihailoale-db Date: Tue, 2 Dec 2025 10:31:30 +0100 Subject: [PATCH] initial commit --- .../sql/catalyst/analysis/Analyzer.scala | 4 +- .../analysis/resolver/HybridAnalyzer.scala | 7 +- .../resolver/HybridAnalyzerSuite.scala | 71 +++++++++++-------- 3 files changed, 50 insertions(+), 32 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 08c31939f161..67d25296a1e2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -319,7 +319,7 @@ class Analyzer( AnalysisContext.reset() try { AnalysisHelper.markInAnalyzer { - HybridAnalyzer.fromLegacyAnalyzer(legacyAnalyzer = this).apply(plan, tracker) + HybridAnalyzer.fromLegacyAnalyzer(legacyAnalyzer = this, tracker = tracker).apply(plan) } } finally { AnalysisContext.reset() @@ -327,7 +327,7 @@ class Analyzer( } else { AnalysisContext.withNewAnalysisContext { AnalysisHelper.markInAnalyzer { - HybridAnalyzer.fromLegacyAnalyzer(legacyAnalyzer = this).apply(plan, tracker) + HybridAnalyzer.fromLegacyAnalyzer(legacyAnalyzer = this, tracker = tracker).apply(plan) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HybridAnalyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HybridAnalyzer.scala index d346969be8ef..ecdbef86a297 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HybridAnalyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/HybridAnalyzer.scala @@ -61,6 +61,7 @@ class HybridAnalyzer( legacyAnalyzer: Analyzer, resolverGuard: ResolverGuard, resolver: Resolver, + tracker: QueryPlanningTracker, 
extendedResolutionChecks: Seq[LogicalPlan => Unit] = Seq.empty, extendedRewriteRules: Seq[Rule[LogicalPlan]] = Seq.empty, exposeExplicitlyUnsupportedResolverFeature: Boolean = false) @@ -74,7 +75,7 @@ class HybridAnalyzer( ) private val sampleRateGenerator = new Random() - def apply(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = { + def apply(plan: LogicalPlan): LogicalPlan = { val dualRun = conf.getConf(SQLConf.ANALYZER_DUAL_RUN_LEGACY_AND_SINGLE_PASS_RESOLVER) && checkDualRunSampleRate() && @@ -296,7 +297,8 @@ object HybridAnalyzer { */ def fromLegacyAnalyzer( legacyAnalyzer: Analyzer, - exposeExplicitlyUnsupportedResolverFeature: Boolean = false): HybridAnalyzer = { + exposeExplicitlyUnsupportedResolverFeature: Boolean = false, + tracker: QueryPlanningTracker): HybridAnalyzer = { new HybridAnalyzer( legacyAnalyzer = legacyAnalyzer, resolverGuard = new ResolverGuard(legacyAnalyzer.catalogManager), @@ -307,6 +309,7 @@ object HybridAnalyzer { metadataResolverExtensions = legacyAnalyzer.singlePassMetadataResolverExtensions, externalRelationResolution = Some(legacyAnalyzer.getRelationResolution) ), + tracker = tracker, extendedResolutionChecks = legacyAnalyzer.singlePassExtendedResolutionChecks, extendedRewriteRules = legacyAnalyzer.singlePassPostHocResolutionRules, exposeExplicitlyUnsupportedResolverFeature = exposeExplicitlyUnsupportedResolverFeature diff --git a/sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/HybridAnalyzerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/HybridAnalyzerSuite.scala index 21d9a72fc2a9..1dacec13f8ac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/HybridAnalyzerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/analysis/resolver/HybridAnalyzerSuite.scala @@ -148,7 +148,8 @@ class HybridAnalyzerSuite extends QueryTest with SharedSparkSession { extends HybridAnalyzer( legacyAnalyzer = legacyAnalyzer, resolverGuard = resolverGuard, - 
resolver = resolver + resolver = resolver, + tracker = new QueryPlanningTracker ) { override protected[sql] def normalizePlan(plan: LogicalPlan): LogicalPlan = { throw new Exception("Broken plan normalization") @@ -177,8 +178,9 @@ class HybridAnalyzerSuite extends QueryTest with SharedSparkSession { new HybridAnalyzer( new ValidatingAnalyzer(bridgeRelations = true), new ResolverGuard(spark.sessionState.catalogManager), - new ValidatingResolver(bridgeRelations = true) - ).apply(unresolvedPlan, new QueryPlanningTracker), + new ValidatingResolver(bridgeRelations = true), + new QueryPlanningTracker + ).apply(unresolvedPlan), resolvedPlan ) } @@ -192,8 +194,9 @@ class HybridAnalyzerSuite extends QueryTest with SharedSparkSession { new BrokenResolver( QueryCompilationErrors.unsupportedSinglePassAnalyzerFeature("test"), bridgeRelations = true - ) - ).apply(unresolvedPlan, new QueryPlanningTracker) + ), + new QueryPlanningTracker + ).apply(unresolvedPlan) ), condition = "UNSUPPORTED_SINGLE_PASS_ANALYZER_FEATURE", parameters = Map("feature" -> "test") @@ -208,8 +211,9 @@ class HybridAnalyzerSuite extends QueryTest with SharedSparkSession { new BrokenResolver( new StackOverflowError("Stack Overflow"), bridgeRelations = true - ) - ).apply(unresolvedPlan, new QueryPlanningTracker) + ), + new QueryPlanningTracker + ).apply(unresolvedPlan) ) } @@ -219,8 +223,9 @@ class HybridAnalyzerSuite extends QueryTest with SharedSparkSession { new HybridAnalyzer( new ValidatingAnalyzer(bridgeRelations = true), new ResolverGuard(spark.sessionState.catalogManager), - new HardCodedResolver(resolvedPlan, bridgeRelations = true) - ).apply(malformedUnresolvedPlan, new QueryPlanningTracker) + new HardCodedResolver(resolvedPlan, bridgeRelations = true), + new QueryPlanningTracker + ).apply(malformedUnresolvedPlan) ), condition = "HYBRID_ANALYZER_EXCEPTION.FIXED_POINT_FAILED_SINGLE_PASS_SUCCEEDED", parameters = Map("singlePassOutput" -> resolvedPlan.toString) @@ -233,8 +238,9 @@ class 
HybridAnalyzerSuite extends QueryTest with SharedSparkSession { new HybridAnalyzer( new ValidatingAnalyzer(bridgeRelations = true), new ResolverGuard(spark.sessionState.catalogManager), - new ValidatingResolver(bridgeRelations = true) - ).apply(malformedUnresolvedPlan, new QueryPlanningTracker) + new ValidatingResolver(bridgeRelations = true), + new QueryPlanningTracker + ).apply(malformedUnresolvedPlan) ), condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION", parameters = Map( @@ -250,8 +256,9 @@ class HybridAnalyzerSuite extends QueryTest with SharedSparkSession { new HybridAnalyzer( new ValidatingAnalyzer(bridgeRelations = true), new ResolverGuard(spark.sessionState.catalogManager), - new HardCodedResolver(malformedResolvedPlan, bridgeRelations = true) - ).apply(unresolvedPlan, new QueryPlanningTracker) + new HardCodedResolver(malformedResolvedPlan, bridgeRelations = true), + new QueryPlanningTracker + ).apply(unresolvedPlan) ), condition = "HYBRID_ANALYZER_EXCEPTION.LOGICAL_PLAN_COMPARISON_MISMATCH", parameters = Map( @@ -277,8 +284,9 @@ class HybridAnalyzerSuite extends QueryTest with SharedSparkSession { new HybridAnalyzer( new ValidatingAnalyzer(bridgeRelations = true), new ResolverGuard(spark.sessionState.catalogManager), - new HardCodedResolver(resolvedPlan, bridgeRelations = true) - ).apply(plan, new QueryPlanningTracker) + new HardCodedResolver(resolvedPlan, bridgeRelations = true), + new QueryPlanningTracker + ).apply(plan) ), condition = "HYBRID_ANALYZER_EXCEPTION.OUTPUT_SCHEMA_COMPARISON_MISMATCH", parameters = Map( @@ -294,7 +302,8 @@ class HybridAnalyzerSuite extends QueryTest with SharedSparkSession { new ValidatingAnalyzer(bridgeRelations = true), new ResolverGuard(spark.sessionState.catalogManager), - new HardCodedResolver(resolvedPlan, bridgeRelations = true) - ).apply(unresolvedPlan, new QueryPlanningTracker) + new HardCodedResolver(resolvedPlan, bridgeRelations = true), + new QueryPlanningTracker + ).apply(unresolvedPlan) } } @@ -306,8 +314,9 @@ class HybridAnalyzerSuite extends QueryTest with SharedSparkSession { new BrokenResolver( new 
ExplicitlyUnsupportedResolverFeature("FAILURE"), bridgeRelations = true - ) - ).apply(unresolvedPlan, new QueryPlanningTracker), + ), + new QueryPlanningTracker + ).apply(unresolvedPlan), resolvedPlan ) } @@ -331,8 +340,9 @@ class HybridAnalyzerSuite extends QueryTest with SharedSparkSession { new BrokenResolver( new Exception("Single-pass resolver should not be invoked"), bridgeRelations = false - ) - ).apply(plan, new QueryPlanningTracker) + ), + new QueryPlanningTracker + ).apply(plan) }, resolvedPlan ) @@ -358,8 +368,9 @@ class HybridAnalyzerSuite extends QueryTest with SharedSparkSession { bridgeRelations = false ), new ResolverGuard(spark.sessionState.catalogManager), - new ValidatingResolver(bridgeRelations = false) - ).apply(plan, new QueryPlanningTracker) + new ValidatingResolver(bridgeRelations = false), + new QueryPlanningTracker + ).apply(plan) }, resolvedPlan ) @@ -387,8 +398,9 @@ class HybridAnalyzerSuite extends QueryTest with SharedSparkSession { bridgeRelations = false ), new ResolverGuard(spark.sessionState.catalogManager), - new ValidatingResolver(bridgeRelations = false) - ).apply(plan, new QueryPlanningTracker) + new ValidatingResolver(bridgeRelations = false), + new QueryPlanningTracker + ).apply(plan) }, resolvedPlan ) @@ -401,8 +413,9 @@ class HybridAnalyzerSuite extends QueryTest with SharedSparkSession { bridgeRelations = true ), new ResolverGuard(spark.sessionState.catalogManager), - new ValidatingResolver(bridgeRelations = true) - ).apply(plan, new QueryPlanningTracker), + new ValidatingResolver(bridgeRelations = true), + new QueryPlanningTracker + ).apply(plan), resolvedPlan ) } @@ -425,8 +438,9 @@ class HybridAnalyzerSuite extends QueryTest with SharedSparkSession { legacyAnalyzer = new ValidatingAnalyzer(bridgeRelations = true), resolverGuard = new ResolverGuard(spark.sessionState.catalogManager), resolver = new ValidatingResolver(bridgeRelations = true), + tracker = new QueryPlanningTracker, extendedResolutionChecks = Seq(new 
BrokenCheckRule) - ).apply(plan, new QueryPlanningTracker) + ).apply(plan) } withSQLConf( @@ -437,8 +451,9 @@ class HybridAnalyzerSuite extends QueryTest with SharedSparkSession { legacyAnalyzer = new ValidatingAnalyzer(bridgeRelations = true), resolverGuard = new ResolverGuard(spark.sessionState.catalogManager), resolver = new ValidatingResolver(bridgeRelations = true), + tracker = new QueryPlanningTracker, extendedResolutionChecks = Seq(new BrokenCheckRule) - ).apply(plan, new QueryPlanningTracker), + ).apply(plan), resolvedPlan ) }