diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java index 37917bd7649d..e5ae57a76708 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java @@ -27,11 +27,6 @@ @Evolving public interface MergeSummary extends WriteSummary { - /** - * Returns the number of source rows. - */ - long numSourceRows(); - /** * Returns the number of target rows copied unmodified because they did not match any action, * or -1 if not found. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala index f07f47061ee8..911749072c43 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala @@ -21,7 +21,6 @@ package org.apache.spark.sql.connector.write * Implementation of [[MergeSummary]] that provides MERGE operation summary. */ private[sql] case class MergeSummaryImpl( - numSourceRows: Long, numTargetRowsCopied: Long, numTargetRowsDeleted: Long, numTargetRowsUpdated: Long, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 75915d97ba4b..3e4a2f792a1c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -31,11 +31,10 @@ import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSER import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableInfo, TableWritePrivilege} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.metric.CustomMetric -import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, MergeSummaryImpl, PhysicalWriteInfoImpl, RowLevelOperationTable, Write, WriterCommitMessage, WriteSummary} +import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, MergeSummaryImpl, PhysicalWriteInfoImpl, Write, WriterCommitMessage, WriteSummary} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SQLExecution, UnaryExecNode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper -import org.apache.spark.sql.execution.joins.BaseJoinExec import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, SQLMetrics} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.SchemaValidationMode.PROHIBIT_CHANGES @@ -493,9 +492,7 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa private def getWriteSummary(query: SparkPlan): Option[WriteSummary] = { collectFirst(query) { case m: MergeRowsExec => m }.map { n => val metrics = n.metrics - val numSourceRows = getNumSourceRows(n) MergeSummaryImpl( - numSourceRows, metrics.get("numTargetRowsCopied").map(_.value).getOrElse(-1L), metrics.get("numTargetRowsDeleted").map(_.value).getOrElse(-1L), metrics.get("numTargetRowsUpdated").map(_.value).getOrElse(-1L), @@ -507,40 +504,6 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa ) } } - - private def getNumSourceRows(mergeRowsExec: MergeRowsExec): Long = { - def hasTargetTable(plan: SparkPlan): Boolean = { - collectFirst(plan) { - case scan @ BatchScanExec(_, _, _, _, _: RowLevelOperationTable, _) => scan - }.isDefined - } - - def findSourceScan(join: BaseJoinExec): Option[SparkPlan] = { - val leftHasTarget = hasTargetTable(join.left) - val rightHasTarget = hasTargetTable(join.right) - - val sourceSide = if (leftHasTarget) { - Some(join.right) - } else if (rightHasTarget) { - Some(join.left) - } else { - None - } - - sourceSide.flatMap { side => - collectFirst(side) { - case source if source.metrics.contains("numOutputRows") => - source - } - } - } - - (for { - join <- collectFirst(mergeRowsExec.child) { case j: BaseJoinExec => j } - sourceScan <- findSourceScan(join) - metric <- sourceScan.metrics.get("numOutputRows") - } yield metric.value).getOrElse(-1L) - } } trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with Serializable { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala index 7539506e8bfe..a36aeab1295a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala @@ -1779,179 +1779,159 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase } test("Merge metrics with matched clause") { - Seq("true", "false").foreach { aqeEnabled: String => - withTempView("source") { - withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) { - createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", - """{ "pk": 1, "salary": 100, "dep": "hr" } - |{ "pk": 2, "salary": 200, "dep": "software" } - |{ "pk": 3, "salary": 300, "dep": "hr" } - |""".stripMargin) - - val sourceDF = Seq(1, 2, 10).toDF("pk") - sourceDF.createOrReplaceTempView("source") + withTempView("source") { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |{ "pk": 3, "salary": 300, "dep": "hr" } + |""".stripMargin) - val mergeExec = findMergeExec { - s"""MERGE INTO $tableNameAsString t - |USING source s - |ON t.pk = s.pk - |WHEN MATCHED AND salary < 200 THEN - | UPDATE SET salary = 1000 - |""".stripMargin - } + val sourceDF = Seq(1, 2, 10).toDF("pk") + sourceDF.createOrReplaceTempView("source") - assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 2) - assertMetric(mergeExec, "numTargetRowsInserted", 0) - assertMetric(mergeExec, "numTargetRowsUpdated", 1) - assertMetric(mergeExec, "numTargetRowsDeleted", 0) - assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1) - assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0) + val mergeExec = findMergeExec { + s"""MERGE INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED AND salary < 200 THEN + | UPDATE SET salary = 1000 + |""".stripMargin + } - checkAnswer( - sql(s"SELECT * FROM $tableNameAsString"), - Seq( - Row(1, 1000, "hr"), // updated - Row(2, 200, "software"), - Row(3, 300, "hr"))) - } + assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 2) + assertMetric(mergeExec, "numTargetRowsInserted", 0) + assertMetric(mergeExec, "numTargetRowsUpdated", 1) + assertMetric(mergeExec, "numTargetRowsDeleted", 0) + assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1) + assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0) - val mergeSummary = getMergeSummary() - assert(mergeSummary.numSourceRows === 3L) - assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 2L)) - assert(mergeSummary.numTargetRowsInserted === 0L) - assert(mergeSummary.numTargetRowsUpdated === 1L) - assert(mergeSummary.numTargetRowsDeleted === 0L) - assert(mergeSummary.numTargetRowsMatchedUpdated === 1L) - assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L) + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(1, 1000, "hr"), // updated + Row(2, 200, "software"), + Row(3, 300, "hr"))) - sql(s"DROP TABLE $tableNameAsString") - } + val mergeSummary = getMergeSummary() + assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 2L)) + assert(mergeSummary.numTargetRowsInserted === 0L) + assert(mergeSummary.numTargetRowsUpdated === 1L) + assert(mergeSummary.numTargetRowsDeleted === 0L) + assert(mergeSummary.numTargetRowsMatchedUpdated === 1L) + assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L) } } test("Merge metrics with matched and not matched clause") { - Seq("true", "false").foreach { aqeEnabled: String => - withTempView("source") { - createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", - """{ "pk": 1, "salary": 100, "dep": "hr" } - |{ "pk": 2, "salary": 200, "dep": "software" } - |{ "pk": 3, "salary": 300, "dep": "hr" } - |""".stripMargin) + withTempView("source") { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |{ "pk": 3, "salary": 300, "dep": "hr" } + |""".stripMargin) - val sourceDF = Seq( - (4, 100, "marketing"), - (5, 400, "executive"), - (6, 100, "hr") - ).toDF("pk", "salary", "dep") - sourceDF.createOrReplaceTempView("source") + val sourceDF = Seq( + (4, 100, "marketing"), + (5, 400, "executive"), + (6, 100, "hr") + ).toDF("pk", "salary", "dep") + sourceDF.createOrReplaceTempView("source") - val mergeExec = findMergeExec { - s"""MERGE INTO $tableNameAsString t - |USING source s - |ON t.pk = s.pk - |WHEN MATCHED THEN - | UPDATE SET salary = 9999 - |WHEN NOT MATCHED AND salary > 200 THEN - | INSERT * - |""".stripMargin - } + val mergeExec = findMergeExec { + s"""MERGE INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED THEN + | UPDATE SET salary = 9999 + |WHEN NOT MATCHED AND salary > 200 THEN + | INSERT * + |""".stripMargin + } - assertMetric(mergeExec, "numTargetRowsCopied", 0) - assertMetric(mergeExec, "numTargetRowsInserted", 1) - assertMetric(mergeExec, "numTargetRowsUpdated", 0) - assertMetric(mergeExec, "numTargetRowsDeleted", 0) - assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 0) - assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0) + assertMetric(mergeExec, "numTargetRowsCopied", 0) + assertMetric(mergeExec, "numTargetRowsInserted", 1) + assertMetric(mergeExec, "numTargetRowsUpdated", 0) + assertMetric(mergeExec, "numTargetRowsDeleted", 0) + assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 0) + assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0) - checkAnswer( - sql(s"SELECT * FROM $tableNameAsString"), - Seq( - Row(1, 100, "hr"), - Row(2, 200, "software"), - Row(3, 300, "hr"), - Row(5, 400, "executive"))) // inserted - - val mergeSummary = getMergeSummary() - // TODO SPARK-52578: Handle this case when optimizer removes Join due to no matching pks - assert(mergeSummary.numSourceRows === (if (deltaMerge) 3L else -1L)) - assert(mergeSummary.numTargetRowsCopied === 0L) - assert(mergeSummary.numTargetRowsInserted === 1L) - assert(mergeSummary.numTargetRowsUpdated === 0L) - assert(mergeSummary.numTargetRowsDeleted === 0L) - assert(mergeSummary.numTargetRowsMatchedUpdated === 0L) - assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L) + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(1, 100, "hr"), + Row(2, 200, "software"), + Row(3, 300, "hr"), + Row(5, 400, "executive"))) // inserted - sql(s"DROP TABLE $tableNameAsString") - } + val mergeSummary = getMergeSummary() + assert(mergeSummary.numTargetRowsCopied === 0L) + assert(mergeSummary.numTargetRowsInserted === 1L) + assert(mergeSummary.numTargetRowsUpdated === 0L) + assert(mergeSummary.numTargetRowsDeleted === 0L) + assert(mergeSummary.numTargetRowsMatchedUpdated === 0L) + assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L) } } test("Merge metrics with matched and not matched by source clauses: update") { - Seq("true", "false").foreach { aqeEnabled: String => - withTempView("source") { - withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) { - createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", - """{ "pk": 1, "salary": 100, "dep": "hr" } - |{ "pk": 2, "salary": 200, "dep": "software" } - |{ "pk": 3, "salary": 300, "dep": "hr" } - |{ "pk": 4, "salary": 400, "dep": "marketing" } - |{ "pk": 5, "salary": 500, "dep": "executive" } - |""".stripMargin) - - val sourceDF = Seq(1, 2, 10).toDF("pk") - sourceDF.createOrReplaceTempView("source") + withTempView("source") { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |{ "pk": 3, "salary": 300, "dep": "hr" } + |{ "pk": 4, "salary": 400, "dep": "marketing" } + |{ "pk": 5, "salary": 500, "dep": "executive" } + |""".stripMargin) - val mergeExec = findMergeExec { - s"""MERGE INTO $tableNameAsString t - |USING source s - |ON t.pk = s.pk - |WHEN MATCHED AND salary < 200 THEN - | UPDATE SET salary = 1000 - |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN - | UPDATE SET salary = -1 - |""".stripMargin - } + val sourceDF = Seq(1, 2, 10).toDF("pk") + sourceDF.createOrReplaceTempView("source") - assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3) - assertMetric(mergeExec, "numTargetRowsInserted", 0) - assertMetric(mergeExec, "numTargetRowsUpdated", 2) - assertMetric(mergeExec, "numTargetRowsDeleted", 0) - assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1) - assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 1) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0) + val mergeExec = findMergeExec { + s"""MERGE INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED AND salary < 200 THEN + | UPDATE SET salary = 1000 + |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN + | UPDATE SET salary = -1 + |""".stripMargin + } - checkAnswer( - sql(s"SELECT * FROM $tableNameAsString"), - Seq( - Row(1, 1000, "hr"), // updated - Row(2, 200, "software"), - Row(3, 300, "hr"), - Row(4, 400, "marketing"), - Row(5, -1, "executive"))) // updated - } + assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3) + assertMetric(mergeExec, "numTargetRowsInserted", 0) + assertMetric(mergeExec, "numTargetRowsUpdated", 2) + assertMetric(mergeExec, "numTargetRowsDeleted", 0) + assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1) + assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 1) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0) - val mergeSummary = getMergeSummary() - assert(mergeSummary.numSourceRows === 3L) - assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) - assert(mergeSummary.numTargetRowsInserted === 0L) - assert(mergeSummary.numTargetRowsUpdated === 2L) - assert(mergeSummary.numTargetRowsDeleted === 0L) - assert(mergeSummary.numTargetRowsMatchedUpdated === 1L) - assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 1L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L) + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(1, 1000, "hr"), // updated + Row(2, 200, "software"), + Row(3, 300, "hr"), + Row(4, 400, "marketing"), + Row(5, -1, "executive"))) // updated - sql(s"DROP TABLE $tableNameAsString") - } + val mergeSummary = getMergeSummary() + assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) + assert(mergeSummary.numTargetRowsInserted === 0L) + assert(mergeSummary.numTargetRowsUpdated === 2L) + assert(mergeSummary.numTargetRowsDeleted === 0L) + assert(mergeSummary.numTargetRowsMatchedUpdated === 1L) + assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 1L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L) } } @@ -2000,7 +1980,6 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase ) val mergeSummary = getMergeSummary() - assert(mergeSummary.numSourceRows === 3L) assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) assert(mergeSummary.numTargetRowsInserted === 0L) assert(mergeSummary.numTargetRowsUpdated === 0L) @@ -2013,130 +1992,116 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase } test("Merge metrics with matched, not matched, and not matched by source clauses: update") { - Seq("true", "false").foreach { aqeEnabled: String => - withTempView("source") { - withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) { - createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", - """{ "pk": 1, "salary": 100, "dep": "hr" } - |{ "pk": 2, "salary": 200, "dep": "software" } - |{ "pk": 3, "salary": 300, "dep": "hr" } - |{ "pk": 4, "salary": 400, "dep": "marketing" } - |{ "pk": 5, "salary": 500, "dep": "executive" } - |""".stripMargin) - - val sourceDF = Seq(1, 2, 6, 10).toDF("pk") - sourceDF.createOrReplaceTempView("source") + withTempView("source") { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |{ "pk": 3, "salary": 300, "dep": "hr" } + |{ "pk": 4, "salary": 400, "dep": "marketing" } + |{ "pk": 5, "salary": 500, "dep": "executive" } + |""".stripMargin) - val mergeExec = findMergeExec { - s"""MERGE INTO $tableNameAsString t - |USING source s - |ON t.pk = s.pk - |WHEN MATCHED AND salary < 200 THEN - | UPDATE SET salary = 1000 - |WHEN NOT MATCHED AND s.pk < 10 THEN - | INSERT (pk, salary, dep) VALUES (s.pk, -1, "dummy") - |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN - | UPDATE SET salary = -1 - |""".stripMargin - } + val sourceDF = Seq(1, 2, 6, 10).toDF("pk") + sourceDF.createOrReplaceTempView("source") - assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3) - assertMetric(mergeExec, "numTargetRowsInserted", 1) - assertMetric(mergeExec, "numTargetRowsUpdated", 2) - assertMetric(mergeExec, "numTargetRowsDeleted", 0) - assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1) - assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 1) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0) + val mergeExec = findMergeExec { + s"""MERGE INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED AND salary < 200 THEN + | UPDATE SET salary = 1000 + |WHEN NOT MATCHED AND s.pk < 10 THEN + | INSERT (pk, salary, dep) VALUES (s.pk, -1, "dummy") + |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN + | UPDATE SET salary = -1 + |""".stripMargin + } - checkAnswer( - sql(s"SELECT * FROM $tableNameAsString"), - Seq( - Row(1, 1000, "hr"), // updated - Row(2, 200, "software"), - Row(3, 300, "hr"), - Row(4, 400, "marketing"), - Row(5, -1, "executive"), // updated - Row(6, -1, "dummy"))) // inserted - } + assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3) + assertMetric(mergeExec, "numTargetRowsInserted", 1) + assertMetric(mergeExec, "numTargetRowsUpdated", 2) + assertMetric(mergeExec, "numTargetRowsDeleted", 0) + assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 1) + assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 0) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 1) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 0) - val mergeSummary = getMergeSummary() - assert(mergeSummary.numSourceRows === 4L) - assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) - assert(mergeSummary.numTargetRowsInserted === 1L) - assert(mergeSummary.numTargetRowsUpdated === 2L) - assert(mergeSummary.numTargetRowsDeleted === 0L) - assert(mergeSummary.numTargetRowsMatchedUpdated === 1L) - assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 1L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L) + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(1, 1000, "hr"), // updated + Row(2, 200, "software"), + Row(3, 300, "hr"), + Row(4, 400, "marketing"), + Row(5, -1, "executive"), // updated + Row(6, -1, "dummy"))) // inserted - sql(s"DROP TABLE $tableNameAsString") - } + val mergeSummary = getMergeSummary() + assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) + assert(mergeSummary.numTargetRowsInserted === 1L) + assert(mergeSummary.numTargetRowsUpdated === 2L) + assert(mergeSummary.numTargetRowsDeleted === 0L) + assert(mergeSummary.numTargetRowsMatchedUpdated === 1L) + assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 1L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L) } } test("Merge metrics with matched, not matched, and not matched by source clauses: delete") { - Seq("true", "false").foreach { aqeEnabled: String => - withTempView("source") { - withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) { - createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", - """{ "pk": 1, "salary": 100, "dep": "hr" } - |{ "pk": 2, "salary": 200, "dep": "software" } - |{ "pk": 3, "salary": 300, "dep": "hr" } - |{ "pk": 4, "salary": 400, "dep": "marketing" } - |{ "pk": 5, "salary": 500, "dep": "executive" } - |""".stripMargin) - - val sourceDF = Seq(1, 2, 6, 10).toDF("pk") - sourceDF.createOrReplaceTempView("source") + withTempView("source") { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |{ "pk": 3, "salary": 300, "dep": "hr" } + |{ "pk": 4, "salary": 400, "dep": "marketing" } + |{ "pk": 5, "salary": 500, "dep": "executive" } + |""".stripMargin) - val mergeExec = findMergeExec { - s"""MERGE INTO $tableNameAsString t - |USING source s - |ON t.pk = s.pk - |WHEN MATCHED AND salary < 200 THEN - | DELETE - |WHEN NOT MATCHED AND s.pk < 10 THEN - | INSERT (pk, salary, dep) VALUES (s.pk, -1, "dummy") - |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN - | DELETE - |""".stripMargin - } + val sourceDF = Seq(1, 2, 6, 10).toDF("pk") + sourceDF.createOrReplaceTempView("source") - assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3) - assertMetric(mergeExec, "numTargetRowsInserted", 1) - assertMetric(mergeExec, "numTargetRowsUpdated", 0) - assertMetric(mergeExec, "numTargetRowsDeleted", 2) - assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 0) - assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 1) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0) - assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 1) + val mergeExec = findMergeExec { + s"""MERGE INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED AND salary < 200 THEN + | DELETE + |WHEN NOT MATCHED AND s.pk < 10 THEN + | INSERT (pk, salary, dep) VALUES (s.pk, -1, "dummy") + |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN + | DELETE + |""".stripMargin + } - checkAnswer( - sql(s"SELECT * FROM $tableNameAsString"), - Seq( - // Row(1, 100, "hr") deleted - Row(2, 200, "software"), - Row(3, 300, "hr"), - Row(4, 400, "marketing"), - // Row(5, 500, "executive") deleted - Row(6, -1, "dummy"))) // inserted - } + assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3) + assertMetric(mergeExec, "numTargetRowsInserted", 1) + assertMetric(mergeExec, "numTargetRowsUpdated", 0) + assertMetric(mergeExec, "numTargetRowsDeleted", 2) + assertMetric(mergeExec, "numTargetRowsMatchedUpdated", 0) + assertMetric(mergeExec, "numTargetRowsMatchedDeleted", 1) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0) + assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 1) - val mergeSummary = getMergeSummary() - assert(mergeSummary.numSourceRows === 4L) - assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) - assert(mergeSummary.numTargetRowsInserted === 1L) - assert(mergeSummary.numTargetRowsUpdated === 0L) - assert(mergeSummary.numTargetRowsDeleted === 2L) - assert(mergeSummary.numTargetRowsMatchedUpdated === 0L) - assert(mergeSummary.numTargetRowsMatchedDeleted === 1L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 1L) + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + // Row(1, 100, "hr") deleted + Row(2, 200, "software"), + Row(3, 300, "hr"), + Row(4, 400, "marketing"), + // Row(5, 500, "executive") deleted + Row(6, -1, "dummy"))) // inserted - sql(s"DROP TABLE $tableNameAsString") - } + val mergeSummary = getMergeSummary() + assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) + assert(mergeSummary.numTargetRowsInserted === 1L) + assert(mergeSummary.numTargetRowsUpdated === 0L) + assert(mergeSummary.numTargetRowsDeleted === 2L) + assert(mergeSummary.numTargetRowsMatchedUpdated === 0L) + assert(mergeSummary.numTargetRowsMatchedDeleted === 1L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L) + assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 1L) } } @@ -2169,7 +2134,6 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase ) val mergeMetrics = getMergeSummary() - assert(mergeMetrics.numSourceRows === 4L) assert(mergeMetrics.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) assert(mergeMetrics.numTargetRowsInserted === 1L) assert(mergeMetrics.numTargetRowsUpdated === 0L) @@ -2185,46 +2149,6 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase } } - test("Merge metrics with numSourceRows for empty source") { - Seq("true", "false").foreach { aqeEnabled: String => - withTempView("source") { - withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) { - createAndInitTable( - "pk INT NOT NULL, salary INT, dep STRING", - """{ "pk": 1, "salary": 100, "dep": "hr" } - |{ "pk": 2, "salary": 200, "dep": "software" } - |{ "pk": 3, "salary": 300, "dep": "hr" } - |""".stripMargin) - - // source is empty - Seq.empty[Int].toDF("pk").createOrReplaceTempView("source") - - sql(s"""MERGE INTO $tableNameAsString t - |USING source s - |ON t.pk = s.pk - |WHEN MATCHED THEN - | UPDATE SET salary = 1000 - |WHEN NOT MATCHED BY SOURCE THEN - | DELETE - |""".stripMargin) - - val mergeSummary = getMergeSummary() - assert(mergeSummary.numSourceRows === -1L) // if no numOutputRows, should be -1 - assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 0L)) - assert(mergeSummary.numTargetRowsInserted === 0L) - assert(mergeSummary.numTargetRowsUpdated === 0L) - assert(mergeSummary.numTargetRowsDeleted === 3L) - assert(mergeSummary.numTargetRowsMatchedUpdated === 0L) - assert(mergeSummary.numTargetRowsMatchedDeleted === 0L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L) - assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 3L) - - sql(s"DROP TABLE $tableNameAsString") - } - } - } - } - test("Merge schema evolution new column with set explicit column") { Seq((true, true), (false, true), (true, false)).foreach { case (withSchemaEvolution, schemaEvolutionEnabled) =>