Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -459,16 +459,21 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_EXPLAIN_VERBOSE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.explain.verbose.enabled")
val COMET_EXTENDED_EXPLAIN_FORMAT_VERBOSE = "verbose"
val COMET_EXTENDED_EXPLAIN_FORMAT_FALLBACK = "fallback"

val COMET_EXTENDED_EXPLAIN_FORMAT: ConfigEntry[String] =
conf("spark.comet.explain.format")
.category(CATEGORY_EXEC_EXPLAIN)
.doc(
"When this setting is enabled, Comet's extended explain output will provide the full " +
"query plan annotated with fallback reasons as well as a summary of how much of " +
"the plan was accelerated by Comet. When this setting is disabled, a list of fallback " +
"reasons will be provided instead.")
.booleanConf
.createWithDefault(false)
.doc("Choose extended explain output. The default format of " +
s"'$COMET_EXTENDED_EXPLAIN_FORMAT_VERBOSE' will provide the full query plan annotated " +
"with fallback reasons as well as a summary of how much of the plan was accelerated " +
s"by Comet. The format '$COMET_EXTENDED_EXPLAIN_FORMAT_FALLBACK' provides a list of " +
"fallback reasons instead.")
.stringConf
.checkValues(
Set(COMET_EXTENDED_EXPLAIN_FORMAT_VERBOSE, COMET_EXTENDED_EXPLAIN_FORMAT_FALLBACK))
.createWithDefault(COMET_EXTENDED_EXPLAIN_FORMAT_VERBOSE)

