diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala index 680fa63e0929..e99e8c5b2d57 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala @@ -5235,6 +5235,54 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase sql(s"DROP TABLE IF EXISTS $tableNameAsString") } + test("merge with struct missing nested field with check constraint") { + withTempView("source") { + // Target table has struct with nested field c2 + createAndInitTable( + s"""pk INT NOT NULL, + |s STRUCT, + |dep STRING""".stripMargin, + """{ "pk": 0, "s": { "c1": 1, "c2": 10 }, "dep": "sales" } + |{ "pk": 1, "s": { "c1": 2, "c2": 20 }, "dep": "hr" }""" + .stripMargin) + + // Add CHECK constraint on nested field c2 using ALTER TABLE + sql(s"ALTER TABLE $tableNameAsString ADD CONSTRAINT check_c2 CHECK " + + s"(s.c2 IS NOT NULL AND s.c2 > 1)") + + // Source table schema with struct missing the c2 field + val sourceTableSchema = StructType(Seq( + StructField("pk", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType) + // missing field 'c2' which has CHECK constraint IS NOT NULL AND > 1 + ))), + StructField("dep", StringType) + )) + + val data = Seq( + Row(1, Row(100), "engineering"), + Row(2, Row(200), "finance") + ) + spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema) + .createOrReplaceTempView("source") + + withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key -> "true") { + val error = intercept[SparkRuntimeException] { + sql( + s"""MERGE INTO $tableNameAsString t USING source + |ON t.pk = source.pk + |WHEN MATCHED THEN + | UPDATE SET s = source.s, dep = source.dep + |""".stripMargin)} + assert(error.getCondition == "CHECK_CONSTRAINT_VIOLATION") + assert(error.getMessage.contains("CHECK constraint check_c2 s.c2 IS NOT NULL AND " + + "s.c2 > 1 violated by row with values:\n - s.c2 : null")) + } + } + sql(s"DROP TABLE IF EXISTS $tableNameAsString") + } + test("merge with schema evolution using dataframe API: add new column and set all") { Seq(true, false).foreach { withSchemaEvolution => val sourceTable = "cat.ns1.source_table"