diff --git a/cmd/substreams-sink-sql/run.go b/cmd/substreams-sink-sql/run.go index 93b636a..7ab64d7 100644 --- a/cmd/substreams-sink-sql/run.go +++ b/cmd/substreams-sink-sql/run.go @@ -36,6 +36,11 @@ var sinkRunCmd = Command(sinkRunE, flags.Int("flush-retry-count", 3, "Number of retry attempts for flush operations") flags.Duration("flush-retry-delay", 1*time.Second, "Base delay for incremental retry backoff on flush failures") flags.StringP("endpoint", "e", "", "Specify the substreams endpoint, ex: `mainnet.eth.streamingfast.io:443`") + + // Postgres insert-only batching (runtime-only flags) + flags.String("pg-insert-batch-mode", "off", "Postgres insert batching mode: off|values|unnest (runtime-only)") + flags.Int("pg-insert-batch-size", 1000, "Postgres insert batch size when batching is enabled (runtime-only)") + flags.Bool("pg-insert-only", false, "Assert insert-only processing; if other ops are present, fallback or error based on future wiring (runtime-only)") }), Example("substreams-sink-sql run 'postgres://localhost:5432/posgres?sslmode=disable' uniswap-v3@v0.2.10"), OnCommandErrorLogAndExit(zlog), @@ -98,6 +103,11 @@ func sinkRunE(cmd *cobra.Command, args []string) error { flushRetryCount := sflags.MustGetInt(cmd, "flush-retry-count") flushRetryDelay := sflags.MustGetDuration(cmd, "flush-retry-delay") + // Read Postgres insert batching flags + pgInsertBatchMode := sflags.MustGetString(cmd, "pg-insert-batch-mode") + pgInsertBatchSize := sflags.MustGetInt(cmd, "pg-insert-batch-size") + pgInsertOnly := sflags.MustGetBool(cmd, "pg-insert-only") + cursorTableName := sflags.MustGetString(cmd, "cursors-table") historyTableName := sflags.MustGetString(cmd, "history-table") handleReorgs := sflags.MustGetInt(cmd, "undo-buffer-size") == 0 @@ -113,6 +123,9 @@ func sinkRunE(cmd *cobra.Command, args []string) error { HandleReorgs: handleReorgs, FlushRetryCount: flushRetryCount, FlushRetryDelay: flushRetryDelay, + PgInsertBatchMode: pgInsertBatchMode, + PgInsertBatchSize: pgInsertBatchSize, + PgInsertOnly: pgInsertOnly, }) postgresSinker, err := sinkerFactory(app.Context(), dsnString, zlog, tracer) diff --git a/db_changes/db/db.go b/db_changes/db/db.go index 40eb364..a4b3438 100644 --- a/db_changes/db/db.go +++ b/db_changes/db/db.go @@ -47,6 +47,10 @@ type Loader struct { testTx *TestTx // used for testing: if non-nil, 'loader.BeginTx()' will return this object instead of a real *sql.Tx dsn *DSN + // Postgres insert batching (runtime-configurable; defaults imply disabled) + pgInsertBatchMode string // off|values|unnest + pgInsertBatchSize int // >0 when enabled + pgInsertOnly bool // assert-only mode; if true, non-insert ops should be rejected or cause fallback batchOrdinal uint64 // Counter for ordinals within the current batch, resets on flush } @@ -161,6 +165,32 @@ func (l *Loader) FlushNeeded() bool { return totalRows > l.batchRowFlushInterval } +// PgInsertBatchMode returns the configured Postgres batch mode ("off" if unset). +func (l *Loader) PgInsertBatchMode() string { + if l.pgInsertBatchMode == "" { + return "off" + } + return l.pgInsertBatchMode +} + +// PgInsertBatchSize returns the configured batch size (0 means unset/disabled). +func (l *Loader) PgInsertBatchSize() int { + return l.pgInsertBatchSize +} + +// PgInsertOnly indicates whether insert-only assertions are enabled. +func (l *Loader) PgInsertOnly() bool { + return l.pgInsertOnly +} + +// ConfigurePgInsertBatching sets Postgres insert batching configuration on the loader. +// mode should be one of: "off", "values", "unnest". 
size <= 0 disables batching. +func (l *Loader) ConfigurePgInsertBatching(mode string, size int, insertOnly bool) { + l.pgInsertBatchMode = mode + l.pgInsertBatchSize = size + l.pgInsertOnly = insertOnly +} + // getTablesFromSchema returns table information similar to schema.Tables() // but only inspects tables in the specified schema to avoid issues with database extensions func (l *Loader) getTablesFromSchema(schemaName string) (map[[2]string][]*sql.ColumnType, error) { @@ -209,6 +239,116 @@ func (l *Loader) getTablesFromSchema(schemaName string) (map[[2]string][]*sql.Co return result, nil } +// getTableColumns returns column information for a specific table +func (l *Loader) getTableColumns(schemaName, tableName string) ([]*sql.ColumnType, error) { + // Use a simple query to get column information + query := fmt.Sprintf("SELECT * FROM %s.%s WHERE 1=0", + EscapeIdentifier(schemaName), + EscapeIdentifier(tableName)) + + rows, err := l.DB.Query(query) + if err != nil { + return nil, fmt.Errorf("querying table structure: %w", err) + } + defer rows.Close() + + return rows.ColumnTypes() +} + +// fetchColumnPgTypes returns a mapping of column name to PostgreSQL type name (pg_type.typname), +// which properly reflects user-defined types (e.g., enums) and array types (leading underscore). +// Example: bigint[] -> _int8, enum call_type -> call_type, enum[] -> _call_type. +func (l *Loader) fetchColumnPgTypes(schemaName, tableName string) (map[string]string, error) { + query := ` + SELECT a.attname, t.typname + FROM pg_attribute a + JOIN pg_class c ON c.oid = a.attrelid + JOIN pg_namespace n ON n.oid = c.relnamespace + JOIN pg_type t ON t.oid = a.atttypid + WHERE n.nspname = $1 + AND c.relname = $2 + AND a.attnum > 0 + AND NOT a.attisdropped + ORDER BY a.attnum` + + rows, err := l.DB.Query(query, schemaName, tableName) + if err != nil { + return nil, fmt.Errorf("querying pg types for %s.%s: %w", schemaName, tableName, err) + } + defer rows.Close() + + out := make(map[string]string) + for rows.Next() { + var col string + var typ string + if err := rows.Scan(&col, &typ); err != nil { + return nil, fmt.Errorf("scanning pg types row: %w", err) + } + out[col] = typ + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterating pg types rows: %w", err) + } + return out, nil +} + +// fetchColumnNullabilityDefaults returns per-column nullability and default expression info. +// defaultExpr is returned as a raw SQL expression via pg_get_expr(adbin, adrelid). 
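+//
+// For illustration (hypothetical column definition): a column declared as
+//
+//	id bigint NOT NULL DEFAULT nextval('events_id_seq'::regclass)
+//
+// would be reported as Nullable=false, HasDefault=true and
+// DefaultExpr="nextval('events_id_seq'::regclass)".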
+func (l *Loader) fetchColumnNullabilityDefaults(schemaName, tableName string) (map[string]struct { + Nullable bool + HasDefault bool + DefaultExpr string +}, error) { + query := ` + SELECT a.attname, + NOT a.attnotnull AS nullable, + (ad.adbin IS NOT NULL) AS has_default, + pg_get_expr(ad.adbin, ad.adrelid) AS default_expr + FROM pg_attribute a + JOIN pg_class c ON c.oid = a.attrelid + JOIN pg_namespace n ON n.oid = c.relnamespace + LEFT JOIN pg_attrdef ad ON ad.adrelid = a.attrelid AND ad.adnum = a.attnum + WHERE n.nspname = $1 + AND c.relname = $2 + AND a.attnum > 0 + AND NOT a.attisdropped + ORDER BY a.attnum` + + rows, err := l.DB.Query(query, schemaName, tableName) + if err != nil { + return nil, fmt.Errorf("querying nullability/defaults for %s.%s: %w", schemaName, tableName, err) + } + defer rows.Close() + + out := make(map[string]struct { + Nullable bool + HasDefault bool + DefaultExpr string + }) + for rows.Next() { + var col string + var nullable bool + var hasDefault bool + var defaultExpr sql.NullString + if err := rows.Scan(&col, &nullable, &hasDefault, &defaultExpr); err != nil { + return nil, fmt.Errorf("scanning nullability/default row: %w", err) + } + expr := "" + if defaultExpr.Valid { + expr = defaultExpr.String + } + out[col] = struct { + Nullable bool + HasDefault bool + DefaultExpr string + }{Nullable: nullable, HasDefault: hasDefault, DefaultExpr: expr} + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("iterating nullability/default rows: %w", err) + } + return out, nil +} + func (l *Loader) LoadTables(schemaName string, cursorTableName string, historyTableName string) error { schemaTables, err := l.getTablesFromSchema(schemaName) if err != nil { @@ -240,13 +380,27 @@ func (l *Loader) LoadTables(schemaName string, cursorTableName string, historyTa } columnByName := make(map[string]*ColumnInfo, len(columns)) + // Try to enrich with real PostgreSQL type names (handles enums/domains/arrays) + pgTypes, _ := l.fetchColumnPgTypes(schemaName, tableName) + // Also fetch nullability/defaults metadata for 11c default inlining + ndMeta, _ := l.fetchColumnNullabilityDefaults(schemaName, tableName) for _, f := range columns { - columnByName[f.Name()] = &ColumnInfo{ + dbType := f.DatabaseTypeName() + if real, ok := pgTypes[f.Name()]; ok && real != "" { + dbType = real + } + info := &ColumnInfo{ name: f.Name(), escapedName: EscapeIdentifier(f.Name()), - databaseTypeName: f.DatabaseTypeName(), + databaseTypeName: dbType, scanType: f.ScanType(), } + if meta, ok := ndMeta[f.Name()]; ok { + info.nullable = meta.Nullable + info.hasDefault = meta.HasDefault + info.defaultExpr = meta.DefaultExpr + } + columnByName[f.Name()] = info } key, err := l.dialect.GetPrimaryKey(l.DB, schemaName, tableName) diff --git a/db_changes/db/dialect_postgres.go b/db_changes/db/dialect_postgres.go index 40e35d7..f64d0e9 100644 --- a/db_changes/db/dialect_postgres.go +++ b/db_changes/db/dialect_postgres.go @@ -19,6 +19,38 @@ import ( "golang.org/x/exp/maps" ) +// arrayExprToTextLiteral converts a SQL array expression like ARRAY[1,2]::bigint[] or '{1,2}' +// to a Postgres text array literal suitable for casting from text[] via ::type[] later. +// This is a best-effort transformation for builder use only. +func arrayExprToTextLiteral(expr string) string { + s := strings.TrimSpace(expr) + if strings.HasPrefix(s, "ARRAY[") { + // Extract inside ARRAY[...] 
+ inner := strings.TrimPrefix(s, "ARRAY[") + // Drop everything after the closing ']' then wrap in braces + idx := strings.Index(inner, "]") + if idx >= 0 { + inner = inner[:idx] + } + return "{" + inner + "}" + } + // Already a brace literal or quoted brace literal + if strings.HasPrefix(s, "'{") || strings.HasPrefix(s, "{") { + // Strip trailing casts like '::type[]' + if i := strings.Index(s, "::"); i >= 0 { + s = s[:i] + } + // Remove leading quote if present + if strings.HasPrefix(s, "'") && strings.HasSuffix(s, "'") { + s = strings.TrimPrefix(s, "'") + s = strings.TrimSuffix(s, "'") + } + return s + } + // Fallback: wrap as single element + return "{" + s + "}" +} + type PostgresDialect struct { cursorTableName string historyTableName string @@ -96,9 +128,7 @@ func (d PostgresDialect) Flush(tx Tx, ctx context.Context, l *Loader, outputModu entries := entriesPair.Value totalRows += entries.Len() - if l.tracer.Enabled() { - l.logger.Debug("flushing table rows", zap.String("table_name", entriesPair.Key), zap.Int("row_count", entries.Len())) - } + l.logger.Debug("flushing table rows", zap.String("table_name", entriesPair.Key), zap.Int("row_count", entries.Len())) } allOperations := make([]*Operation, 0, totalRows) @@ -113,21 +143,293 @@ func (d PostgresDialect) Flush(tx Tx, ctx context.Context, l *Loader, outputModu return cmp.Compare(a.ordinal, b.ordinal) }) - var rowCount int - for _, entry := range allOperations { - query, err := d.prepareStatement(d.schemaName, entry) - if err != nil { - return 0, fmt.Errorf("failed to prepare statement: %w", err) + // Build execution segments (single ops or VALUES batches) preserving global order + type execSegment struct { + kind string // "single" | "values-batch" + start int + end int + sql string // prebuilt SQL for batch + mode PgBatchMode // values | unnest (only relevant for batch kind) + } + + segments := make([]execSegment, 0, len(allOperations)) + mode := d.effectivePgBatchMode(l) + batchSize := d.effectivePgBatchSize(l) + if batchSize < 2 { + batchSize = 2 + } + + // Trace effective batching configuration + l.logger.Info("postgres batching config", + zap.String("mode", string(mode)), + zap.Int("batch_size", batchSize), + zap.Bool("insert_only", d.isPgInsertOnly(l)), + ) + + for i := 0; i < len(allOperations); { + op := allOperations[i] + if (mode == PgBatchModeValues || mode == PgBatchModeUnnest) && op.opType == OperationTypeInsert { + // Attempt to form a VALUES batch + tbl := op.table + start := i + end := i + count := 1 + for j := i + 1; j < len(allOperations) && count < batchSize; j++ { + next := allOperations[j] + if next.opType != OperationTypeInsert || next.table != tbl { + break + } + end = j + count++ + } + + var reason string + if count >= batchSize { + reason = "batch_size_cap" + } else if end+1 < len(allOperations) { + next := allOperations[end+1] + if next.table != tbl { + reason = "table_change" + } else if next.opType != OperationTypeInsert { + reason = "op_change" + } else { + reason = "unknown" + } + } + l.logger.Debug("batch boundary (insert)", + zap.String("table_name", tbl.identifier), + zap.Int("rows", count), + zap.String("reason", reason), + ) + + if count >= 2 { + cols, vals, planErr := d.computeInsertBatchPlan(allOperations[start : end+1]) + cte, needsHistory := d.buildInsertHistoryCTE(d.schemaName, allOperations[start:end+1]) + if planErr == nil && (needsHistory || true) { // allow both irreversible and reversible (with CTE) + l.logger.Debug("detected insert-only batch", + zap.String("table_name", 
tbl.identifier), + zap.Int("rows", count), + zap.Int("columns", len(cols)), + zap.Uint64("start_ordinal", allOperations[start].ordinal), + zap.Uint64("end_ordinal", allOperations[end].ordinal), + zap.Bool("with_history_cte", needsHistory), + ) + if mode == PgBatchModeValues { + batchSQL := d.buildValuesInsertSQL(tbl, cols, vals) + if needsHistory { + batchSQL = cte + " " + strings.TrimSuffix(batchSQL, ";") + ";" + } + segments = append(segments, execSegment{kind: "values-batch", start: start, end: end, sql: batchSQL, mode: PgBatchModeValues}) + } else if mode == PgBatchModeUnnest { + if batchSQL, err := d.buildUnnestInsertSQL(tbl, cols, vals); err != nil { + return 0, fmt.Errorf("building UNNEST insert SQL: %w", err) + } else { + if needsHistory { + batchSQL = cte + " " + strings.TrimSuffix(batchSQL, ";") + ";" + } + segments = append(segments, execSegment{kind: "values-batch", start: start, end: end, sql: batchSQL, mode: PgBatchModeUnnest}) + } + } + i = end + 1 + continue + } + + l.logger.Debug("skipping batch candidate", + zap.String("table_name", tbl.identifier), + zap.Error(planErr), + ) + } } - if l.tracer.Enabled() { - l.logger.Debug("adding query from operation to transaction", zap.Stringer("op", entry), zap.String("query", query), zap.Uint64("ordinal", entry.ordinal)) + // Attempt UPSERT batching using UNNEST when enabled and not insert-only mode + if mode == PgBatchModeUnnest && !d.isPgInsertOnly(l) && op.opType == OperationTypeUpsert { + tbl := op.table + start := i + end := i + count := 1 + for j := i + 1; j < len(allOperations) && count < batchSize; j++ { + next := allOperations[j] + if next.opType != OperationTypeUpsert || next.table != tbl { + break + } + end = j + count++ + } + + var reason string + if count >= batchSize { + reason = "batch_size_cap" + } else if end+1 < len(allOperations) { + next := allOperations[end+1] + if next.table != tbl { + reason = "table_change" + } else if next.opType != OperationTypeUpsert { + reason = "op_change" + } else { + reason = "unknown" + } + } + l.logger.Debug("batch boundary (upsert)", + zap.String("table_name", tbl.identifier), + zap.Int("rows", count), + zap.String("reason", reason), + ) + + if count >= 2 { + cols, vals, pres, planErr := d.computeUpsertSupersetPlanWithPresence(allOperations[start : end+1]) + cte, needsHistory := d.buildUpsertHistoryCTE(d.schemaName, tbl, allOperations[start:end+1]) + if planErr == nil { + l.logger.Debug("detected upsert-only UNNEST batch", + zap.String("table_name", tbl.identifier), + zap.Int("rows", count), + zap.Int("columns", len(cols)), + zap.Uint64("start_ordinal", allOperations[start].ordinal), + zap.Uint64("end_ordinal", allOperations[end].ordinal), + zap.Bool("with_history_cte", needsHistory), + ) + if batchSQL, err := d.buildUnnestUpsertSQLWithPresence(tbl, cols, vals, pres); err != nil { + return 0, fmt.Errorf("building UNNEST upsert SQL: %w", err) + } else { + if needsHistory { + batchSQL = cte + " " + strings.TrimSuffix(batchSQL, ";") + ";" + } + segments = append(segments, execSegment{kind: "values-batch", start: start, end: end, sql: batchSQL, mode: PgBatchModeUnnest}) + i = end + 1 + continue + } + } + + l.logger.Debug("skipping upsert UNNEST batch candidate", + zap.String("table_name", tbl.identifier), + zap.Error(planErr), + ) + } } - if _, err := tx.ExecContext(ctx, query); err != nil { - return 0, fmt.Errorf("executing flush query %q: %w", query, err) + // Attempt UPSERT batching using VALUES when enabled and not insert-only mode + if mode == PgBatchModeValues && 
!d.isPgInsertOnly(l) && op.opType == OperationTypeUpsert { + tbl := op.table + start := i + end := i + count := 1 + for j := i + 1; j < len(allOperations) && count < batchSize; j++ { + next := allOperations[j] + if next.opType != OperationTypeUpsert || next.table != tbl { + break + } + end = j + count++ + } + + var reason string + if count >= batchSize { + reason = "batch_size_cap" + } else if end+1 < len(allOperations) { + next := allOperations[end+1] + if next.table != tbl { + reason = "table_change" + } else if next.opType != OperationTypeUpsert { + reason = "op_change" + } else { + reason = "unknown" + } + } + l.logger.Debug("batch boundary (upsert)", + zap.String("table_name", tbl.identifier), + zap.Int("rows", count), + zap.String("reason", reason), + ) + + if count >= 2 { + cols, vals, planErr := d.computeUpsertBatchPlan(allOperations[start : end+1]) + cte, needsHistory := d.buildUpsertHistoryCTE(d.schemaName, tbl, allOperations[start:end+1]) + if planErr == nil { + l.logger.Debug("detected upsert-only batch", + zap.String("table_name", tbl.identifier), + zap.Int("rows", count), + zap.Int("columns", len(cols)), + zap.Uint64("start_ordinal", allOperations[start].ordinal), + zap.Uint64("end_ordinal", allOperations[end].ordinal), + zap.Bool("with_history_cte", needsHistory), + ) + batchSQL := d.buildValuesUpsertSQL(tbl, cols, vals) + if needsHistory { + batchSQL = cte + " " + strings.TrimSuffix(batchSQL, ";") + ";" + } + segments = append(segments, execSegment{kind: "values-batch", start: start, end: end, sql: batchSQL, mode: PgBatchModeValues}) + i = end + 1 + continue + } + + l.logger.Debug("skipping upsert batch candidate", + zap.String("table_name", tbl.identifier), + zap.Error(planErr), + ) + } + } + + // Fallback: single operation + segments = append(segments, execSegment{kind: "single", start: i, end: i}) + i++ + } + + var rowCount int + singleLogCount := 0 + for _, seg := range segments { + switch seg.kind { + case "values-batch": + l.logger.Debug("executing batch", zap.String("mode", string(seg.mode)), zap.Int("rows", seg.end-seg.start+1), zap.String("sql", seg.sql)) + // For UNNEST mode, fail fast without per-row fallback to ease testing + if seg.mode == PgBatchModeUnnest { + if _, err := tx.ExecContext(ctx, seg.sql); err != nil { + return 0, fmt.Errorf("executing UNNEST batch query %q: %w", seg.sql, err) + } + rowCount += (seg.end - seg.start + 1) + break + } + + // VALUES mode keeps safe fallback path under SAVEPOINT + if _, spErr := tx.ExecContext(ctx, "SAVEPOINT sp_batch"); spErr == nil { + if _, err := tx.ExecContext(ctx, seg.sql); err != nil { + _, _ = tx.ExecContext(ctx, "ROLLBACK TO SAVEPOINT sp_batch") + l.logger.Debug("batch failed, falling back to per-row", zap.Error(err)) + for i := seg.start; i <= seg.end; i++ { + entry := allOperations[i] + query, qerr := d.prepareStatement(d.schemaName, entry) + if qerr != nil { + return 0, fmt.Errorf("prepare fallback statement: %w", qerr) + } + if _, exErr := tx.ExecContext(ctx, query); exErr != nil { + return 0, fmt.Errorf("executing fallback flush query %q: %w", query, exErr) + } + rowCount++ + } + continue + } + _, _ = tx.ExecContext(ctx, "RELEASE SAVEPOINT sp_batch") + rowCount += (seg.end - seg.start + 1) + } else { + // If SAVEPOINT unsupported, execute batch directly; on error, surface it + if _, err := tx.ExecContext(ctx, seg.sql); err != nil { + return 0, fmt.Errorf("executing batch insert query %q: %w", seg.sql, err) + } + rowCount += (seg.end - seg.start + 1) + } + case "single": + entry := allOperations[seg.start] 
+ query, err := d.prepareStatement(d.schemaName, entry) + if err != nil { + return 0, fmt.Errorf("failed to prepare statement: %w", err) + } + if singleLogCount%100 == 0 { + l.logger.Debug("adding query from operation to transaction", zap.Stringer("op", entry), zap.String("query", query), zap.Uint64("ordinal", entry.ordinal)) + } + if _, err := tx.ExecContext(ctx, query); err != nil { + return 0, fmt.Errorf("executing flush query %q: %w", query, err) + } + rowCount++ + singleLogCount++ } - rowCount++ } if err := d.pruneReversibleSegment(tx, ctx, d.schemaName, lastFinalBlock); err != nil { @@ -283,6 +585,39 @@ func (d PostgresDialect) AllowPkDuplicates() bool { return false } +// Helper accessors for Postgres insert batching configuration carried on Loader. +// These are intentionally resolved at Flush-time to keep Dialect construction unchanged. +type PgBatchMode string + +const ( + PgBatchModeOff PgBatchMode = "off" + PgBatchModeValues PgBatchMode = "values" + PgBatchModeUnnest PgBatchMode = "unnest" +) + +func (d PostgresDialect) effectivePgBatchMode(l *Loader) PgBatchMode { + switch strings.ToLower(l.PgInsertBatchMode()) { + case string(PgBatchModeValues): + return PgBatchModeValues + case string(PgBatchModeUnnest): + return PgBatchModeUnnest + default: + return PgBatchModeOff + } +} + +func (d PostgresDialect) effectivePgBatchSize(l *Loader) int { + size := l.PgInsertBatchSize() + if size <= 0 { + return 50 + } + return size +} + +func (d PostgresDialect) isPgInsertOnly(l *Loader) bool { + return l.PgInsertOnly() +} + func (d PostgresDialect) CreateUser(tx Tx, ctx context.Context, l *Loader, username string, password string, database string, readOnly bool) error { user, pass, db := EscapeIdentifier(username), password, EscapeIdentifier(database) var q string @@ -480,6 +815,875 @@ func (d *PostgresDialect) prepareColValues(table *TableInfo, colValues map[strin return } +// buildValuesInsertSQL constructs a multi-row VALUES insert for a single table using the +// provided escaped column names and per-row SQL-literal values (already normalized). +func (d *PostgresDialect) buildValuesInsertSQL(table *TableInfo, columnsEscaped []string, perRowValues [][]string) string { + valuesParts := make([]string, len(perRowValues)) + for i := range perRowValues { + valuesParts[i] = fmt.Sprintf("(%s)", strings.Join(perRowValues[i], ",")) + } + return fmt.Sprintf("INSERT INTO %s (%s) VALUES %s;", + table.identifier, + strings.Join(columnsEscaped, ","), + strings.Join(valuesParts, ","), + ) +} + +// buildUnnestInsertSQL constructs an UNNEST-based insert for a single table. +// It takes escaped column names and per-row SQL-literal values, and produces a +// statement like: +// +// INSERT INTO schema.table (col1, col2) +// SELECT * FROM unnest(ARRAY[...], ARRAY[...]); +// +// We use text arrays and rely on Postgres implicit cast when possible; for +// robustness, we cast each array to the column's database type name when known. 
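+//
+// Sketch of the generated statement (table and column names hypothetical) for two
+// rows over scalar columns "id" (bigint) and "name" (text); literal array contents elided:
+//
+//	INSERT INTO "public"."events" ("id","name")
+//	SELECT (s.c0)::bigint,(s.c1)::varchar
+//	FROM unnest(ARRAY[...]::bigint[], ARRAY[...]::varchar[]) WITH ORDINALITY AS s(c0,c1,ord);
+//
+// Array-typed columns are not passed to unnest (to avoid 2D arrays); they are
+// projected through a CASE keyed on the ordinality column instead.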
+func (d *PostgresDialect) buildUnnestInsertSQL(table *TableInfo, columnsEscaped []string, perRowValues [][]string) (string, error) { + if len(columnsEscaped) == 0 || len(perRowValues) == 0 { + return "", fmt.Errorf("empty columns or rows for UNNEST") + } + + // Build arrays per column from perRowValues + numCols := len(columnsEscaped) + + // Invert rows to columns + colsToValues := make([][]string, numCols) + for i := 0; i < numCols; i++ { + colsToValues[i] = make([]string, len(perRowValues)) + } + for r, row := range perRowValues { + if len(row) != numCols { + return "", fmt.Errorf("row %d has %d values, expected %d", r, len(row), numCols) + } + for c := 0; c < numCols; c++ { + colsToValues[c][r] = row[c] + } + } + + // Determine type casts for each column from table metadata + escapedToInfo := make(map[string]*ColumnInfo, len(table.columnsByName)) + for _, ci := range table.columnsByName { + escapedToInfo[ci.escapedName] = ci + } + + // Partition into scalar and array-typed columns + scalarIdx := make([]int, 0, numCols) + arrayIdx := make([]int, 0, numCols) + baseTypes := make([]string, numCols) + isArrayCol := make([]bool, numCols) + for i, esc := range columnsEscaped { + ci := escapedToInfo[esc] + bt, isArr := canonicalizePostgresType(ci.databaseTypeName) + baseTypes[i] = bt + isArrayCol[i] = isArr + if isArr { + arrayIdx = append(arrayIdx, i) + } else { + scalarIdx = append(scalarIdx, i) + } + } + + // If no scalar columns, bail out (let caller fallback) + if len(scalarIdx) == 0 { + return "", fmt.Errorf("no scalar columns for UNNEST WITH ORDINALITY") + } + + // Build unnest args for scalar columns and select projection + sAliases := make([]string, len(scalarIdx)+1) // +1 for ord + for i := 0; i < len(scalarIdx); i++ { + sAliases[i] = fmt.Sprintf("c%d", scalarIdx[i]) + } + sAliases[len(sAliases)-1] = "ord" + + sUnnestArgs := make([]string, len(scalarIdx)) + for k, colIdx := range scalarIdx { + bt := baseTypes[colIdx] + arr := fmt.Sprintf("ARRAY[%s]", strings.Join(colsToValues[colIdx], ",")) + if bt != "" { + arr = fmt.Sprintf("%s::%s[]", arr, bt) + } + sUnnestArgs[k] = arr + } + + // Build projection list for all columns in original order + projections := make([]string, numCols) + for i := 0; i < numCols; i++ { + if !isArrayCol[i] { + bt := baseTypes[i] + alias := fmt.Sprintf("s.c%d", i) + if bt != "" { + projections[i] = fmt.Sprintf("(%s)::%s", alias, bt) + } else { + projections[i] = alias + } + continue + } + // Array-typed: avoid 2D arrays; select the per-row array using CASE on ord + bt := baseTypes[i] + cases := make([]string, 0, len(perRowValues)+2) + cases = append(cases, "CASE ((s.ord)::int)") + for r := 0; r < len(perRowValues); r++ { + v := colsToValues[i][r] + var rowExpr string + if v == "NULL" { + rowExpr = fmt.Sprintf("NULL::%s[]", bt) + } else { + textLit := arrayExprToTextLiteral(strings.Trim(v, "'")) + rowExpr = fmt.Sprintf("(%s)::%s[]", escapeStringValue(textLit), bt) + } + cases = append(cases, fmt.Sprintf("WHEN %d THEN %s", r+1, rowExpr)) + } + cases = append(cases, fmt.Sprintf("ELSE '{}'::%s[] END", bt)) + projections[i] = strings.Join(cases, " ") + } + + selectList := strings.Join(projections, ",") + unnestArgs := strings.Join(sUnnestArgs, ", ") + aliasList := strings.Join(sAliases, ",") + return fmt.Sprintf("INSERT INTO %s (%s) SELECT %s FROM unnest(%s) WITH ORDINALITY AS s(%s);", + table.identifier, + strings.Join(columnsEscaped, ","), + selectList, + unnestArgs, + aliasList, + ), nil +} + +// buildUnnestUpsertSQL constructs an UNNEST-based upsert for a 
single table using escaped +// column names and per-row SQL-literal values. It builds per-column ARRAY[...] with type casts +// and emits ON CONFLICT over the table primary key columns, updating all provided columns. +func (d *PostgresDialect) buildUnnestUpsertSQL(table *TableInfo, columnsEscaped []string, perRowValues [][]string) (string, error) { + if len(columnsEscaped) == 0 || len(perRowValues) == 0 { + return "", fmt.Errorf("empty columns or rows for UNNEST upsert") + } + + // Build arrays per column from perRowValues + numCols := len(columnsEscaped) + + // Invert rows to columns + colsToValues := make([][]string, numCols) + for i := 0; i < numCols; i++ { + colsToValues[i] = make([]string, len(perRowValues)) + } + for r, row := range perRowValues { + if len(row) != numCols { + return "", fmt.Errorf("row %d has %d values, expected %d", r, len(row), numCols) + } + for c := 0; c < numCols; c++ { + colsToValues[c][r] = row[c] + } + } + + // Reverse-map escaped to ColumnInfo + escapedToInfo := make(map[string]*ColumnInfo, len(table.columnsByName)) + for _, ci := range table.columnsByName { + escapedToInfo[ci.escapedName] = ci + } + + // Partition columns + scalarIdx := make([]int, 0, numCols) + arrayIdx := make([]int, 0, numCols) + baseTypes := make([]string, numCols) + isArrayCol := make([]bool, numCols) + for i, esc := range columnsEscaped { + ci := escapedToInfo[esc] + bt, isArr := canonicalizePostgresType(ci.databaseTypeName) + baseTypes[i] = bt + isArrayCol[i] = isArr + if isArr { + arrayIdx = append(arrayIdx, i) + } else { + scalarIdx = append(scalarIdx, i) + } + } + + if len(scalarIdx) == 0 { + return "", fmt.Errorf("no scalar columns for UNNEST WITH ORDINALITY") + } + + // Unnest args and aliases for scalars + sAliases := make([]string, len(scalarIdx)+1) + for i := 0; i < len(scalarIdx); i++ { + sAliases[i] = fmt.Sprintf("c%d", scalarIdx[i]) + } + sAliases[len(sAliases)-1] = "ord" + + sUnnestArgs := make([]string, len(scalarIdx)) + for k, colIdx := range scalarIdx { + bt := baseTypes[colIdx] + arr := fmt.Sprintf("ARRAY[%s]", strings.Join(colsToValues[colIdx], ",")) + if bt != "" { + arr = fmt.Sprintf("%s::%s[]", arr, bt) + } + sUnnestArgs[k] = arr + } + + // Build projection list + projections := make([]string, numCols) + for i := 0; i < numCols; i++ { + if !isArrayCol[i] { + bt := baseTypes[i] + alias := fmt.Sprintf("s.c%d", i) + if bt != "" { + projections[i] = fmt.Sprintf("(%s)::%s", alias, bt) + } else { + projections[i] = alias + } + continue + } + // Array-typed: avoid 2D arrays; select the per-row array using CASE on ord + bt := baseTypes[i] + cases := make([]string, 0, len(perRowValues)+2) + cases = append(cases, "CASE ((s.ord)::int)") + for r := 0; r < len(perRowValues); r++ { + v := colsToValues[i][r] + var rowExpr string + if v == "NULL" { + rowExpr = fmt.Sprintf("NULL::%s[]", bt) + } else { + textLit := arrayExprToTextLiteral(strings.Trim(v, "'")) + rowExpr = fmt.Sprintf("(%s)::%s[]", escapeStringValue(textLit), bt) + } + cases = append(cases, fmt.Sprintf("WHEN %d THEN %s", r+1, rowExpr)) + } + cases = append(cases, fmt.Sprintf("ELSE '{}'::%s[] END", bt)) + projections[i] = strings.Join(cases, " ") + } + + // conflict target from table primary key columns + conflictCols := make([]string, len(table.primaryColumns)) + for i, pk := range table.primaryColumns { + conflictCols[i] = pk.escapedName + } + updates := make([]string, len(columnsEscaped)) + for i := range columnsEscaped { + updates[i] = fmt.Sprintf("%s=EXCLUDED.%s", columnsEscaped[i], columnsEscaped[i]) + } + + 
unnestArgs := strings.Join(sUnnestArgs, ", ") + selectList := strings.Join(projections, ",") + aliasList := strings.Join(sAliases, ",") + return fmt.Sprintf("INSERT INTO %s (%s) SELECT %s FROM unnest(%s) WITH ORDINALITY AS s(%s) ON CONFLICT (%s) DO UPDATE SET %s;", + table.identifier, + strings.Join(columnsEscaped, ","), + selectList, + unnestArgs, + aliasList, + strings.Join(conflictCols, ","), + strings.Join(updates, ", "), + ), nil +} + +// canonicalizePostgresType maps driver DatabaseTypeName (e.g., INT8, _INT8) +// to canonical Postgres base type (e.g., bigint) and whether the column itself +// is an array type. +func canonicalizePostgresType(databaseTypeName string) (baseType string, isArray bool) { + if databaseTypeName == "" { + return "", false + } + // Detect array-typed column from leading underscore (pq convention) + isArray = strings.HasPrefix(databaseTypeName, "_") + name := databaseTypeName + if isArray { + name = name[1:] + } + switch strings.ToUpper(name) { + case "INT8", "BIGINT": + baseType = "bigint" + case "INT4", "INTEGER", "INT": + baseType = "integer" + case "INT2", "SMALLINT": + baseType = "smallint" + case "BOOL", "BOOLEAN": + baseType = "boolean" + case "VARCHAR", "TEXT": + baseType = "varchar" + case "TIMESTAMP": + baseType = "timestamp" + case "TIMESTAMPTZ": + baseType = "timestamptz" + case "NUMERIC", "DECIMAL": + baseType = "numeric" + case "BYTEA": + baseType = "bytea" + default: + // Fallback to lowercase of provided name + baseType = strings.ToLower(name) + } + return baseType, isArray +} + +// buildInsertHistoryCTE builds a CTE that inserts one history row per INSERT operation in ops +// when those operations require reversible tracking (i.e., have a non-nil reversibleBlockNum). +// Returns the CTE SQL (without trailing semicolon) and a boolean indicating whether any history +// rows are needed. If none are needed, returns "", false. +func (d PostgresDialect) buildInsertHistoryCTE(schema string, ops []*Operation) (string, bool) { + if len(ops) == 0 { + return "", false + } + values := make([]string, 0, len(ops)) + // All ops in the batch target the same table + tableName := escapeStringValue(ops[0].table.identifier) + for _, op := range ops { + if op.reversibleBlockNum == nil { + continue + } + // op must be INSERT for this helper + pkJSON := escapeStringValue(primaryKeyToJSON(op.primaryKey)) + values = append(values, fmt.Sprintf("(%s,%s,%s,%d)", + escapeStringValue("I"), + tableName, + pkJSON, + *op.reversibleBlockNum, + )) + } + if len(values) == 0 { + return "", false + } + cte := fmt.Sprintf("WITH history_cte AS (INSERT INTO %s (op,table_name,pk,block_num) VALUES %s RETURNING 1)", + d.historyTable(schema), + strings.Join(values, ","), + ) + return cte, true +} + +// buildUpsertHistoryCTE builds a CTE that inserts one history row per UPSERT operation in ops +// for rows that require reversible tracking (have a non-nil reversibleBlockNum). It determines +// for each such row whether it would be an insert (no existing target row) or an update, and +// records prev_value via row_to_json(target) for updates. Returns the CTE SQL (without trailing +// semicolon) and a boolean indicating whether any history rows are needed. 
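+//
+// Structural sketch of the generated CTE for a single-column primary key "id"
+// (history-table and target-table names shown as placeholders):
+//
+//	WITH history_cte AS (
+//	  WITH src("id",pk_json,block_num) AS (VALUES ...)
+//	  INSERT INTO <history table> (op,table_name,pk,prev_value,block_num)
+//	  SELECT CASE WHEN target."id" IS NULL THEN 'I' ELSE 'U' END AS op,
+//	         '<escaped schema.table>', src.pk_json,
+//	         CASE WHEN target."id" IS NULL THEN NULL ELSE row_to_json(target) END AS prev_value,
+//	         src.block_num
+//	  FROM src AS src LEFT JOIN <target table> AS target ON "id" = src."id"
+//	  RETURNING 1)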
+func (d PostgresDialect) buildUpsertHistoryCTE(schema string, table *TableInfo, ops []*Operation) (string, bool) { + if len(ops) == 0 { + return "", false + } + + // Build VALUES rows for reversible ops only: (pk1, pk2, ..., pk_json, block_num) + pkCols := table.primaryColumns + if len(pkCols) == 0 { + return "", false + } + + // Build column alias list for src, quoting actual pk column names + srcCols := make([]string, 0, len(pkCols)+2) + for _, pk := range pkCols { + srcCols = append(srcCols, pk.escapedName) + } + srcCols = append(srcCols, "pk_json", "block_num") + + values := make([]string, 0, len(ops)) + + for _, op := range ops { + if op.reversibleBlockNum == nil { + continue + } + rowVals := make([]string, 0, len(pkCols)+2) + // pk values in declared order + for _, pk := range pkCols { + raw, ok := op.primaryKey[pk.name] + if !ok { + // if primary key missing, skip this row (safer than failing the whole batch) + rowVals = nil + break + } + norm, err := d.normalizeValueType(raw, pk.scanType) + if err != nil { + rowVals = nil + break + } + rowVals = append(rowVals, norm) + } + if rowVals == nil { + continue + } + // pk_json literal + rowVals = append(rowVals, escapeStringValue(primaryKeyToJSON(op.primaryKey))) + // block number literal + rowVals = append(rowVals, fmt.Sprintf("%d", *op.reversibleBlockNum)) + values = append(values, fmt.Sprintf("(%s)", strings.Join(rowVals, ","))) + } + + if len(values) == 0 { + return "", false + } + + // Build ON clause using pk columns + var onParts []string + for _, pk := range pkCols { + onParts = append(onParts, fmt.Sprintf("%s = src.%s", pk.escapedName, pk.escapedName)) + } + sort.Strings(onParts) + joinOn := strings.Join(onParts, " AND ") + + cte := fmt.Sprintf("WITH history_cte AS (WITH src(%s) AS (VALUES %s) INSERT INTO %s (op,table_name,pk,prev_value,block_num) SELECT CASE WHEN target.%s IS NULL THEN 'I' ELSE 'U' END AS op, %s, src.pk_json, CASE WHEN target.%s IS NULL THEN NULL ELSE row_to_json(target) END AS prev_value, src.block_num FROM %s AS src LEFT JOIN %s AS target ON %s RETURNING 1)", + strings.Join(srcCols, ","), + strings.Join(values, ","), + d.historyTable(schema), + pkCols[0].escapedName, + escapeStringValue(fmt.Sprintf("%s.%s", EscapeIdentifier(schema), table.nameEscaped)), + pkCols[0].escapedName, + "src", + table.identifier, + joinOn, + ) + return cte, true +} + +// computeInsertBatchPlan computes a stable, sorted superset of columns across the provided +// INSERT operations (which must all target the same table) and returns the escaped +// column names along with per-row values aligned to that column order. Missing +// fields are represented as SQL NULL. 
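+//
+// For illustration (hypothetical columns): two rows carrying {id, amount} and
+// {id, note} respectively produce the sorted superset ("amount","id","note"),
+// with NULL substituted for "note" in the first row and for "amount" in the second.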
+func (d *PostgresDialect) computeInsertBatchPlan(ops []*Operation) (columnsEscaped []string, perRowValues [][]string, err error) { + if len(ops) == 0 { + return nil, nil, fmt.Errorf("empty operation slice for batch plan") + } + + table := ops[0].table + // Validate homogeneity: same table and INSERT type + for _, op := range ops { + if op.opType != OperationTypeInsert { + return nil, nil, fmt.Errorf("non-insert operation encountered in insert batch candidate") + } + if op.table != table { + return nil, nil, fmt.Errorf("mixed tables in insert batch candidate") + } + } + + // Build superset of raw column names across rows and ensure primary keys are included + rawNamesSet := make(map[string]struct{}) + for _, op := range ops { + for k := range op.data { + rawNamesSet[k] = struct{}{} + } + } + for _, pk := range table.primaryColumns { + rawNamesSet[pk.name] = struct{}{} + } + + rawNames := make([]string, 0, len(rawNamesSet)) + for name := range rawNamesSet { + // Validate column exists on table schema + if _, found := table.columnsByName[name]; !found { + return nil, nil, fmt.Errorf("unknown column %q for table %s", name, table.identifier) + } + rawNames = append(rawNames, name) + } + sort.Strings(rawNames) + + // Produce escaped column names and a lookup for ColumnInfo + columnsEscaped = make([]string, len(rawNames)) + colInfos := make([]*ColumnInfo, len(rawNames)) + for i, name := range rawNames { + info := table.columnsByName[name] + colInfos[i] = info + columnsEscaped[i] = info.escapedName + } + + // Build per-row values aligned with the stable column order + perRowValues = make([][]string, len(ops)) + for rowIdx, op := range ops { + row := make([]string, len(rawNames)) + for colIdx, name := range rawNames { + if v, ok := op.data[name]; ok { + normalized, nerr := d.normalizeValueType(v, colInfos[colIdx].scanType) + if nerr != nil { + return nil, nil, fmt.Errorf("normalize value for column %q: %w", name, nerr) + } + row[colIdx] = normalized + } else { + row[colIdx] = "NULL" + } + } + perRowValues[rowIdx] = row + } + + return columnsEscaped, perRowValues, nil +} + +// computeUpsertBatchPlan computes a stable column list for UPSERT batching. To preserve semantics +// of single-row upserts (only updating explicitly provided columns), this requires that all rows +// in the batch share the exact same set of data columns and that primary key columns are present +// in every row. Returns escaped column names and per-row normalized values aligned to that order. 
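+//
+// For illustration (hypothetical columns): rows {id, name} + {id, name} batch
+// together, while {id, name} followed by {id} is rejected with a
+// "heterogeneous columns across upsert batch" error, so the caller falls back
+// to the single-row path for that candidate.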
+func (d *PostgresDialect) computeUpsertBatchPlan(ops []*Operation) (columnsEscaped []string, perRowValues [][]string, err error) { + if len(ops) == 0 { + return nil, nil, fmt.Errorf("empty operation slice for upsert batch plan") + } + + table := ops[0].table + for _, op := range ops { + if op.opType != OperationTypeUpsert { + return nil, nil, fmt.Errorf("non-upsert operation encountered in upsert batch candidate") + } + if op.table != table { + return nil, nil, fmt.Errorf("mixed tables in upsert batch candidate") + } + } + + // Column set from first row (raw names) + firstSet := make(map[string]struct{}) + for k := range ops[0].data { + firstSet[k] = struct{}{} + } + // Ensure all primary key columns are present in data + for _, pk := range table.primaryColumns { + if _, ok := firstSet[pk.name]; !ok { + return nil, nil, fmt.Errorf("primary key column %q missing from upsert data", pk.name) + } + } + + // Validate all rows have identical column set + for _, op := range ops[1:] { + if len(op.data) != len(firstSet) { + return nil, nil, fmt.Errorf("heterogeneous columns across upsert batch") + } + for k := range op.data { + if _, ok := firstSet[k]; !ok { + return nil, nil, fmt.Errorf("heterogeneous columns across upsert batch") + } + } + } + + // Build stable ordered list + rawNames := make([]string, 0, len(firstSet)) + for name := range firstSet { + // Validate column exists on table schema + if _, found := table.columnsByName[name]; !found { + return nil, nil, fmt.Errorf("unknown column %q for table %s", name, table.identifier) + } + rawNames = append(rawNames, name) + } + sort.Strings(rawNames) + + // Produce escaped names and normalize values per row + columnsEscaped = make([]string, len(rawNames)) + colInfos := make([]*ColumnInfo, len(rawNames)) + for i, name := range rawNames { + info := table.columnsByName[name] + colInfos[i] = info + columnsEscaped[i] = info.escapedName + } + + perRowValues = make([][]string, len(ops)) + for rowIdx, op := range ops { + row := make([]string, len(rawNames)) + for colIdx, name := range rawNames { + v, ok := op.data[name] + if !ok { + return nil, nil, fmt.Errorf("unexpected missing column %q in upsert row", name) + } + normalized, nerr := d.normalizeValueType(v, colInfos[colIdx].scanType) + if nerr != nil { + return nil, nil, fmt.Errorf("normalize value for column %q: %w", name, nerr) + } + row[colIdx] = normalized + } + perRowValues[rowIdx] = row + } + + return columnsEscaped, perRowValues, nil +} + +// computeUpsertSupersetPlanWithPresence computes a superset of columns across the UPSERT batch, +// ensuring all primary key columns are present in every row's data map. It returns escaped column +// names, per-row normalized values aligned to that order, and a parallel per-row presence matrix +// indicating whether each column was explicitly present in the row's input (as opposed to being +// filled with NULL due to absence). Presence is used to preserve single-row semantics during +// UNNEST-based upserts. 
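+//
+// For illustration (hypothetical columns): rows {id, name} and {id} produce the
+// superset ("id","name"), per-row values with NULL filled in for the missing
+// "name", and presence [[true,true],[true,false]]; the false flag is what lets
+// the UNNEST upsert keep the existing "name" value instead of overwriting it.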
+func (d *PostgresDialect) computeUpsertSupersetPlanWithPresence(ops []*Operation) (columnsEscaped []string, perRowValues [][]string, perRowPresence [][]bool, err error) { + if len(ops) == 0 { + return nil, nil, nil, fmt.Errorf("empty operation slice for upsert superset plan") + } + + table := ops[0].table + for _, op := range ops { + if op.opType != OperationTypeUpsert { + return nil, nil, nil, fmt.Errorf("non-upsert operation encountered in upsert batch candidate") + } + if op.table != table { + return nil, nil, nil, fmt.Errorf("mixed tables in upsert batch candidate") + } + } + + // Build superset of raw column names across rows + rawNamesSet := make(map[string]struct{}) + for _, op := range ops { + for k := range op.data { + rawNamesSet[k] = struct{}{} + } + } + // Ensure primary key columns are included + for _, pk := range table.primaryColumns { + if _, ok := rawNamesSet[pk.name]; !ok { + return nil, nil, nil, fmt.Errorf("primary key column %q missing from upsert data", pk.name) + } + } + + rawNames := make([]string, 0, len(rawNamesSet)) + for name := range rawNamesSet { + if _, found := table.columnsByName[name]; !found { + return nil, nil, nil, fmt.Errorf("unknown column %q for table %s", name, table.identifier) + } + rawNames = append(rawNames, name) + } + sort.Strings(rawNames) + + // Produce escaped names and lookup + columnsEscaped = make([]string, len(rawNames)) + colInfos := make([]*ColumnInfo, len(rawNames)) + for i, name := range rawNames { + info := table.columnsByName[name] + colInfos[i] = info + columnsEscaped[i] = info.escapedName + } + + // Build per-row values and presence + perRowValues = make([][]string, len(ops)) + perRowPresence = make([][]bool, len(ops)) + for rowIdx, op := range ops { + rowVals := make([]string, len(rawNames)) + rowPres := make([]bool, len(rawNames)) + for colIdx, name := range rawNames { + if v, ok := op.data[name]; ok { + normalized, nerr := d.normalizeValueType(v, colInfos[colIdx].scanType) + if nerr != nil { + return nil, nil, nil, fmt.Errorf("normalize value for column %q: %w", name, nerr) + } + rowVals[colIdx] = normalized + rowPres[colIdx] = true + } else { + rowVals[colIdx] = "NULL" + rowPres[colIdx] = false + } + } + perRowValues[rowIdx] = rowVals + perRowPresence[rowIdx] = rowPres + } + + return columnsEscaped, perRowValues, perRowPresence, nil +} + +// buildUnnestUpsertSQLWithPresence constructs an UNNEST-based upsert using a superset of columns +// and per-column presence flags to preserve single-row semantics. For each column, we provide both +// values and presence (boolean). Presence indicates whether a field was explicitly provided; when +// absent, the DO UPDATE clause retains the target value. 
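+//
+// Structural sketch of the generated statement for hypothetical scalar columns
+// "id" (bigint, primary key) and "name" (text), assuming neither has a default;
+// literal array contents elided:
+//
+//	WITH src(c0,c0p,c1,c1p) AS (
+//	  SELECT CASE WHEN s.p0 THEN (s.v0)::bigint ELSE NULL END, s.p0,
+//	         CASE WHEN s.p1 THEN (s.v1)::varchar ELSE NULL END, s.p1
+//	  FROM unnest(ARRAY[...]::bigint[], ARRAY[...]::boolean[],
+//	              ARRAY[...]::varchar[], ARRAY[...]::boolean[]) WITH ORDINALITY AS s(v0,p0,v1,p1,ord))
+//	INSERT INTO <table> ("id","name") SELECT src.c0,src.c1 FROM src
+//	ON CONFLICT ("id") DO UPDATE SET
+//	  "name" = CASE WHEN (SELECT src.c1p FROM src WHERE src.c0 = EXCLUDED."id" LIMIT 1)
+//	                THEN EXCLUDED."name" ELSE <table>."name" END, ...;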
+func (d *PostgresDialect) buildUnnestUpsertSQLWithPresence(table *TableInfo, columnsEscaped []string, perRowValues [][]string, perRowPresence [][]bool) (string, error) { + if len(columnsEscaped) == 0 || len(perRowValues) == 0 { + return "", fmt.Errorf("empty columns or rows for UNNEST upsert (presence)") + } + + numCols := len(columnsEscaped) + // Invert rows to columns (values + presence) + colsToValues := make([][]string, numCols) + colsToPresence := make([][]string, numCols) + for i := 0; i < numCols; i++ { + colsToValues[i] = make([]string, len(perRowValues)) + colsToPresence[i] = make([]string, len(perRowValues)) + } + for r, row := range perRowValues { + if len(row) != numCols { + return "", fmt.Errorf("row %d has %d values, expected %d", r, len(row), numCols) + } + pres := perRowPresence[r] + if len(pres) != numCols { + return "", fmt.Errorf("row %d has %d presence flags, expected %d", r, len(pres), numCols) + } + for c := 0; c < numCols; c++ { + colsToValues[c][r] = row[c] + if pres[c] { + colsToPresence[c][r] = "TRUE" + } else { + colsToPresence[c][r] = "FALSE" + } + } + } + + // Reverse-map escaped to ColumnInfo and determine base types + escapedToInfo := make(map[string]*ColumnInfo, len(table.columnsByName)) + for _, ci := range table.columnsByName { + escapedToInfo[ci.escapedName] = ci + } + baseTypes := make([]string, numCols) + isArrayCol := make([]bool, numCols) + for i, esc := range columnsEscaped { + ci := escapedToInfo[esc] + bt, isArr := canonicalizePostgresType(ci.databaseTypeName) + baseTypes[i] = bt + isArrayCol[i] = isArr + } + + // Build UNNEST args and aliases: for each column i + // - scalar: value array (typed) + presence array (::boolean[]) + // - array: presence array only (value via CASE on ord) + sUnnestArgs := make([]string, 0, numCols*2) + sAliases := make([]string, 0, numCols*2+1) + for i := 0; i < numCols; i++ { + // presence arg for all columns + pArr := fmt.Sprintf("ARRAY[%s]::boolean[]", strings.Join(colsToPresence[i], ",")) + // For scalars, also include value arg + if !isArrayCol[i] { + vArr := fmt.Sprintf("ARRAY[%s]", strings.Join(colsToValues[i], ",")) + if bt := baseTypes[i]; bt != "" { + vArr = fmt.Sprintf("%s::%s[]", vArr, bt) + } + sUnnestArgs = append(sUnnestArgs, vArr, pArr) + sAliases = append(sAliases, fmt.Sprintf("v%d", i), fmt.Sprintf("p%d", i)) + } else { + sUnnestArgs = append(sUnnestArgs, pArr) + sAliases = append(sAliases, fmt.Sprintf("p%d", i)) + } + } + sAliases = append(sAliases, "ord") + + // Build projection list for src CTE: for each column i, output value and presence + // Value expression (11c hardened): + // - scalar: CASE WHEN present THEN typed(s.v{i}) ELSE default_expr_or_null END + // - array: CASE by ord yielding typed array; wrapped with presence to use default/empty on absence + projections := make([]string, 0, numCols*2) + cteCols := make([]string, 0, numCols*2) + for i := 0; i < numCols; i++ { + // value expr with default inlining for absent values + var valExpr string + colInfo := escapedToInfo[columnsEscaped[i]] + if colInfo == nil { + return "", fmt.Errorf("missing ColumnInfo for %s", columnsEscaped[i]) + } + defaultScalar := "NULL" + if colInfo.hasDefault && colInfo.defaultExpr != "" { + defaultScalar = colInfo.defaultExpr + } + if !isArrayCol[i] { + alias := fmt.Sprintf("s.v%d", i) + var typed string + if bt := baseTypes[i]; bt != "" { + typed = fmt.Sprintf("(%s)::%s", alias, bt) + } else { + typed = alias + } + valExpr = fmt.Sprintf("CASE WHEN s.p%d THEN %s ELSE %s END", i, typed, defaultScalar) + } else { + 
bt := baseTypes[i] + cases := make([]string, 0, len(perRowValues)+3) + cases = append(cases, "CASE ((s.ord)::int)") + for r := 0; r < len(perRowValues); r++ { + v := colsToValues[i][r] + var rowExpr string + if v == "NULL" { + rowExpr = fmt.Sprintf("NULL::%s[]", bt) + } else { + textLit := arrayExprToTextLiteral(strings.Trim(v, "'")) + rowExpr = fmt.Sprintf("(%s)::%s[]", escapeStringValue(textLit), bt) + } + cases = append(cases, fmt.Sprintf("WHEN %d THEN %s", r+1, rowExpr)) + } + elseExpr := fmt.Sprintf("'{}'::%s[]", bt) + if colInfo.hasDefault && colInfo.defaultExpr != "" { + elseExpr = fmt.Sprintf("(%s)::%s[]", colInfo.defaultExpr, bt) + } + cases = append(cases, fmt.Sprintf("ELSE %s END", elseExpr)) + valExpr = fmt.Sprintf("CASE WHEN s.p%d THEN (%s) ELSE %s END", i, strings.Join(cases, " "), elseExpr) + } + projections = append(projections, valExpr) + cteCols = append(cteCols, fmt.Sprintf("c%d", i)) + // presence expr + projections = append(projections, fmt.Sprintf("s.p%d", i)) + cteCols = append(cteCols, fmt.Sprintf("c%dp", i)) + } + + // Conflict target and update expressions using presence + conflictCols := make([]string, len(table.primaryColumns)) + for i, pk := range table.primaryColumns { + conflictCols[i] = pk.escapedName + } + updates := make([]string, numCols) + for i := 0; i < numCols; i++ { + updates[i] = fmt.Sprintf("%s=CASE WHEN src.c%dp THEN EXCLUDED.%s ELSE %s.%s END", columnsEscaped[i], i, columnsEscaped[i], table.identifier, columnsEscaped[i]) + } + + // Map column index by escaped name for PK join expression + indexByEsc := make(map[string]int, numCols) + for i, esc := range columnsEscaped { + indexByEsc[esc] = i + } + // Build scalar subquery predicate: src.pk = EXCLUDED.pk + var joinOnExcluded []string + for _, pk := range table.primaryColumns { + idx, ok := indexByEsc[pk.escapedName] + if !ok { + return "", fmt.Errorf("primary key %s not found in columnsEscaped", pk.escapedName) + } + joinOnExcluded = append(joinOnExcluded, fmt.Sprintf("src.c%d = EXCLUDED.%s", idx, pk.escapedName)) + } + sort.Strings(joinOnExcluded) + joinOnExcludedExpr := strings.Join(joinOnExcluded, " AND ") + // Build assignments using scalar subquery for presence + updates = make([]string, numCols) + for i := 0; i < numCols; i++ { + presenceSubq := fmt.Sprintf("(SELECT src.c%dp FROM src WHERE %s LIMIT 1)", i, joinOnExcludedExpr) + updates[i] = fmt.Sprintf("%s=CASE WHEN %s THEN EXCLUDED.%s ELSE %s.%s END", columnsEscaped[i], presenceSubq, columnsEscaped[i], table.identifier, columnsEscaped[i]) + } + + // Guard: refuse UNNEST superset when a NOT NULL column without default is absent in any row + for i := 0; i < numCols; i++ { + ci := escapedToInfo[columnsEscaped[i]] + if ci != nil && !ci.nullable && !ci.hasDefault { + for _, p := range colsToPresence[i] { + if p == "FALSE" { + return "", fmt.Errorf("unsafe UNNEST superset: column %s is NOT NULL without default and some rows omit it", columnsEscaped[i]) + } + } + } + } + + // Assemble SQL + selectList := strings.Join(projections, ",") + aliasList := strings.Join(sAliases, ",") + cteColsList := strings.Join(cteCols, ",") + unnestArgs := strings.Join(sUnnestArgs, ", ") + + insertTargets := strings.Join(columnsEscaped, ",") + selectValues := make([]string, numCols) + for i := 0; i < numCols; i++ { + selectValues[i] = fmt.Sprintf("src.c%d", i) + } + + return fmt.Sprintf("WITH src(%s) AS (SELECT %s FROM unnest(%s) WITH ORDINALITY AS s(%s)) INSERT INTO %s (%s) SELECT %s FROM src ON CONFLICT (%s) DO UPDATE SET %s;", + cteColsList, + selectList, + 
unnestArgs, + aliasList, + table.identifier, + insertTargets, + strings.Join(selectValues, ","), + strings.Join(conflictCols, ","), + strings.Join(updates, ", "), + ), nil +} + +// buildValuesUpsertSQL constructs a multi-row VALUES upsert for a single table using the provided +// escaped column names and per-row values. It appends an ON CONFLICT clause using the table's +// primary key columns and updates all provided columns to EXCLUDED values. +func (d *PostgresDialect) buildValuesUpsertSQL(table *TableInfo, columnsEscaped []string, perRowValues [][]string) string { + valuesParts := make([]string, len(perRowValues)) + for i := range perRowValues { + valuesParts[i] = fmt.Sprintf("(%s)", strings.Join(perRowValues[i], ",")) + } + // conflict target from table primary key columns + conflictCols := make([]string, len(table.primaryColumns)) + for i, pk := range table.primaryColumns { + conflictCols[i] = pk.escapedName + } + updates := make([]string, len(columnsEscaped)) + for i := range columnsEscaped { + updates[i] = fmt.Sprintf("%s=EXCLUDED.%s", columnsEscaped[i], columnsEscaped[i]) + } + return fmt.Sprintf("INSERT INTO %s (%s) VALUES %s ON CONFLICT (%s) DO UPDATE SET %s;", + table.identifier, + strings.Join(columnsEscaped, ","), + strings.Join(valuesParts, ","), + strings.Join(conflictCols, ","), + strings.Join(updates, ", "), + ) +} + func getPrimaryKeyFakeEmptyValues(primaryKey map[string]string) string { if len(primaryKey) == 1 { for key := range primaryKey { diff --git a/db_changes/db/dialect_postgres_batch.prd.md b/db_changes/db/dialect_postgres_batch.prd.md new file mode 100644 index 0000000..fc80eb2 --- /dev/null +++ b/db_changes/db/dialect_postgres_batch.prd.md @@ -0,0 +1,361 @@ +--- +description: Enable optional INSERT-ONLY batched ingestion for Postgres (VALUES/UNNEST), preserving safety (ordering, history) and offering measurable ingestion speedups. +globs: *.prd.md +alwaysApply: false +--- + +### Context & Goal + +The current Postgres dialect applies one operation at a time in strict `ordinal` order, logging a 1:1 history row per mutation (with `prev_value` JSON) and supporting mixed operation types (INSERT/UPSERT/UPDATE/DELETE) and arbitrary per-row column sets. This prevents using multi-row INSERT batching by default. + +Goal: introduce an optional INSERT-ONLY batching mode for Postgres that safely batches inserts when conditions are met, with a choice of implementations (multi-row `VALUES` or `SELECT FROM unnest(...)`) and a guard-rail fallback to the existing single-row path. We will measure performance impact and document trade-offs. + +Reference: TigerData article on UNNEST-based batching performance improvements in Postgres [Boosting Postgres INSERT Performance by 2x With UNNEST](https://www.tigerdata.com/blog/boosting-postgres-insert-performance). + +### Non-Goals + +- Changing ClickHouse batching behavior (serves as prior art only). +- Changing history/reorg semantics globally; batching applies only when explicitly enabled and safe. + +### Constraints & Rationale (why batching isn’t default today) + +- Mixed operation types require global ordering across INSERT/UPSERT/UPDATE/DELETE. +- Rows can carry arbitrary column subsets; single batched statement needs a shared column list. +- History table stores per-op `prev_value` JSON; we must log per-row history before mutation. +- Reorg safety depends on 1:1 mapping between mutation and history entry in correct order. 
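+
+For concreteness, the INSERT-only batched shape targeted here (VALUES mode with a
+per-row history CTE) looks roughly like the following; table, column, and
+history-table names are illustrative:
+
+```sql
+WITH history_cte AS (
+  INSERT INTO "public"."substreams_history" (op, table_name, pk, block_num)
+  VALUES ('I', '"public"."events"', '{"id":"1"}', 42),
+         ('I', '"public"."events"', '{"id":"2"}', 42)
+  RETURNING 1)
+INSERT INTO "public"."events" ("id", "payload")
+VALUES ('1', 'a'), ('2', 'b');
+```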
+ +### Rollout & Config + +- Add flags (subject to review): + - `--pg-insert-batch-mode=off|values|unnest` (default `off`). + - `--pg-insert-batch-size=` (default `1000`). + - `--pg-insert-only=true|false` (default `false`). When `true`, sink will reject non-insert ops or auto-fallback to single-row for those ops. +- Batching activates only if: + - Current flush window contains only INSERT operations for a table segment chosen for batching. + - All rows in the batch can share a stable column set (either identical or we can safely expand to a superset with defaults/nulls). + - History logging can be emitted per row (e.g., via CTE) prior to insert. +- Fallback: if any check fails, use existing single-row path transparently. + +--- + +## Tasks + +- [x] 1. Add CLI flags for Postgres batching + - Definition: + - Add `--pg-insert-batch-mode`, `--pg-insert-batch-size`, `--pg-insert-only` flags and plumb them into config/dialect wiring. + - Relevant files: `cmd/substreams-sink-sql/common_flags.go`, `cmd/substreams-sink-sql/run.go`, `db_changes/db/db.go` (Loader wiring). Confidence: high these are sufficient. + - Progress: + - Expected work: Added runtime-only flags in `cmd/substreams-sink-sql/run.go` — `--pg-insert-batch-mode` (off|values|unnest), `--pg-insert-batch-size` (default 1000), `--pg-insert-only` (bool). Intentional: keep flags local to run command. + - Unexpected skipped work: Did not add flags to `common_flags.go` or other commands (e.g., `from-proto`, `tools`) by design; plumbing into Loader/dialect deferred to Task 2. + - Unexpected extra work: Ran linter checks; no issues reported for modified file. + - Learnings: Runtime flags belong in `run.go`; wiring typically flows `run.go` → `db_changes/sinker/factory.go` → `db.NewLoader(...)` → dialect. Task 2 should include `db_changes/sinker/factory.go` in relevant files. + - Deliberate tech debt: Flags not yet propagated to Loader/dialect; no tests/docs for new flags yet; DSN-based config (optional) postponed. + +- [x] 2. Surface batching settings to Postgres dialect + - Definition: + - Extend `PostgresDialect`/`Loader` to expose batching settings to the dialect. Prefer resolving at flush-time from `Loader` to avoid constructor churn; alternatively, pass via `newDialect`. + - Relevant files: `db_changes/db/db.go`, `db_changes/db/dialect_postgres.go`, `db_changes/sinker/factory.go` (for wiring in a later step). Confidence: high. + - Progress: + - Expected work: Added `Loader` fields/getters `PgInsertBatchMode()`, `PgInsertBatchSize()`, `PgInsertOnly()` in `db.go`; added `PgBatchMode` enum and helpers `effectivePgBatchMode`, `effectivePgBatchSize`, `isPgInsertOnly` in `dialect_postgres.go` to read settings at flush-time. + - Unexpected skipped work: Did not modify `NewPostgresDialect`/`newDialect` signatures; decided to resolve config from `Loader` during `Flush` instead of passing via constructor. + - Unexpected extra work: None. + - Learnings: Keeping dialect construction stable reduces blast radius; `Loader` already flows into `Flush`, so accessing runtime flags via `Loader` is cleaner. + - Deliberate tech debt: Flags are not yet assigned to `Loader` fields (will be wired via `db_changes/sinker/factory.go` and `run.go` in the next step). No tests yet for helpers. + +- [x] 2a. 
Wire runtime flags into Loader via sinker factory + - Definition: + - Read flags in `run.go`, extend `db_changes/sinker/factory.go` options to carry `pg-insert-*` settings, and assign `Loader.pgInsertBatchMode`, `Loader.pgInsertBatchSize`, `Loader.pgInsertOnly` when constructing the loader. + - Relevant files: `cmd/substreams-sink-sql/run.go`, `db_changes/sinker/factory.go`, `db_changes/db/db.go`. Confidence: high. + - Progress: + - Expected work: Added fields to factory options; read flags in `run.go`; pass settings to factory; call `ConfigurePgInsertBatching` on `Loader` in factory. + - Unexpected skipped work: None. + - Unexpected extra work: None. + - Learnings: Centralizing config assignment in the factory keeps `NewLoader` stable and minimizes CLI coupling in db layer. + - Deliberate tech debt: No unit tests yet validating option flow; DSN fallback still deferred. + +- [x] 3. Detect INSERT-ONLY batches during flush + - Definition: + - In `Flush`, detect spans of operations that are all INSERTs for the same table and within batch size; otherwise use single-row path. + - Relevant files: `db_changes/db/dialect_postgres.go`. Confidence: high. + - Progress: + - Expected work: Added scan of sorted operations in `Flush` to find contiguous INSERT-only spans per table up to configured batch size; emits debug logs when tracer is enabled, leaving execution on single-row path for now. + - Unexpected skipped work: None. + - Unexpected extra work: Guarded pathological batch size (<2) and ensured spans don't overlap in logs. + - Learnings: Candidate detection is cheap and safe to run under tracer; sets the stage for VALUES/UNNEST builders. + - Deliberate tech debt: No SQL generation yet; no metrics counters beyond debug logs. + +- [x] 4. Compute stable column set (or superset) for a batch + - Definition: + - Determine shared column list across rows; if rows differ, either (a) fill missing columns with safe defaults/nulls or (b) split into compatible sub-batches. + - Relevant files: `db_changes/db/dialect_postgres.go`. Confidence: medium (edge cases on types/defaults). + - Progress: + - Expected work: Added `computeInsertBatchPlan` to build a sorted column superset (ensuring PKs included), return escaped columns and per-row aligned values, filling missing fields with `NULL`. + - Unexpected skipped work: None. + - Unexpected extra work: Integrated helper in batch detection to log plan readiness under tracer; validated columns exist on table schema. + - Learnings: Using table `scanType` with existing `normalizeValueType` ensures consistent SQL literal formatting. + - Deliberate tech debt: Does not yet split into sub-batches on type incompatibilities; relies on upcoming builders to enforce stricter rules. + +- [x] 5. Implement batched INSERT using VALUES (...), (...) + - Definition: + - Build a single `INSERT INTO tbl(cols) VALUES (...), (...), ...)` statement for the batch using normalized values; preserve per-row order within batch. + - Relevant files: `db_changes/db/dialect_postgres.go`. Confidence: high. + - Progress: + - Expected work: Added `buildValuesInsertSQL` and refactored `Flush` to emit ordered execution segments, forming VALUES batches when mode=values, rows are contiguous INSERTs for the same table, within batch size, and all irreversible. + - Unexpected skipped work: Skipped reversible rows for now to preserve per-row history guarantees; they fall back to single-row path. + - Unexpected extra work: Added tracer logs for batch execution and candidate skips. 
+ - Learnings: Segment builder keeps global ordering intact while enabling batched execution opportunistically. + - Deliberate tech debt: No CTE-based per-row history emission yet; UNNEST mode not implemented. + +- [x] 6. Implement batched INSERT using UNNEST arrays (optional mode) + - Definition: + - Build `INSERT INTO tbl(cols) SELECT * FROM unnest($1::type[], ...)` with arrays per column, matching row order; param binding or literal arrays as appropriate. + - Relevant files: `db_changes/db/dialect_postgres.go`. Confidence: medium (types/arrays casting). + - Progress: + - Expected work: Added `buildUnnestInsertSQL` to generate per-column ARRAY[...] with type casts from `ColumnInfo.databaseTypeName`, and extended `Flush` to form UNNEST batches when mode=unnest with irreversible rows. + - Unexpected skipped work: Skipped parameterized arrays; using literal arrays for simplicity; relies on proper escaping already in normalized values. + - Unexpected extra work: Reverse-mapped escaped column names to `ColumnInfo` to obtain types. + - Learnings: Casting arrays to `[]` avoids implicit cast pitfalls across columns. + - Deliberate tech debt: No CTE history yet; no param binding; minimal validation on type name compatibility. + +- [x] 7. Emit per-row history entries for batched insert (CTE) + - Definition: + - Use a CTE to insert history rows per row with `op='I'`, `pk`, `prev_value=NULL`, `block_num`, then perform the batched insert; maintain correct ordering. + - Relevant files: `db_changes/db/dialect_postgres.go`. Confidence: medium. + - Progress: + - Expected work: Added `buildInsertHistoryCTE` that constructs a `WITH` CTE inserting one history row per reversible INSERT in the batch, and prepends it to VALUES/UNNEST insert when needed. + - Unexpected skipped work: `prev_value` is omitted (NULL) for inserts, consistent with single-row path; ordering preserved by executing CTE before batched insert. + - Unexpected extra work: Ensured table name and PK JSON are escaped safely; only includes rows with non-nil reversible block numbers. + - Learnings: Using a single CTE keeps atomicity and ordering with minimal SQL overhead. + - Deliberate tech debt: Metrics not added yet; complex interleaving scenarios handled by prior segmentation. + +- [x] 8. Global ordering guarantees across batches + - Definition: + - Ensure batching never crosses ordinals where other ops (UPSERT/UPDATE/DELETE) interleave; preserve original `ordinal` sequence semantically. + - Relevant files: `db_changes/db/dialect_postgres.go`. Confidence: high. + - Progress: + - Expected work: Segment builder scans sorted ops and only batches contiguous INSERTs for the same table; other ops force segment boundaries, preserving order. + - Unexpected skipped work: None. + - Unexpected extra work: Added tracer logs to make segment boundaries observable. + - Learnings: Building segments first simplifies order guarantees and later SQL generation. + - Deliberate tech debt: None. + +- [x] 9. Safe fallback to single-row path + - Definition: + - If any precondition fails (mixed ops, incompatible columns, type cast issues), fall back to existing per-row queries without failing the flush. + - Relevant files: `db_changes/db/dialect_postgres.go`. Confidence: high. + - Progress: + - Expected work: Wrapped batch execution with SAVEPOINT; on error, rollback to savepoint and execute per-row statements for that segment. + - Unexpected skipped work: If SAVEPOINT unsupported, we surface the error (transaction aborts) instead of manual per-row replay. 
+ - Unexpected extra work: Added debug logs on fallback path for diagnosability. + - Learnings: SAVEPOINT provides a clean rollback boundary without affecting prior segments. + - Deliberate tech debt: No metrics yet for fallback occurrences. + +- [x] 10. Batched UPSERT using VALUES (...) + - Definition: + - Extend detection to form contiguous UPSERT-only spans per table. Build `INSERT INTO tbl(cols) VALUES (...), (...) ON CONFLICT (pk...) DO UPDATE SET col=EXCLUDED.col`. + - Implement per-row history via a CTE: derive op ('I' | 'U') and `prev_value` by left-joining existing rows using the PK over a `src` CTE of the batch rows. + - Preserve global ordering; fallback to single-row path on any failure. + - Relevant files: `db_changes/db/dialect_postgres.go`. Confidence: medium-high. + - Progress: + - Expected work: Implemented detection in `Flush` for contiguous UPSERT-only spans per table when `mode=values` and not insert-only; added `computeUpsertBatchPlan` enforcing identical column sets and PK presence; added `buildValuesUpsertSQL` generating `INSERT ... VALUES ... ON CONFLICT (pk...) DO UPDATE SET col=EXCLUDED.col`; added `buildUpsertHistoryCTE` to derive per-row op and `prev_value` via a `src` CTE and LEFT JOIN to target; prepends CTE when reversible rows are present; executes batches under SAVEPOINT with per-row fallback on error. + - Unexpected skipped work: UNNEST-based UPSERT (handled in Task 11) and parameterized arrays not implemented; heterogeneous column sets for UPSERT are not supported (by design, identical sets required). + - Unexpected extra work: Aligned `saveUpsert` to accept an escaped table name and updated call sites to match; enhanced tracer logs for UPSERT batch detection and execution. + - Learnings: Enforcing identical column sets preserves single-row UPSERT semantics (only explicitly provided columns are updated) and simplifies deterministic SQL; a single CTE keeps history emission atomic and ordered with minimal overhead. + - Deliberate tech debt: Metrics/counters (Task 15), unit/integration tests (Tasks 12–13), documentation (Task 16), and UNNEST UPSERT (Task 11) remain pending. + +- [x] 11. Batched UPSERT using UNNEST arrays + - Definition: + - Build `INSERT INTO tbl(cols) SELECT * FROM unnest(col1_arr::type[], ...) ON CONFLICT (pk...) DO UPDATE SET col=EXCLUDED.col` using per-column arrays. + - Create a `src` CTE from UNNEST to compute history entries (op and `prev_value`) with a left join to existing table rows; prepend a history CTE before the insert. + - Ensure correct type casts and row order; fallback safely. + - Relevant files: `db_changes/db/dialect_postgres.go`. Confidence: medium. + - Progress: + - Expected work: Implemented `buildUnnestUpsertSQL` to construct per-column `ARRAY[...]` with `[]` casts via `ColumnInfo.databaseTypeName`, generating `INSERT INTO ... SELECT * FROM unnest(...) ON CONFLICT (pk...) DO UPDATE SET col=EXCLUDED.col`; extended `Flush` to detect contiguous UPSERT-only spans for a table when `mode=unnest`, reuse `computeUpsertBatchPlan` to validate identical column sets and normalize values, and prepend `buildUpsertHistoryCTE` when reversible rows are present; execute under SAVEPOINT with per-row fallback on failure. + - Unexpected skipped work: Parameterized arrays are not implemented (literal arrays used); heterogeneous column sets for UPSERT remain unsupported by design. + - Unexpected extra work: Added UNNEST UPSERT detection logs and error diagnostics when SQL build fails; ensured conflict target uses table PKs. 
+ - Learnings: Casting arrays to concrete `[]` avoids implicit cast ambiguity and keeps UNNEST robust across column types; sharing the UPSERT plan logic across VALUES and UNNEST keeps semantics consistent. + - Deliberate tech debt: Metrics (Task 15), tests (Tasks 12–13), and documentation (Task 16) still pending; parameterization of arrays left for a later task (Task 17). + +- [x] 11a. UNNEST arrays via WITH ORDINALITY + projection indexing + - Definition: + - For tables with array-typed columns, avoid flattening these columns. Unnest only scalar columns WITH ORDINALITY to produce an `ord` per row, keep array-typed columns as 2D arrays, and project a per-row 1D array using `arr_col[ord]::type[]` in the SELECT list. + - Ensure explicit casts use resolved Postgres types (including enums/domains) from table metadata; maintain row order and lengths across all arrays; guard on length mismatches. + - Provide a safety fallback: when typed projection cannot be constructed safely, skip UNNEST for that batch and use VALUES. + - Consider an alternative interim path: pass array-typed columns as `text[]` of brace literals and cast in projection to `type[]` when WITH ORDINALITY is not viable. + - Relevant files: `db_changes/db/dialect_postgres.go`. Confidence: medium-high. + - Progress: + - Expected work: Finalized UNNEST handling for array-typed columns by selecting per-row arrays via CASE over `((s.ord)::int)` instead of 2D arrays + indexing. Updated both builders: `buildUnnestInsertSQL` and `buildUnnestUpsertSQL` in `db_changes/db/dialect_postgres.go`. + - Unexpected skipped work: Abandoned the 2D array projection path (`arr2d[s.ord]`) due to persistent cast issues; rectangular-length enforcement no longer required. + - Unexpected extra work: Added temporary rectangularity guards and an all-empty special-case during investigation; retained VALUES fallback in `Flush` for any UNNEST build errors. Removed unused `textArrayLiteralLength` helper. + - Learnings: Even with explicit parentheses and `(s.ord)::int`, Postgres could treat the result of `arr2d[s.ord]` as a scalar in some plans, causing bigint→bigint[] cast errors. CASE-by-ordinal eliminates 2D typing/indexing pitfalls while keeping UNNEST performance for scalar columns. Reference: [PostgreSQL docs: Table Expressions, WITH ORDINALITY](https://www.postgresql.org/docs/current/queries-table-expressions.html). + - Deliberate tech debt: CASE projections grow with batch size; consider a VALUES CTE keyed by `ord` to reduce SQL size. Observability (Task 15) and tests/benchmarks (Tasks 12–14) remain. + +- [x] 11b. UPSERT UNNEST with heterogeneous columns via superset + presence flags + - Definition: + - For a contiguous UPSERT span, compute a sorted superset of columns across rows. For each column, produce two arrays: values (normalized SQL literals, using NULL when absent) and presence flags (boolean). + - Build UNNEST over scalar columns WITH ORDINALITY as today. In projection, include each column value and its presence flag; for array-typed columns, keep the CASE-by-ordinal approach and add a parallel presence flag. + - In `ON CONFLICT DO UPDATE`, preserve single-row semantics by setting each column as: `col = CASE WHEN col_present THEN EXCLUDED.col ELSE target.col END` so that absent columns do not modify existing values, while explicit NULLs still update to NULL. + - Derive presence from row membership (whether the field was provided) rather than from value NULL-ness. + - History CTE via `buildUpsertHistoryCTE` remains unchanged. 
- Relevant files: `db_changes/db/dialect_postgres.go`. Confidence: medium-high. + - Progress: + - Expected work: Implemented `computeUpsertSupersetPlanWithPresence` and `buildUnnestUpsertSQLWithPresence`; integrated in `Flush` for `mode=unnest` UPSERT spans. Presence flags control DO UPDATE to preserve single-row semantics. Array-typed columns use CASE-by-ordinal projections. + - Unexpected skipped work: INSERT path currently uses the superset directly; absent fields become SQL NULL on brand-new rows, bypassing table defaults and potentially violating NOT NULL constraints (e.g., `status`). No catalog-aware default inlining or NOT NULL guard yet. + - Unexpected extra work: Added presence matrix plumbing and CASE-by-ordinal logic for arrays; expanded logs for detection/execution. + - Learnings: Presence is sufficient for DO UPDATE, but INSERT requires either default inlining or partitioning to preserve omission semantics. See Tasks 11c and 11d. + - Deliberate tech debt: Missing default inlining/NOT NULL safeguards; no fallback to VALUES when unsafe; metrics/tests pending. + +- [x] 11c. Harden superset INSERT semantics by inlining defaults for absent values + - Definition: + - Read column nullability and default expressions (e.g., from `pg_catalog`). In UNNEST/VALUES projection for INSERT, use `CASE WHEN present THEN value ELSE default_expr END` per column so that omitted fields receive table defaults. Preserve explicit NULL updates when presence=true and value=NULL. For NOT NULL columns without defaults, do not UNNEST the batch (fallback to VALUES with identical column sets or per-row). + - Apply to both scalar and array-typed columns: keep CASE-by-ordinal for arrays and emit default_expr in the ELSE branch per row. Consider a feature flag to enable default inlining due to volatile defaults (e.g., `now()`, `nextval`). + - Relevant files: `db_changes/db/dialect_postgres.go` (UNNEST/VALUES builders), table metadata loading (extend `TableInfo` with nullability/defaults or fetch lazily/cached). + - Confidence: medium-high. + - Progress: + - Expected work: Implemented. Added `fetchColumnNullabilityDefaults` in `db_changes/db/db.go` to read nullability and `pg_get_expr` defaults; extended `ColumnInfo` with `nullable`, `hasDefault`, `defaultExpr`. Updated `buildUnnestUpsertSQLWithPresence` to inline defaults on INSERT: scalars use `CASE WHEN present THEN typed(value) ELSE default END`; arrays use CASE-by-ordinal wrapped with presence and default to `default_expr::type[]` or empty array. + - Unexpected skipped work: Did not add VALUES-mode default inlining (not strictly required); parameterized arrays still deferred (Task 17). + - Unexpected extra work: Added a safety guard: if any NOT NULL column has no default and is absent in any row, the builder refuses UNNEST superset (error) so `Flush` falls back to VALUES or per-row. + - Learnings: Guarding before SQL assembly avoids NOT NULL violations and preserves default semantics for brand-new rows; presence remains correct for DO UPDATE. + - Deliberate tech debt: VALUES-mode default inlining and metrics/tests remain pending. + +- [ ] 11d. Optional: Partition UNNEST batches by identical column sets + - Definition: + - Partition a heterogeneous UPSERT span into sub-batches where all rows share the exact same present column set. For each sub-batch, omit absent columns from the INSERT target so table defaults apply naturally; keep UNNEST for the sub-batch’s columns only. Maintain global ordering by emitting multiple segments. 
Fallback to VALUES/single when partitions would be too small to justify UNNEST. + - Relevant files: `db_changes/db/dialect_postgres.go` (segment builder and UNNEST planner). + - Confidence: high. + - Progress: + - Expected work: + - Unexpected skipped work: + - Unexpected extra work: + - Learnings: + - Deliberate tech debt: + +- [ ] 12. Unit tests for SQL generation and column normalization + - Definition: + - Cover VALUES and UNNEST builders, column superset logic, and value normalization; verify stable, deterministic SQL. + - Relevant files: `db_changes/db/dialect_postgres.go` (tests alongside), new `_test.go` files. Confidence: medium-high. + - Progress: + - Expected work: + - Unexpected skipped work: + - Unexpected extra work: + - Learnings: + - Deliberate tech debt: + +- [x] 12a. Unit tests: UNNEST INSERT builder + - Definition: + - Verify `buildUnnestInsertSQL` for mixed scalar and array-typed columns: + - Scalar columns become typed `ARRAY[...]::type[]` and participate in `WITH ORDINALITY`. + - Array-typed columns are projected via `CASE ((s.ord)::int)` with per-row typed arrays and `ELSE '{}'::type[]` fallback. + - Error when there are no scalar columns (guarded path). + - NULL arrays render as `NULL::type[]` in CASE arms. + - Relevant files: `db_changes/db/dialect_postgres.go` (UNNEST insert builder), `db_changes/db/dialect_postgres_test.go`. + - Confidence: high. + - Progress: + - Expected work: Added `Test_buildUnnestInsertSQL_MixedScalarAndArray` and `Test_buildUnnestInsertSQL_NoScalarColumns_Error` to validate mixed scalar/array handling, typed scalar arrays with `WITH ORDINALITY`, CASE-by-ordinal projections for arrays, empty-array fallback, and the no-scalar error path. + - Unexpected skipped work: None. + - Unexpected extra work: Introduced `mkTestTable` helper for concise table setup; assertions use fragment matching to avoid brittle full-SQL equality. + - Learnings: `WITH ORDINALITY` requires at least one scalar column; array-typed columns project via CASE-by-ordinal with typed casts and `'{}'::type[]` fallback; NULL arrays must render as `NULL::type[]` within CASE arms. + - Deliberate tech debt: No negative tests for malformed row/column lengths; VALUES-mode default inlining still untested; no benchmarking of SQL size. + +- [x] 12b. Unit tests: UNNEST UPSERT with presence + - Definition: + - Verify `buildUnnestUpsertSQLWithPresence` behavior: + - Builds per-column `boolean[]` presence alongside typed value arrays for scalars; arrays use CASE-by-ordinal. + - `ON CONFLICT ... DO UPDATE` uses presence to conditionally assign `EXCLUDED.col` or retain target value. + - Guard: error when any NOT NULL column without a default is absent in any row. + - Default inlining for INSERT value projection when presence=false (scalar and array columns). + - Relevant files: `db_changes/db/dialect_postgres.go`, `db_changes/db/dialect_postgres_test.go`. + - Confidence: medium-high. + - Progress: + - Expected work: Added `Test_buildUnnestUpsertSQLWithPresence_Basics` (presence arrays, typed scalar arrays, `WITH ORDINALITY`, presence-controlled `ON CONFLICT ... DO UPDATE`) and `Test_buildUnnestUpsertSQLWithPresence_DefaultInlining` (default inlining for scalars and array-typed columns when presence=false). + - Unexpected skipped work: Skipped full-SQL equality assertions in favor of resilient fragment checks; did not add exhaustive array-presence permutations or parameterized arrays. 
+ - Unexpected extra work: Hit 11c NOT NULL guard initially; adjusted test schema (`name.nullable = true`) to model a safe scenario and document the guard behavior. + - Learnings: The 11c safety guard prevents UNNEST superset when a NOT NULL column without a default is omitted by any row; tests must reflect safe insert/update semantics or provide defaults/nullable columns. + - Deliberate tech debt: No history CTE assertions for UNNEST paths; no coverage of partitioning mixed column sets (11d) or metrics observability (15). + +- [x] 12c. Unit tests: batch planning helpers + - Definition: + - `computeInsertBatchPlan`: superset column computation, PK inclusion, NULL fill for absent fields, deterministic order. + - `computeUpsertBatchPlan`: identical column-set enforcement across rows and PK presence; normalization of values. + - `computeUpsertSupersetPlanWithPresence`: superset columns, value normalization, and correct presence matrix per row/column. + - Relevant files: `db_changes/db/dialect_postgres.go`, `db_changes/db/dialect_postgres_test.go`. + - Confidence: high. + - Progress: + - Expected work: Added `Test_computeInsertBatchPlan_SupersetAndNulls`, `Test_computeUpsertBatchPlan_IdenticalAndHeterogeneous`, and `Test_computeUpsertSupersetPlanWithPresence_Basics` to validate superset computation, identical-set enforcement, normalization, and presence matrix. + - Unexpected skipped work: Skipped any SQL text assertions by design; avoided history/CTE checks (covered elsewhere). + - Unexpected extra work: Wrote comprehensive docstrings for all sections/tests to clarify intent and coverage boundaries. + - Learnings: Column order is deterministic (sorted); PKs are enforced; heterogeneous UPSERT sets correctly error; presence matrix reliably flags explicit fields. + - Deliberate tech debt: Missing negative tests for unknown columns/type normalization errors; no tests for missing PK in UPSERT inputs; partitioning logic (11d) not exercised. + +- [x] 12d. Unit tests: typing and SQL helper utilities + - Definition: + - `canonicalizePostgresType`: maps DatabaseTypeName (and arrays via leading `_`) to base types; unknowns lowercased. + - `arrayExprToTextLiteral`: transforms `ARRAY[...]`/brace literals to text array literal for array CASE arms. + - `buildInsertHistoryCTE` and `buildUpsertHistoryCTE`: shape and essential fields (op, pk JSON, prev_value) for reversible rows. + - Optional light checks on segmentation preconditions (strings present) without asserting full SQL text. + - Relevant files: `db_changes/db/dialect_postgres.go`, `db_changes/db/dialect_postgres_test.go`. + - Confidence: medium. + - Progress: + - Expected work: Added `Test_canonicalizePostgresType`, `Test_arrayExprToTextLiteral`, `Test_buildInsertHistoryCTE_Shape`, and `Test_buildUpsertHistoryCTE_Shape` to validate type mapping/array literal conversion and the essential shape/contents of history CTEs for reversible rows. + - Unexpected skipped work: Skipped segmentation precondition checks and exhaustive type matrix coverage; no parameterized arrays; no full-SQL equality assertions. + - Unexpected extra work: Wrote clarifying docstrings across tests and sections for readability. + - Learnings: Canonicalization normalizes driver names and detects arrays via leading `_`; array text conversion reliably handles ARRAY[...] and brace literals; history CTEs include only reversible rows and use a `src` CTE with LEFT JOIN to compute op/prev_value. 
+ - Deliberate tech debt: Additional base types/enums/domains not enumerated; no integration proving transaction atomicity; observability/metrics tests deferred (Task 15). + +- [ ] 13. Integration tests against Postgres + - Definition: + - Ingest sample batches with/without mixed columns; verify data, history entries, and ordering; include fallback scenarios. + - Relevant files: test harness setup under `db_changes/db/` tests. Confidence: medium. + - Progress: + - Expected work: + - Unexpected skipped work: + - Unexpected extra work: + - Learnings: + - Deliberate tech debt: + +- [ ] 14. Benchmarks: single-row vs VALUES vs UNNEST + - Definition: + - Measure planning/execution times and throughput for various batch sizes; compare with TigerData findings on UNNEST advantages. + - Relevant files: benchmark scripts (new), optional Go benchmarks. Confidence: medium. + - Progress: + - Expected work: + - Unexpected skipped work: + - Unexpected extra work: + - Learnings: + - Deliberate tech debt: + +- [ ] 15. Observability: counters and logs + - Definition: + - Expose metrics: batches formed, rows per batch, fallback occurrences, mode used (values/unnest), errors; add debug tracing of generated SQL when tracer enabled. + - Relevant files: `db_changes/db/dialect_postgres.go`, logging facilities. Confidence: high. + - Progress: + - Expected work: + - Unexpected skipped work: + - Unexpected extra work: + - Learnings: + - Deliberate tech debt: + +- [ ] 16. Documentation + - Definition: + - Document flags, safety constraints, trade-offs, and when to choose VALUES vs UNNEST; reference TigerData article and ClickHouse prior art. + - Relevant files: README/docs (new or existing), CLI help in `common_flags.go`. Confidence: high. + - Progress: + - Expected work: + - Unexpected skipped work: + - Unexpected extra work: + - Learnings: + - Deliberate tech debt: + +- [ ] 17. Optional: Parameterize UNNEST arrays (avoid literal arrays) + - Definition: + - Switch UNNEST mode to use bound parameters/driver array types instead of literal `ARRAY[...]` to reduce SQL size and improve safety. Ensure proper per-column casts and ordering are preserved. + - Relevant files: `db_changes/db/dialect_postgres.go` (UNNEST builder), potential driver integration. Confidence: medium. + - Progress: + - Expected work: + - Unexpected skipped work: + - Unexpected extra work: + - Learnings: + - Deliberate tech debt: + + + diff --git a/db_changes/db/dialect_postgres_test.go b/db_changes/db/dialect_postgres_test.go index 95173ae..b35bb5c 100644 --- a/db_changes/db/dialect_postgres_test.go +++ b/db_changes/db/dialect_postgres_test.go @@ -9,6 +9,9 @@ import ( "github.com/stretchr/testify/require" ) +// TestPrimaryKeyToJSON verifies deterministic JSON encoding of primary key maps: +// - single and multi-key support +// - stable lexical ordering of keys in output JSON func TestPrimaryKeyToJSON(t *testing.T) { tests := []struct { @@ -51,6 +54,8 @@ func TestPrimaryKeyToJSON(t *testing.T) { } +// TestJSONToPrimaryKey verifies decoding a JSON primary key back to a string map +// and preserves all keys/values regardless of input ordering. func TestJSONToPrimaryKey(t *testing.T) { tests := []struct { @@ -94,6 +99,8 @@ func TestJSONToPrimaryKey(t *testing.T) { } +// TestGetPrimaryKeyFakeEmptyValues verifies formatting of fake empty values used +// in history queries and that output is stable and lexically sorted for multi-keys. 
func TestGetPrimaryKeyFakeEmptyValues(t *testing.T) { tests := []struct { name string @@ -135,12 +142,12 @@ func TestGetPrimaryKeyFakeEmptyValues(t *testing.T) { t.Run(tt.name, func(t *testing.T) { result := getPrimaryKeyFakeEmptyValues(tt.primaryKey) assert.Equal(t, tt.expected, result) - + // For multiple keys, verify the order is predictable (alphabetical) if len(tt.primaryKey) > 1 { parts := strings.Split(result, ",") for i := 1; i < len(parts); i++ { - assert.True(t, strings.Compare(parts[i-1], parts[i]) <= 0, + assert.True(t, strings.Compare(parts[i-1], parts[i]) <= 0, "Expected sorted keys, but got %s before %s", parts[i-1], parts[i]) } } @@ -148,6 +155,8 @@ func TestGetPrimaryKeyFakeEmptyValues(t *testing.T) { } } +// TestGetPrimaryKeyFakeEmptyValuesAssertion verifies the IS NULL assertion builder +// for one or multiple primary key columns, including schema-qualified table names. func TestGetPrimaryKeyFakeEmptyValuesAssertion(t *testing.T) { tests := []struct { name string @@ -193,12 +202,12 @@ func TestGetPrimaryKeyFakeEmptyValuesAssertion(t *testing.T) { t.Run(tt.name, func(t *testing.T) { result := getPrimaryKeyFakeEmptyValuesAssertion(tt.primaryKey, tt.escapedTableName) assert.Equal(t, tt.expected, result) - + // For multiple keys, verify the order is predictable (alphabetical) if len(tt.primaryKey) > 1 { parts := strings.Split(result, "AND ") for i := 1; i < len(parts); i++ { - assert.True(t, strings.Compare(parts[i-1], parts[i]) <= 0, + assert.True(t, strings.Compare(parts[i-1], parts[i]) <= 0, "Expected sorted parts, but got %s before %s", parts[i-1], parts[i]) } } @@ -206,6 +215,10 @@ func TestGetPrimaryKeyFakeEmptyValuesAssertion(t *testing.T) { } } +// TestRevertOp validates SQL emitted to revert history operations: +// - I (insert) => DELETE target row +// - D (delete) => INSERT row from stored JSON +// - U (update) => UPDATE FROM json_populate_record of previous state func TestRevertOp(t *testing.T) { type row struct { @@ -266,4 +279,359 @@ func TestRevertOp(t *testing.T) { }) } -} \ No newline at end of file +} + +// --- 12a: UNNEST INSERT builder tests --- +// These tests exercise buildUnnestInsertSQL for INSERT-only batches, ensuring: +// - scalar columns use typed ARRAY[...] with WITH ORDINALITY +// - array-typed columns are projected via CASE-by-ordinal with typed casts +// - NULL arrays and empty fallback ('{}'::type[]) are correctly emitted +// - an error is returned if there are no scalar columns to drive WITH ORDINALITY + +// mkTestTable is a small helper to construct a table with given columns and PKs. 
+func mkTestTable(t *testing.T, name string, pk []string, cols map[string]*ColumnInfo) *TableInfo { + t.Helper() + tbl, err := NewTableInfo("public", name, pk, cols) + require.NoError(t, err) + return tbl +} + +// Test_buildUnnestInsertSQL_MixedScalarAndArray verifies that the INSERT UNNEST builder: +// - emits typed arrays for scalars with WITH ORDINALITY aliases +// - selects array columns via CASE ((s.ord)::int) with per-row typed arrays +// - uses '{}'::varchar[] as the ELSE fallback and NULL::varchar[] when absent +func Test_buildUnnestInsertSQL_MixedScalarAndArray(t *testing.T) { + cols := map[string]*ColumnInfo{ + "id": NewColumnInfo("id", "INT8", int64(0)), // bigint + "amount": NewColumnInfo("amount", "NUMERIC", float64(0)), + "tags": NewColumnInfo("tags", "_TEXT", ""), // text[] + } + tbl := mkTestTable(t, "xfer", []string{"id"}, cols) + + // columnsEscaped must match ColumnInfo.escapedName values + columnsEscaped := []string{`"amount"`, `"id"`, `"tags"`} + perRowValues := [][]string{ + {"12.34", "1", "'{a,b}'"}, // tags present + {"56.78", "2", "NULL"}, // tags absent -> NULL + } + + sql, err := (&PostgresDialect{}).buildUnnestInsertSQL(tbl, columnsEscaped, perRowValues) + require.NoError(t, err) + + // Basic shape + assert.Contains(t, sql, `INSERT INTO "public"."xfer" ("amount","id","tags") SELECT`) + // Scalars become typed arrays and use WITH ORDINALITY + assert.Contains(t, sql, `unnest(ARRAY[12.34,56.78]::numeric[], ARRAY[1,2]::bigint[]) WITH ORDINALITY AS s(c0,c1,ord)`) + // Array column uses CASE-by-ordinal with typed casts and empty fallback + assert.Contains(t, sql, `CASE ((s.ord)::int)`) + assert.Contains(t, sql, `WHEN 1 THEN`) + assert.Contains(t, sql, `WHEN 2 THEN`) + assert.Contains(t, sql, `ELSE '{}'::varchar[] END`) + // NULL array in row 2 should render as NULL::varchar[] in a CASE arm + assert.Contains(t, sql, `NULL::varchar[]`) +} + +// Test_buildUnnestInsertSQL_NoScalarColumns_Error verifies the guarded path that +// returns an error when a batch contains only array-typed columns (no scalars). +func Test_buildUnnestInsertSQL_NoScalarColumns_Error(t *testing.T) { + cols := map[string]*ColumnInfo{ + // contrived: only array-typed column (also PK) + "keyarr": NewColumnInfo("keyarr", "_INT8", int64(0)), // bigint[] + } + tbl := mkTestTable(t, "arr_only", []string{"keyarr"}, cols) + + columnsEscaped := []string{`"keyarr"`} + perRowValues := [][]string{ + {"'{1,2}'"}, + {"'{3,4}'"}, + } + + _, err := (&PostgresDialect{}).buildUnnestInsertSQL(tbl, columnsEscaped, perRowValues) + require.Error(t, err) + assert.Contains(t, err.Error(), "no scalar columns") +} + +// --- 12b: UNNEST UPSERT with presence --- +// These tests exercise buildUnnestUpsertSQLWithPresence for UPSERT batches, ensuring: +// - per-column boolean[] presence arrays drive conditional updates in DO UPDATE +// - typed scalar value arrays and CASE-by-ordinal projections for array columns +// - default inlining occurs for presence=false in projection (both scalar and array) +// - tests avoid the 11c NOT NULL guard by making omitted columns nullable or defaulted + +// Test_buildUnnestUpsertSQLWithPresence_Basics validates SQL shape for UNNEST UPSERT: +// - boolean presence arrays (::boolean[]) +// - typed scalar arrays and WITH ORDINALITY +// - ON CONFLICT ... 
DO UPDATE uses presence via a subquery on src (c{idx}p) +func Test_buildUnnestUpsertSQLWithPresence_Basics(t *testing.T) { + // Columns: age (INT8), id (INT8, PK), name (TEXT) + age := NewColumnInfo("age", "INT8", int64(0)) + id := NewColumnInfo("id", "INT8", int64(0)) + name := NewColumnInfo("name", "TEXT", "") + name.nullable = true + cols := map[string]*ColumnInfo{ + "age": age, + "id": id, + "name": name, + } + tbl := mkTestTable(t, "users", []string{"id"}, cols) + + columnsEscaped := []string{`"age"`, `"id"`, `"name"`} + perRowValues := [][]string{ + {"30", "1", "NULL"}, // name absent + {"40", "2", "'bob'"}, // name present + } + perRowPresence := [][]bool{ + {true, true, false}, + {true, true, true}, + } + + sql, err := (&PostgresDialect{}).buildUnnestUpsertSQLWithPresence(tbl, columnsEscaped, perRowValues, perRowPresence) + require.NoError(t, err) + + // Basic shape: INSERT ... SELECT ... FROM unnest(...) WITH ORDINALITY AS s(...) + assert.Contains(t, sql, `INSERT INTO "public"."users" ("age","id","name") SELECT`) + assert.Contains(t, sql, `WITH ORDINALITY AS s(`) + // Presence arrays must be boolean[] alongside typed value arrays for scalars + assert.Contains(t, sql, `::boolean[]`) + assert.Contains(t, sql, `ARRAY[30,40]::bigint[]`) + assert.Contains(t, sql, `ARRAY[1,2]::bigint[]`) + // Upsert with presence-controlled updates + assert.Contains(t, sql, `ON CONFLICT ("id") DO UPDATE SET`) + // Update should reference presence via subquery on src with pk join; for name (index 2) expect c2p + assert.Contains(t, sql, `CASE WHEN (SELECT src.c2p FROM src WHERE`) +} + +// Test_buildUnnestUpsertSQLWithPresence_DefaultInlining validates that when presence=false +// the projection inlines defaults: scalar uses ELSE default, array uses ELSE default::type[]. 
+func Test_buildUnnestUpsertSQLWithPresence_DefaultInlining(t *testing.T) { + // Scalar default for score; array default for tags + score := NewColumnInfo("score", "INT8", int64(0)) + score.hasDefault = true + score.defaultExpr = "42" + // tags is array-typed TEXT[] with '{}' default + tags := NewColumnInfo("tags", "_TEXT", "") + tags.hasDefault = true + tags.defaultExpr = "'{}'" + id := NewColumnInfo("id", "INT8", int64(0)) + cols := map[string]*ColumnInfo{ + "id": id, + "score": score, + "tags": tags, + } + tbl := mkTestTable(t, "users", []string{"id"}, cols) + + columnsEscaped := []string{`"score"`, `"id"`, `"tags"`} + perRowValues := [][]string{ + {"NULL", "1", "NULL"}, // both defaults apply when absent + {"100", "2", "'{x,y}'"}, // both present + } + perRowPresence := [][]bool{ + {false, true, false}, + {true, true, true}, + } + + sql, err := (&PostgresDialect{}).buildUnnestUpsertSQLWithPresence(tbl, columnsEscaped, perRowValues, perRowPresence) + require.NoError(t, err) + + // Scalar default inlining for score: ELSE 42 in projection, typed to bigint + assert.Contains(t, sql, `CASE WHEN s.p0 THEN (s.v0)::bigint ELSE 42 END`) + // Array default inlining for tags: ELSE ('{}')::varchar[] + assert.Contains(t, sql, `CASE WHEN s.p2 THEN`) + assert.Contains(t, sql, `ELSE ('{}')::varchar[] END`) +} + +// --- 12c: batch planning helpers --- +// These tests validate pre-SQL planning helpers (no SQL text assertions): +// - computeInsertBatchPlan: superset columns, PK inclusion, NULL filling, order +// - computeUpsertBatchPlan: identical column-set enforcement vs heterogeneous error +// - computeUpsertSupersetPlanWithPresence: superset, normalized values, presence matrix + +// Test_computeInsertBatchPlan_SupersetAndNulls ensures superset columns across INSERT rows, +// deterministic sorted order, PK inclusion, and NULL for absent fields. +func Test_computeInsertBatchPlan_SupersetAndNulls(t *testing.T) { + cols := map[string]*ColumnInfo{ + "id": NewColumnInfo("id", "INT8", int64(0)), + "name": NewColumnInfo("name", "TEXT", ""), + "age": NewColumnInfo("age", "INT8", int64(0)), + } + tbl := mkTestTable(t, "users", []string{"id"}, cols) + + ops := []*Operation{ + {opType: OperationTypeInsert, table: tbl, data: map[string]string{"id": "1", "name": "alice"}}, + {opType: OperationTypeInsert, table: tbl, data: map[string]string{"id": "2", "age": "30"}}, + } + + colsEsc, rows, err := (&PostgresDialect{}).computeInsertBatchPlan(ops) + require.NoError(t, err) + + // Sorted by raw name: age, id, name + assert.Equal(t, []string{`"age"`, `"id"`, `"name"`}, colsEsc) + require.Len(t, rows, 2) + assert.Equal(t, []string{"NULL", "1", "'alice'"}, rows[0]) + assert.Equal(t, []string{"30", "2", "NULL"}, rows[1]) +} + +// Test_computeUpsertBatchPlan_IdenticalAndHeterogeneous verifies that UPSERT batches require +// identical column sets across rows and error out on heterogeneous sets. 
+func Test_computeUpsertBatchPlan_IdenticalAndHeterogeneous(t *testing.T) { + cols := map[string]*ColumnInfo{ + "id": NewColumnInfo("id", "INT8", int64(0)), + "name": NewColumnInfo("name", "TEXT", ""), + "age": NewColumnInfo("age", "INT8", int64(0)), + } + tbl := mkTestTable(t, "users", []string{"id"}, cols) + + t.Run("identical_column_set", func(t *testing.T) { + ops := []*Operation{ + {opType: OperationTypeUpsert, table: tbl, data: map[string]string{"id": "1", "name": "alice"}}, + {opType: OperationTypeUpsert, table: tbl, data: map[string]string{"id": "2", "name": "bob"}}, + } + colsEsc, rows, err := (&PostgresDialect{}).computeUpsertBatchPlan(ops) + require.NoError(t, err) + assert.Equal(t, []string{`"id"`, `"name"`}, colsEsc) + require.Len(t, rows, 2) + assert.Equal(t, []string{"1", "'alice'"}, rows[0]) + assert.Equal(t, []string{"2", "'bob'"}, rows[1]) + }) + + t.Run("heterogeneous_sets_error", func(t *testing.T) { + ops := []*Operation{ + {opType: OperationTypeUpsert, table: tbl, data: map[string]string{"id": "1", "name": "alice"}}, + {opType: OperationTypeUpsert, table: tbl, data: map[string]string{"id": "2", "age": "30"}}, + } + _, _, err := (&PostgresDialect{}).computeUpsertBatchPlan(ops) + require.Error(t, err) + assert.Contains(t, err.Error(), "heterogeneous") + }) +} + +// Test_computeUpsertSupersetPlanWithPresence_Basics validates superset columns are computed, +// values normalized, and presence matrix flags which columns were explicitly provided. +func Test_computeUpsertSupersetPlanWithPresence_Basics(t *testing.T) { + cols := map[string]*ColumnInfo{ + "id": NewColumnInfo("id", "INT8", int64(0)), + "name": NewColumnInfo("name", "TEXT", ""), + "age": NewColumnInfo("age", "INT8", int64(0)), + } + tbl := mkTestTable(t, "users", []string{"id"}, cols) + + ops := []*Operation{ + {opType: OperationTypeUpsert, table: tbl, data: map[string]string{"id": "1", "name": "alice"}}, + {opType: OperationTypeUpsert, table: tbl, data: map[string]string{"id": "2", "age": "30"}}, + } + + colsEsc, vals, pres, err := (&PostgresDialect{}).computeUpsertSupersetPlanWithPresence(ops) + require.NoError(t, err) + assert.Equal(t, []string{`"age"`, `"id"`, `"name"`}, colsEsc) + require.Len(t, vals, 2) + require.Len(t, pres, 2) + assert.Equal(t, []string{"NULL", "1", "'alice'"}, vals[0]) + assert.Equal(t, []bool{false, true, true}, pres[0]) + assert.Equal(t, []string{"30", "2", "NULL"}, vals[1]) + assert.Equal(t, []bool{true, true, false}, pres[1]) +} + +// --- 12d: typing and SQL helper utilities --- +// These tests cover utility functions and CTE builders used by batching: +// - canonicalizePostgresType: DatabaseTypeName -> (baseType, isArray) +// - arrayExprToTextLiteral: ARRAY[...] and brace literals -> text array literal +// - buildInsertHistoryCTE/buildUpsertHistoryCTE: shape/essential fields for reversible rows + +// Test_canonicalizePostgresType validates base type mapping and array detection. 
+func Test_canonicalizePostgresType(t *testing.T) { + bt, arr := canonicalizePostgresType("INT8") + assert.Equal(t, "bigint", bt) + assert.False(t, arr) + + bt, arr = canonicalizePostgresType("_INT8") + assert.Equal(t, "bigint", bt) + assert.True(t, arr) + + bt, arr = canonicalizePostgresType("TEXT") + assert.Equal(t, "varchar", bt) + assert.False(t, arr) + + bt, arr = canonicalizePostgresType("TIMESTAMPTZ") + assert.Equal(t, "timestamptz", bt) + assert.False(t, arr) + + bt, arr = canonicalizePostgresType("FOO_BAR") + assert.Equal(t, "foo_bar", bt) + assert.False(t, arr) +} + +// Test_arrayExprToTextLiteral validates transformation to text array literal for +// ARRAY[...] expressions, quoted brace literals, and fallback wrapping. +func Test_arrayExprToTextLiteral(t *testing.T) { + out := arrayExprToTextLiteral("ARRAY[1,2]::bigint[]") + assert.Equal(t, "{1,2}", out) + + out = arrayExprToTextLiteral("'{a,b}'::text[]") + assert.Equal(t, "{a,b}", out) + + out = arrayExprToTextLiteral("{1,2,3}") + assert.Equal(t, "{1,2,3}", out) + + out = arrayExprToTextLiteral("42") + assert.Equal(t, "{42}", out) +} + +// Test_buildInsertHistoryCTE_Shape validates that the INSERT history CTE includes +// only reversible rows and contains op/table_name/pk/block_num fields. +func Test_buildInsertHistoryCTE_Shape(t *testing.T) { + d := PostgresDialect{historyTableName: "history"} + cols := map[string]*ColumnInfo{ + "id": NewColumnInfo("id", "INT8", int64(0)), + } + tbl := mkTestTable(t, "events", []string{"id"}, cols) + + // One reversible, one irreversible (nil) + rb := uint64(100) + ops := []*Operation{ + {opType: OperationTypeInsert, table: tbl, primaryKey: map[string]string{"id": "1"}, reversibleBlockNum: &rb}, + {opType: OperationTypeInsert, table: tbl, primaryKey: map[string]string{"id": "2"}, reversibleBlockNum: nil}, + } + + cte, needs := d.buildInsertHistoryCTE("public", ops) + require.True(t, needs) + // Must target the quoted history table + assert.Contains(t, cte, `"public"."history"`) + // Must contain op 'I' and the table identifier as a quoted string + assert.Contains(t, cte, "'I'") + assert.Contains(t, cte, `'"public"."events"'`) + // Must contain pk json and block number 100 + assert.Contains(t, cte, `'{"id":"1"}'`) + assert.Contains(t, cte, "100") + // Should not include id=2 (no reversible block) + assert.NotContains(t, cte, `'{"id":"2"}'`) +} + +// Test_buildUpsertHistoryCTE_Shape validates that the UPSERT history CTE builds a +// src VALUES list for reversible rows, LEFT JOINs target, and computes op/prev_value. +func Test_buildUpsertHistoryCTE_Shape(t *testing.T) { + d := PostgresDialect{historyTableName: "history"} + cols := map[string]*ColumnInfo{ + "id": NewColumnInfo("id", "INT8", int64(0)), + "v": NewColumnInfo("v", "TEXT", ""), + } + tbl := mkTestTable(t, "kv", []string{"id"}, cols) + + rb := uint64(777) + ops := []*Operation{ + {opType: OperationTypeUpsert, table: tbl, primaryKey: map[string]string{"id": "10"}, data: map[string]string{"id": "10", "v": "x"}, reversibleBlockNum: &rb}, + {opType: OperationTypeUpsert, table: tbl, primaryKey: map[string]string{"id": "11"}, data: map[string]string{"id": "11", "v": "y"}}, + } + + cte, needs := d.buildUpsertHistoryCTE("public", tbl, ops) + require.True(t, needs) + // Targets history table and uses nested WITH src(...) 
+ assert.Contains(t, cte, `"public"."history"`) + assert.Contains(t, cte, "WITH src(") + assert.Contains(t, cte, "VALUES (") + // Contains pk json and block number, and LEFT JOIN target table + assert.Contains(t, cte, `'{"id":"10"}'`) + assert.Contains(t, cte, "777") + assert.Contains(t, cte, `LEFT JOIN "public"."kv" AS target ON`) + // Computes op via CASE WHEN target.pk IS NULL THEN 'I' ELSE 'U' + assert.Contains(t, cte, `CASE WHEN target."id" IS NULL THEN 'I' ELSE 'U' END AS op`) +} diff --git a/db_changes/db/types.go b/db_changes/db/types.go index 1caf0e9..ed2a35c 100644 --- a/db_changes/db/types.go +++ b/db_changes/db/types.go @@ -62,6 +62,9 @@ type ColumnInfo struct { escapedName string databaseTypeName string scanType reflect.Type + nullable bool + hasDefault bool + defaultExpr string } func NewColumnInfo(name string, databaseTypeName string, scanType any) *ColumnInfo { diff --git a/db_changes/sinker/factory.go b/db_changes/sinker/factory.go index d15414f..c9264b1 100644 --- a/db_changes/sinker/factory.go +++ b/db_changes/sinker/factory.go @@ -12,6 +12,8 @@ import ( "go.uber.org/zap" ) +var _ = time.Second // keep import + type SinkerFactoryFunc func(ctx context.Context, dsnString string, logger *zap.Logger, tracer logging.Tracer) (*SQLSinker, error) type SinkerFactoryOptions struct { @@ -25,6 +27,10 @@ type SinkerFactoryOptions struct { HandleReorgs bool FlushRetryCount int FlushRetryDelay time.Duration + // Postgres insert batching (runtime flags wired in from run.go) + PgInsertBatchMode string // off|values|unnest + PgInsertBatchSize int + PgInsertOnly bool } func SinkerFactory( @@ -54,6 +60,14 @@ func SinkerFactory( return nil, fmt.Errorf("creating loader: %w", err) } + // Configure Postgres insert batching on the loader (no-op for other dialects) + dbLoader.ConfigurePgInsertBatching(options.PgInsertBatchMode, options.PgInsertBatchSize, options.PgInsertOnly) + logger.Info("pg insert batching settings", + zap.String("mode", options.PgInsertBatchMode), + zap.Int("batch_size", options.PgInsertBatchSize), + zap.Bool("insert_only", options.PgInsertOnly), + ) + if err := dbLoader.LoadTables(dsn.Schema(), options.CursorTableName, options.HistoryTableName); err != nil { var e *db.SystemTableError if errors.As(err, &e) {