Skip to content

Commit e5418bb

Browse files
committed
(1) restore addDisplayNameColumn.
(2) only remove deprecated indexes (3) debug backfilling pipeline_versions Signed-off-by: kaikaila <[email protected]>
1 parent e13c443 commit e5418bb

File tree

1 file changed

+206
-51
lines changed

1 file changed

+206
-51
lines changed

backend/src/apiserver/client_manager/client_manager.go

Lines changed: 206 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -488,9 +488,13 @@ func runLegacyUpgradeFlow(db *gorm.DB, dialect SQLDialect) error {
488488
return fmt.Errorf("preflight length check failed: %w", err)
489489
}
490490

491-
// Step 3: drop all indexes and constraints except primary key which blocks shrinking columns
492-
if err := DropAllConstraintsAndIndexes(db, dialect.Name); err != nil {
493-
return fmt.Errorf("drop constraints/indexes failed: %w", err)
491+
// Step 3: drop all foreign key constraints which can block shrinking columns
492+
if err := dropAllFKConstraints(db, dialect.Name); err != nil {
493+
return fmt.Errorf("drop foreign key constraints failed: %w", err)
494+
}
495+
496+
if err := dropLegacyIndexes(db, dialect); err != nil {
497+
return fmt.Errorf("drop legacy indexes failed: %w", err)
494498
}
495499

496500
// Step 4: shrink fields to meet new length constraints
@@ -536,8 +540,21 @@ func runLegacyUpgradeFlow(db *gorm.DB, dialect SQLDialect) error {
536540
return fmt.Errorf("failed to backfill experiment UUID in run_details table: %s", err)
537541
}
538542

539-
if err := db.Migrator().AlterColumn(&model.Pipeline{}, "Description"); err != nil {
540-
return fmt.Errorf("failed to update pipeline description type. Error: %s", err)
543+
// Data backfill in pipelines table for DisplayName column
544+
if db.Migrator().HasTable(&model.Pipeline{}) {
545+
if !db.Migrator().HasColumn(&model.Pipeline{}, "DisplayName") {
546+
if err := addDisplayNameColumn(db, &model.Pipeline{}, dialect); err != nil {
547+
return fmt.Errorf("failed to add DisplayName column to the %s table: %w", model.Pipeline{}.TableName(), err)
548+
}
549+
}
550+
}
551+
// Data backfill in pipeline_versions table for DisplayName column
552+
if db.Migrator().HasTable(&model.PipelineVersion{}) {
553+
if !db.Migrator().HasColumn(&model.PipelineVersion{}, "DisplayName") {
554+
if err := addDisplayNameColumn(db, &model.PipelineVersion{}, dialect); err != nil {
555+
return fmt.Errorf("failed to add DisplayName column to the %s table: %w", model.PipelineVersion{}.TableName(), err)
556+
}
557+
}
541558
}
542559

543560
return nil
@@ -649,20 +666,22 @@ func FieldMeta(db *gorm.DB, mdl interface{}, field string) (table string, dbCol
649666
return stmt.Table, f.DBName, nil
650667
}
651668

652-
func DropAllConstraintsAndIndexes(db *gorm.DB, driverName string) error {
669+
// dropAllFKConstraints drops all foreign key constraints from all tables for the given driver.
670+
// This is the minimal set of DDL needed to safely shrink indexed columns in legacy upgrades.
671+
func dropAllFKConstraints(db *gorm.DB, driverName string) error {
653672
switch driverName {
654673
case "mysql":
655-
return dropAllMySQLConstraintsAndIndexes(db)
674+
return dropAllMySQLFKConstraints(db)
656675
case "pgx":
657-
// PostgreSQL not yet supported. No-op for now.
676+
// PostgreSQL legacy path not implemented; no-op for now.
658677
return nil
659678
default:
660-
return fmt.Errorf("DropAllConstraintsAndIndexes not supported for driver: %s", driverName)
679+
return fmt.Errorf("DropAllFKConstraints not supported for driver: %s", driverName)
661680
}
662681
}
663682

664-
// dropAllMySQLConstraintsAndIndexes drops all foreign key constraints, unique constraints (except PRIMARY), and non-primary indexes from all tables in the current MySQL database.
665-
func dropAllMySQLConstraintsAndIndexes(db *gorm.DB) error {
683+
// dropAllMySQLFKConstraints drops all foreign key constraints from all tables in the current MySQL database.
684+
func dropAllMySQLFKConstraints(db *gorm.DB) error {
666685
tables := []string{}
667686
if err := db.Raw("SHOW TABLES").Scan(&tables).Error; err != nil {
668687
return fmt.Errorf("failed to list tables: %w", err)
@@ -695,57 +714,132 @@ func dropAllMySQLConstraintsAndIndexes(db *gorm.DB) error {
695714
}
696715
}
697716
}
717+
return nil
718+
}
698719

699-
// Drop UNIQUE constraints except PRIMARY KEY
700-
rows2, err := db.Raw(`
701-
SELECT constraint_name, table_name
702-
FROM information_schema.table_constraints
703-
WHERE constraint_schema = DATABASE()
704-
AND constraint_type = 'UNIQUE'
705-
AND constraint_name != 'PRIMARY'
706-
`).Rows()
707-
if err != nil {
708-
return fmt.Errorf("failed to list unique constraints: %w", err)
720+
// dropLegacyIndexes removes a small, explicit set of legacy indexes that
721+
// conflict/duplicate with GORM tag definitions. MySQL only; PostgreSQL is no-op.
722+
func dropLegacyIndexes(db *gorm.DB, dialect SQLDialect) error {
723+
switch dialect.Name {
724+
case "mysql":
725+
return dropLegacyIndexesMySQL(db)
726+
case "pgx":
727+
// No legacy cleanup needed for PostgreSQL per upstream note.
728+
return nil
729+
default:
730+
return fmt.Errorf("dropLegacyIndexes: unsupported dialect %q", dialect.Name)
709731
}
710-
defer rows2.Close()
732+
}
711733

712-
for rows2.Next() {
713-
var constraintName, tableName string
714-
if err := rows2.Scan(&constraintName, &tableName); err != nil {
715-
return fmt.Errorf("failed to scan unique constraint row: %w", err)
734+
// dropLegacyIndexesMySQL removes a small, explicit set of legacy indexes on MySQL:
735+
// See https://github.com/kubeflow/pipelines/pull/12013
736+
// The function is idempotent: it queries information_schema/SHOW INDEX first and only drops/renames when needed.
737+
func dropLegacyIndexesMySQL(db *gorm.DB) error {
738+
ixLogPrefix := "[legacy-index-cleanup] "
739+
740+
type idxRow struct {
741+
IndexName string `gorm:"column:INDEX_NAME"`
742+
}
743+
744+
// --- Rules 1 & 2: drop single-column unique index on Name for experiments/pipelines ---
745+
for _, table := range []string{"experiments", "pipelines"} {
746+
var rows []idxRow
747+
// Find indexes that are UNIQUE, non-PRIMARY, and exactly one column which is Name.
748+
q := fmt.Sprintf(`
749+
SELECT INDEX_NAME
750+
FROM information_schema.STATISTICS
751+
WHERE TABLE_SCHEMA = DATABASE()
752+
AND TABLE_NAME = '%s'
753+
GROUP BY INDEX_NAME
754+
HAVING
755+
COUNT(*) = 1
756+
AND SUM(CASE WHEN COLUMN_NAME='Name' THEN 1 ELSE 0 END) = 1
757+
AND MAX(NON_UNIQUE) = 0
758+
AND MAX(INDEX_NAME <> 'PRIMARY') = 1;
759+
`, table)
760+
761+
if err := db.Raw(q).Scan(&rows).Error; err != nil {
762+
return fmt.Errorf("query single-column unique Name indexes on %s failed: %w", table, err)
716763
}
717-
dropStmt := fmt.Sprintf("ALTER TABLE `%s` DROP INDEX `%s`", tableName, constraintName)
718-
glog.Infof("Dropping unique constraint: %s", dropStmt)
719-
if err := db.Exec(dropStmt).Error; err != nil {
720-
return fmt.Errorf("failed to drop unique constraint %s on table %s: %w", constraintName, tableName, err)
764+
for _, r := range rows {
765+
glog.Infof("%s dropping single-column unique index %q on %s(Name)", ixLogPrefix, r.IndexName, table)
766+
stmt := fmt.Sprintf("DROP INDEX `%s` ON `%s`", r.IndexName, table)
767+
if err := db.Exec(stmt).Error; err != nil {
768+
return fmt.Errorf("drop index %s on %s failed: %w", r.IndexName, table, err)
769+
}
721770
}
722771
}
723772

724-
// Drop non-primary indexes
725-
for _, table := range tables {
726-
var indexes []struct {
727-
KeyName string `gorm:"column:Key_name"`
773+
// --- Rule 3: drop legacy composite unique index on pipeline_versions if present ---
774+
{
775+
if db.Migrator().HasTable(&model.PipelineVersion{}) {
776+
rows, err := db.Raw(`SHOW INDEX FROM pipeline_versions WHERE Key_name='idx_pipeline_version_uuid_name'`).Rows()
777+
if err != nil {
778+
return fmt.Errorf("query pipeline_versions indices failed: %w", err)
779+
}
780+
has := false
781+
if err := rows.Err(); err != nil {
782+
_ = rows.Close()
783+
return fmt.Errorf("iterate pipeline_versions indices failed: %w", err)
784+
}
785+
if rows.Next() {
786+
has = true
787+
}
788+
_ = rows.Close()
789+
if has {
790+
glog.Infof("%s dropping legacy composite unique index idx_pipeline_version_uuid_name on pipeline_versions", ixLogPrefix)
791+
if err := db.Exec("DROP INDEX `idx_pipeline_version_uuid_name` ON `pipeline_versions`").Error; err != nil {
792+
return fmt.Errorf("drop idx_pipeline_version_uuid_name on pipeline_versions failed: %w", err)
793+
}
794+
}
795+
} else {
796+
glog.Infof("%s skip pipeline_versions cleanup: table not found (fresh-introduced case)", ixLogPrefix)
728797
}
729-
err = db.Raw(fmt.Sprintf("SHOW INDEX FROM `%s`", table)).Scan(&indexes).Error
730-
if err != nil {
731-
glog.Warningf("failed to list indexes for table %s: %v", table, err)
732-
continue
798+
}
799+
800+
// --- Rule 4: normalize pipelines(Name,Namespace) unique index to keep only `namespace_name` ---
801+
{
802+
var rows []idxRow
803+
q := `
804+
SELECT INDEX_NAME
805+
FROM information_schema.STATISTICS
806+
WHERE TABLE_SCHEMA = DATABASE()
807+
AND TABLE_NAME = 'pipelines'
808+
GROUP BY INDEX_NAME
809+
HAVING
810+
COUNT(*) = 2
811+
AND SUM(CASE WHEN SEQ_IN_INDEX=1 AND COLUMN_NAME='Name' THEN 1 ELSE 0 END) = 1
812+
AND SUM(CASE WHEN SEQ_IN_INDEX=2 AND COLUMN_NAME='Namespace' THEN 1 ELSE 0 END) = 1
813+
AND MAX(NON_UNIQUE) = 0;
814+
`
815+
if err := db.Raw(q).Scan(&rows).Error; err != nil {
816+
return fmt.Errorf("query composite unique (Name,Namespace) indexes on pipelines failed: %w", err)
817+
}
818+
// Set of existing names for this composite index
819+
has := map[string]bool{}
820+
for _, r := range rows {
821+
has[r.IndexName] = true
733822
}
734-
seen := make(map[string]bool)
735-
for _, idx := range indexes {
736-
if idx.KeyName == "PRIMARY" || seen[idx.KeyName] {
737-
continue
738-
}
739-
seen[idx.KeyName] = true
740823

741-
glog.Infof("Dropping index %s on table %s", idx.KeyName, table)
742-
if err := db.Exec(fmt.Sprintf(
743-
"DROP INDEX `%s` ON `%s`", idx.KeyName, table,
744-
)).Error; err != nil {
745-
return fmt.Errorf("failed to drop index %s on table %s: %w", idx.KeyName, table, err)
824+
// Case A: both exist -> drop legacy name_namespace_index, keep namespace_name
825+
if has["namespace_name"] && has["name_namespace_index"] {
826+
glog.Infof("%s dropping duplicate unique index name_namespace_index on pipelines(Name,Namespace), keeping namespace_name", ixLogPrefix)
827+
if err := db.Exec("DROP INDEX `name_namespace_index` ON `pipelines`").Error; err != nil {
828+
return fmt.Errorf("drop name_namespace_index on pipelines failed: %w", err)
829+
}
830+
} else if !has["namespace_name"] && has["name_namespace_index"] {
831+
// Case B: only legacy exists -> try rename to avoid rebuild; fallback to drop.
832+
glog.Infof("%s renaming name_namespace_index -> namespace_name on pipelines(Name,Namespace)", ixLogPrefix)
833+
if err := db.Exec("ALTER TABLE `pipelines` RENAME INDEX `name_namespace_index` TO `namespace_name`").Error; err != nil {
834+
glog.Warningf("%s rename failed (%v); dropping name_namespace_index; AutoMigrate will recreate namespace_name", ixLogPrefix, err)
835+
if err2 := db.Exec("DROP INDEX `name_namespace_index` ON `pipelines`").Error; err2 != nil {
836+
return fmt.Errorf("drop name_namespace_index on pipelines failed after rename attempt: %w", err2)
837+
}
746838
}
747839
}
840+
// Case C/D: only namespace_name exists or none exist -> no-op
748841
}
842+
749843
return nil
750844
}
751845

@@ -793,6 +887,60 @@ func ensureColumnLength(db *gorm.DB, spec validation.ColLenSpec) error {
793887
return nil
794888
}
795889

890+
// addDisplayNameColumn ensures the DisplayName column exists on the given model's table,
891+
// backfills it from Name where missing, and then enforces NOT NULL.
892+
// It is safe to call multiple times (idempotent).
893+
func addDisplayNameColumn(db *gorm.DB, mdl interface{}, dialect SQLDialect) error {
894+
895+
table, dbCol, err := FieldMeta(db, mdl, "DisplayName")
896+
if err != nil {
897+
return fmt.Errorf("resolve meta for %T.DisplayName failed: %w", mdl, err)
898+
}
899+
// Only allow these tables to have DisplayName added to prevent accidental schema changes.
900+
allowed := map[string]bool{
901+
model.Pipeline{}.TableName(): true,
902+
model.PipelineVersion{}.TableName(): true,
903+
}
904+
if !allowed[table] {
905+
return fmt.Errorf("table %q is not allowed for DisplayName migration", table)
906+
}
907+
908+
if db.Migrator().HasColumn(mdl, dbCol) {
909+
return nil
910+
}
911+
912+
q := dialect.QuoteIdentifier
913+
quotedTable := q(table)
914+
glog.Info("Adding DisplayName column to " + quotedTable)
915+
916+
return db.Transaction(func(tx *gorm.DB) error {
917+
var stmts []string
918+
switch dialect.Name {
919+
case "mysql":
920+
stmts = []string{
921+
"ALTER TABLE " + quotedTable + " ADD COLUMN " + q(dbCol) + " VARCHAR(255) NULL;",
922+
"UPDATE " + quotedTable + " SET " + q(dbCol) + " = " + q("Name") + " WHERE " + q(dbCol) + " IS NULL;",
923+
"ALTER TABLE " + quotedTable + " MODIFY COLUMN " + q(dbCol) + " VARCHAR(255) NOT NULL;",
924+
}
925+
case "pgx":
926+
stmts = []string{
927+
"ALTER TABLE " + quotedTable + " ADD COLUMN " + q(dbCol) + " VARCHAR(255);",
928+
"UPDATE " + quotedTable + " SET " + q(dbCol) + " = " + q("Name") + " WHERE " + q(dbCol) + " IS NULL;",
929+
"ALTER TABLE " + quotedTable + " ALTER COLUMN " + q(dbCol) + " SET NOT NULL;",
930+
}
931+
default:
932+
return fmt.Errorf("unsupported driver: %s", dialect.Name)
933+
}
934+
935+
for _, s := range stmts {
936+
if err := tx.Exec(s).Error; err != nil {
937+
return fmt.Errorf("exec failed for %q: %w", s, err)
938+
}
939+
}
940+
return nil
941+
})
942+
}
943+
796944
func initMinioClient(ctx context.Context, initConnectionTimeout time.Duration) storage.ObjectStoreInterface {
797945
// Create minio client.
798946
minioServiceHost := common.GetStringConfigWithDefault(
@@ -878,8 +1026,15 @@ func initPipelineVersionsFromPipelines(db *gorm.DB) {
8781026
// pipeline version API is introduced will have different Ids; and the minio
8791027
// file will be put directly into the directories for pipeline versions.
8801028
tx.Exec(`INSERT INTO
881-
pipeline_versions (UUID, Name, CreatedAtInSec, Parameters, Status, PipelineId)
882-
SELECT UUID, Name, CreatedAtInSec, Parameters, Status, UUID FROM pipelines;`)
1029+
pipeline_versions (UUID, Name, CreatedAtInSec, Parameters, Status, PipelineId)
1030+
SELECT
1031+
p.UUID,
1032+
p.Name,
1033+
p.CreatedAtInSec,
1034+
COALESCE(p.Parameters, '{}') AS Parameters,
1035+
p.Status,
1036+
p.UUID AS PipelineId
1037+
FROM pipelines p;`)
8831038

8841039
// Step 2: modifiy pipelines table after pipeline_versions are populated.
8851040
tx.Exec("update pipelines set DefaultVersionId=UUID;")

0 commit comments

Comments
 (0)