val COMET_EXPLAIN_NATIVE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.explain.native.enabled")
Expand Down
2 changes: 1 addition & 1 deletion docs/source/user-guide/latest/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ These settings can be used to determine which parts of the plan are accelerated
<!--BEGIN:CONFIG_TABLE[exec_explain]-->
| Config | Description | Default Value |
|--------|-------------|---------------|
| `spark.comet.explain.format` | Choose extended explain output. The default format of 'verbose' will provide the full query plan annotated with fallback reasons as well as a summary of how much of the plan was accelerated by Comet. The format 'fallback' provides a list of fallback reasons instead. | verbose |
| `spark.comet.explain.native.enabled` | When this setting is enabled, Comet will provide a tree representation of the native query plan before execution and again after execution, with metrics. | false |
| `spark.comet.explain.rules` | When this setting is enabled, Comet will log all plan transformations performed in physical optimizer rules. Default: false | false |
| `spark.comet.explain.verbose.enabled` | When this setting is enabled, Comet's extended explain output will provide the full query plan annotated with fallback reasons as well as a summary of how much of the plan was accelerated by Comet. When this setting is disabled, a list of fallback reasons will be provided instead. | false |
| `spark.comet.explainFallback.enabled` | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
| `spark.comet.logFallbackReasons.enabled` | When this setting is enabled, Comet will log warnings for all fallback reasons. | false |
<!--END:CONFIG_TABLE-->
Expand Down
41 changes: 17 additions & 24 deletions spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,26 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {

override def title: String = "Comet"

override def generateExtendedInfo(plan: SparkPlan): String = {
if (CometConf.COMET_EXPLAIN_VERBOSE_ENABLED.get()) {
generateVerboseExtendedInfo(plan)
} else {
val info = getFallbackReasons(plan)
info.toSeq.sorted.mkString("\n").trim
def generateExtendedInfo(plan: SparkPlan): String = {
CometConf.COMET_EXTENDED_EXPLAIN_FORMAT.get() match {
case CometConf.COMET_EXTENDED_EXPLAIN_FORMAT_VERBOSE =>
// Generates the extended info in a verbose manner, printing each node along with the
// extended information in a tree display.
val planStats = new CometCoverageStats()
val outString = new StringBuilder()
generateTreeString(getActualPlan(plan), 0, Seq(), 0, outString, planStats)
s"${outString.toString()}\n$planStats"
case CometConf.COMET_EXTENDED_EXPLAIN_FORMAT_FALLBACK =>
// Generates the extended info as a list of fallback reasons
getFallbackReasons(plan).mkString("\n").trim
}
}

def getFallbackReasons(node: TreeNode[_]): Set[String] = {
def getFallbackReasons(plan: SparkPlan): Seq[String] = {
extensionInfo(plan).toSeq.sorted
}

private[comet] def extensionInfo(node: TreeNode[_]): Set[String] = {
var info = mutable.Seq[String]()
val sorted = sortup(node)
sorted.foreach { p =>
Expand Down Expand Up @@ -80,23 +90,6 @@ class ExtendedExplainInfo extends ExtendedExplainGenerator {
ordered.reverse
}

// generates the extended info in a verbose manner, printing each node along with the
// extended information in a tree display
def generateVerboseExtendedInfo(plan: SparkPlan): String = {
val planStats = new CometCoverageStats()
val outString = new StringBuilder()
generateTreeString(getActualPlan(plan), 0, Seq(), 0, outString, planStats)
s"${outString.toString()}\n$planStats"
}

/** Get the coverage statistics without the full plan */
def generateCoverageInfo(plan: SparkPlan): String = {
val planStats = new CometCoverageStats()
val outString = new StringBuilder()
generateTreeString(getActualPlan(plan), 0, Seq(), 0, outString, planStats)
planStats.toString()
}

// Simplified generateTreeString from Spark TreeNode. Appends explain info to the node if any
def generateTreeString(
node: TreeNode[_],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,12 +651,12 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
// config is enabled)
if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get()) {
val info = new ExtendedExplainInfo()
if (info.getFallbackReasons(newPlan).nonEmpty) {
if (info.extensionInfo(newPlan).nonEmpty) {
logWarning(
"Comet cannot execute some parts of this plan natively " +
s"(set ${CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key}=false " +
"to disable this logging):\n" +
s"${info.generateVerboseExtendedInfo(newPlan)}")
s"${info.generateExtendedInfo(newPlan)}")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.comet

import scala.collection.immutable.HashSet
import scala.util.Random

import org.apache.hadoop.fs.Path
Expand Down Expand Up @@ -126,11 +125,11 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
spark.read.parquet(path.toString).createOrReplaceTempView("t1")
sql("SELECT array(struct(_1, _2)) as a, struct(_1, _2) as b FROM t1")
.createOrReplaceTempView("t2")
val expectedFallbackReasons = HashSet(
"data type not supported: ArrayType(StructType(StructField(_1,BooleanType,true),StructField(_2,ByteType,true)),false)")
checkSparkAnswerAndFallbackReasons(
val expectedFallbackReason =
"data type not supported: ArrayType(StructType(StructField(_1,BooleanType,true),StructField(_2,ByteType,true)),false)"
checkSparkAnswerAndFallbackReason(
sql("SELECT array_remove(a, b) FROM t2"),
expectedFallbackReasons)
expectedFallbackReason)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,7 @@ class CometExecSuite extends CometTestBase {
val infos = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
assert(infos.contains("Dynamic Partition Pruning is not supported"))

withSQLConf(CometConf.COMET_EXPLAIN_VERBOSE_ENABLED.key -> "true") {
val extendedExplain = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
assert(extendedExplain.contains("Comet accelerated"))
}
assert(infos.contains("Comet accelerated"))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ trait CometTPCQueryListBase
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_EXPLAIN_VERBOSE_ENABLED.key -> "true",
// Lower bloom filter thresholds to allow us to simulate the plan produced at larger scale.
"spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold" -> "1MB",
"spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold" -> "1MB") {
Expand Down
4 changes: 2 additions & 2 deletions spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ abstract class CometTestBase
if (actualFallbacks.isEmpty) {
fail(
s"Expected fallback reason '$reason' but no fallback reasons were found. Explain: ${explainInfo
.generateVerboseExtendedInfo(cometPlan)}")
.generateExtendedInfo(cometPlan)}")
} else {
fail(
s"Expected fallback reason '$reason' not found in [${actualFallbacks.mkString(", ")}]")
Expand Down Expand Up @@ -375,7 +375,7 @@ abstract class CometTestBase
assert(
false,
s"Expected only Comet native operators, but found ${op.nodeName}.\n" +
s"plan: ${new ExtendedExplainInfo().generateVerboseExtendedInfo(plan)}")
s"plan: ${new ExtendedExplainInfo().generateExtendedInfo(plan)}")
case _ =>
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa

withSQLConf(
CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true",
CometConf.COMET_EXPLAIN_VERBOSE_ENABLED.key -> "true",
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
Expand Down
Loading