Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5235,6 +5235,54 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}

test("merge with struct missing nested field with check constraint") {
withTempView("source") {
// Target table has struct with nested field c2
createAndInitTable(
s"""pk INT NOT NULL,
|s STRUCT<c1: INT, c2: INT>,
|dep STRING""".stripMargin,
"""{ "pk": 0, "s": { "c1": 1, "c2": 10 }, "dep": "sales" }
|{ "pk": 1, "s": { "c1": 2, "c2": 20 }, "dep": "hr" }"""
.stripMargin)

// Add CHECK constraint on nested field c2 using ALTER TABLE
sql(s"ALTER TABLE $tableNameAsString ADD CONSTRAINT check_c2 CHECK " +
s"(s.c2 IS NOT NULL AND s.c2 > 1)")

// Source table schema with struct missing the c2 field
val sourceTableSchema = StructType(Seq(
StructField("pk", IntegerType),
StructField("s", StructType(Seq(
StructField("c1", IntegerType)
// missing field 'c2' which has CHECK constraint IS NOT NULL AND > 1
))),
StructField("dep", StringType)
))

val data = Seq(
Row(1, Row(100), "engineering"),
Row(2, Row(200), "finance")
)
spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
.createOrReplaceTempView("source")

withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key -> "true") {
val error = intercept[SparkRuntimeException] {
sql(
s"""MERGE INTO $tableNameAsString t USING source
|ON t.pk = source.pk
|WHEN MATCHED THEN
| UPDATE SET s = source.s, dep = source.dep
|""".stripMargin)}
assert(error.getCondition == "CHECK_CONSTRAINT_VIOLATION")
assert(error.getMessage.contains("CHECK constraint check_c2 s.c2 IS NOT NULL AND " +
"s.c2 > 1 violated by row with values:\n - s.c2 : null"))
}
}
sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}

test("merge with schema evolution using dataframe API: add new column and set all") {
Seq(true, false).foreach { withSchemaEvolution =>
val sourceTable = "cat.ns1.source_table"
Expand Down