Skip to content

Commit 3457df6

Browse files
committed
[SPARK-52641][SQL] Primary key columns should not be nullable
### What changes were proposed in this pull request? When creating or replacing a table with primary key, all the columns of the primary key should not be nullable ### Why are the changes needed? Correct the expected table schema when create/replace table with primary key ### Does this PR introduce _any_ user-facing change? No, the feature is not released yet ### How was this patch tested? New UT ### Was this patch authored or co-authored using generative AI tooling? No Closes #51525 from gengliangwang/pkColumnNotNull. Authored-by: Gengliang Wang <[email protected]> Signed-off-by: Gengliang Wang <[email protected]>
1 parent 0982041 commit 3457df6

File tree

3 files changed

+68
-15
lines changed

3 files changed

+68
-15
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4894,8 +4894,23 @@ class AstBuilder extends DataTypeAstBuilder
48944894
.mkString(", ")
48954895
throw QueryParsingErrors.multiplePrimaryKeysError(ctx, primaryKeyColumns)
48964896
}
4897+
// If there is a primary key constraint, all the columns in the primary key are not null.
4898+
val updatedColumns = if (primaryKeys.nonEmpty) {
4899+
val lowerCasePkColumns = primaryKeys.head.asInstanceOf[PrimaryKeyConstraint].columns
4900+
.map(_.toLowerCase(Locale.ROOT))
4901+
columnDefs.map { colDef =>
4902+
if (colDef.nullable &&
4903+
lowerCasePkColumns.contains(colDef.name.toLowerCase(Locale.ROOT))) {
4904+
colDef.copy(nullable = false)
4905+
} else {
4906+
colDef
4907+
}
4908+
}
4909+
} else {
4910+
columnDefs
4911+
}
48974912

4898-
(columnDefs.toSeq, constraints.toSeq)
4913+
(updatedColumns.toSeq, constraints.toSeq)
48994914
}
49004915
}
49014916

sql/core/src/test/scala/org/apache/spark/sql/execution/command/ConstraintParseSuiteBase.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,13 @@ abstract class ConstraintParseSuiteBase extends AnalysisTest with SharedSparkSes
8080
protected def verifyConstraints(
8181
sql: String,
8282
constraints: Seq[TableConstraint],
83-
isCreateTable: Boolean = true): Unit = {
83+
isCreateTable: Boolean = true,
84+
columnANullable: Boolean = true,
85+
columnBNullable: Boolean = true): Unit = {
8486
val parsed = parsePlan(sql)
8587
val columns = Seq(
86-
ColumnDefinition("a", IntegerType),
87-
ColumnDefinition("b", StringType)
88+
ColumnDefinition("a", IntegerType, nullable = columnANullable),
89+
ColumnDefinition("b", StringType, nullable = columnBNullable)
8890
)
8991
val expected = createExpectedPlan(
9092
columns = columns, tableConstraints = constraints, isCreateTable = isCreateTable)

sql/core/src/test/scala/org/apache/spark/sql/execution/command/PrimaryKeyConstraintParseSuite.scala

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedTable
2121
import org.apache.spark.sql.catalyst.expressions.PrimaryKeyConstraint
2222
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
2323
import org.apache.spark.sql.catalyst.parser.ParseException
24-
import org.apache.spark.sql.catalyst.plans.logical.AddConstraint
24+
import org.apache.spark.sql.catalyst.plans.logical.{AddConstraint, ColumnDefinition}
25+
import org.apache.spark.sql.types.StringType
2526

2627
class PrimaryKeyConstraintParseSuite extends ConstraintParseSuiteBase {
2728
override val validConstraintCharacteristics =
@@ -34,7 +35,7 @@ class PrimaryKeyConstraintParseSuite extends ConstraintParseSuiteBase {
3435
tableName = "t",
3536
userProvidedName = null)
3637
val constraints = Seq(constraint)
37-
verifyConstraints(sql, constraints)
38+
verifyConstraints(sql, constraints, columnANullable = false)
3839
}
3940

4041
test("Create table with named primary key - table level") {
@@ -45,7 +46,7 @@ class PrimaryKeyConstraintParseSuite extends ConstraintParseSuiteBase {
4546
userProvidedName = "pk1"
4647
)
4748
val constraints = Seq(constraint)
48-
verifyConstraints(sql, constraints)
49+
verifyConstraints(sql, constraints, columnANullable = false)
4950
}
5051

5152
test("Create table with composite primary key - table level") {
@@ -55,7 +56,24 @@ class PrimaryKeyConstraintParseSuite extends ConstraintParseSuiteBase {
5556
tableName = "t",
5657
userProvidedName = null)
5758
val constraints = Seq(constraint)
58-
verifyConstraints(sql, constraints)
59+
verifyConstraints(sql, constraints, columnANullable = false, columnBNullable = false)
60+
}
61+
62+
test("Create table with composite primary key - case insensitivity") {
63+
val sql = "CREATE TABLE t (FirstName STRING, LastName STRING," +
64+
" PRIMARY KEY (firstName, LASTNAME)) USING parquet"
65+
val constraint = PrimaryKeyConstraint(
66+
columns = Seq("firstName", "LASTNAME"),
67+
tableName = "t",
68+
userProvidedName = null)
69+
val expectedPlan = createExpectedPlan(
70+
columns = Seq(
71+
ColumnDefinition("FirstName", StringType, nullable = false),
72+
ColumnDefinition("LastName", StringType, nullable = false)),
73+
tableConstraints = Seq(constraint),
74+
isCreateTable = true)
75+
val parsed = parsePlan(sql)
76+
comparePlans(parsed, expectedPlan)
5977
}
6078

6179
test("Create table with primary key - column level") {
@@ -65,7 +83,7 @@ class PrimaryKeyConstraintParseSuite extends ConstraintParseSuiteBase {
6583
tableName = "t",
6684
userProvidedName = null)
6785
val constraints = Seq(constraint)
68-
verifyConstraints(sql, constraints)
86+
verifyConstraints(sql, constraints, columnANullable = false)
6987
}
7088

7189
test("Create table with named primary key - column level") {
@@ -76,7 +94,7 @@ class PrimaryKeyConstraintParseSuite extends ConstraintParseSuiteBase {
7694
userProvidedName = "pk1"
7795
)
7896
val constraints = Seq(constraint)
79-
verifyConstraints(sql, constraints)
97+
verifyConstraints(sql, constraints, columnANullable = false)
8098
}
8199

82100
test("Create table with multiple primary keys should fail") {
@@ -101,7 +119,7 @@ class PrimaryKeyConstraintParseSuite extends ConstraintParseSuiteBase {
101119
tableName = "t",
102120
userProvidedName = null)
103121
val constraints = Seq(constraint)
104-
verifyConstraints(sql, constraints, isCreateTable = false)
122+
verifyConstraints(sql, constraints, isCreateTable = false, columnANullable = false)
105123
}
106124

107125
test("Replace table with named primary key - table level") {
@@ -112,7 +130,7 @@ class PrimaryKeyConstraintParseSuite extends ConstraintParseSuiteBase {
112130
userProvidedName = "pk1"
113131
)
114132
val constraints = Seq(constraint)
115-
verifyConstraints(sql, constraints, isCreateTable = false)
133+
verifyConstraints(sql, constraints, isCreateTable = false, columnANullable = false)
116134
}
117135

118136
test("Replace table with composite primary key - table level") {
@@ -122,7 +140,25 @@ class PrimaryKeyConstraintParseSuite extends ConstraintParseSuiteBase {
122140
tableName = "t",
123141
userProvidedName = null)
124142
val constraints = Seq(constraint)
125-
verifyConstraints(sql, constraints, isCreateTable = false)
143+
verifyConstraints(sql, constraints, isCreateTable = false, columnANullable = false,
144+
columnBNullable = false)
145+
}
146+
147+
test("Replace table with composite primary key - case insensitivity") {
148+
val sql = "REPLACE TABLE t (FirstName STRING, LastName STRING," +
149+
" PRIMARY KEY (firstName, LASTNAME)) USING parquet"
150+
val constraint = PrimaryKeyConstraint(
151+
columns = Seq("firstName", "LASTNAME"),
152+
tableName = "t",
153+
userProvidedName = null)
154+
val expectedPlan = createExpectedPlan(
155+
columns = Seq(
156+
ColumnDefinition("FirstName", StringType, nullable = false),
157+
ColumnDefinition("LastName", StringType, nullable = false)),
158+
tableConstraints = Seq(constraint),
159+
isCreateTable = false)
160+
val parsed = parsePlan(sql)
161+
comparePlans(parsed, expectedPlan)
126162
}
127163

128164
test("Replace table with primary key - column level") {
@@ -132,7 +168,7 @@ class PrimaryKeyConstraintParseSuite extends ConstraintParseSuiteBase {
132168
tableName = "t",
133169
userProvidedName = null)
134170
val constraints = Seq(constraint)
135-
verifyConstraints(sql, constraints, isCreateTable = false)
171+
verifyConstraints(sql, constraints, isCreateTable = false, columnANullable = false)
136172
}
137173

138174
test("Replace table with named primary key - column level") {
@@ -143,7 +179,7 @@ class PrimaryKeyConstraintParseSuite extends ConstraintParseSuiteBase {
143179
userProvidedName = "pk1"
144180
)
145181
val constraints = Seq(constraint)
146-
verifyConstraints(sql, constraints, isCreateTable = false)
182+
verifyConstraints(sql, constraints, isCreateTable = false, columnANullable = false)
147183
}
148184

149185
test("Replace table with multiple primary keys should fail") {

0 commit comments

Comments
 (0)