13 changes: 13 additions & 0 deletions cmd/substreams-sink-sql/run.go
@@ -36,6 +36,11 @@ var sinkRunCmd = Command(sinkRunE,
flags.Int("flush-retry-count", 3, "Number of retry attempts for flush operations")
flags.Duration("flush-retry-delay", 1*time.Second, "Base delay for incremental retry backoff on flush failures")
flags.StringP("endpoint", "e", "", "Specify the substreams endpoint, ex: `mainnet.eth.streamingfast.io:443`")

// Postgres insert-only batching (runtime-only flags)
flags.String("pg-insert-batch-mode", "off", "Postgres insert batching mode: off|values|unnest (runtime-only)")
flags.Int("pg-insert-batch-size", 1000, "Postgres insert batch size when batching is enabled (runtime-only)")
flags.Bool("pg-insert-only", false, "Assert insert-only processing; if other ops are present, fallback or error based on future wiring (runtime-only)")
}),
Example("substreams-sink-sql run 'postgres://localhost:5432/posgres?sslmode=disable' [email protected]"),
OnCommandErrorLogAndExit(zlog),
@@ -98,6 +103,11 @@ func sinkRunE(cmd *cobra.Command, args []string) error {
flushRetryCount := sflags.MustGetInt(cmd, "flush-retry-count")
flushRetryDelay := sflags.MustGetDuration(cmd, "flush-retry-delay")

// Read Postgres insert batching flags
pgInsertBatchMode := sflags.MustGetString(cmd, "pg-insert-batch-mode")
pgInsertBatchSize := sflags.MustGetInt(cmd, "pg-insert-batch-size")
pgInsertOnly := sflags.MustGetBool(cmd, "pg-insert-only")

cursorTableName := sflags.MustGetString(cmd, "cursors-table")
historyTableName := sflags.MustGetString(cmd, "history-table")
handleReorgs := sflags.MustGetInt(cmd, "undo-buffer-size") == 0
@@ -113,6 +123,9 @@ func sinkRunE(cmd *cobra.Command, args []string) error {
HandleReorgs: handleReorgs,
FlushRetryCount: flushRetryCount,
FlushRetryDelay: flushRetryDelay,
PgInsertBatchMode: pgInsertBatchMode,
PgInsertBatchSize: pgInsertBatchSize,
PgInsertOnly: pgInsertOnly,
})

postgresSinker, err := sinkerFactory(app.Context(), dsnString, zlog, tracer)
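The three runtime-only flags read above are carried on the sinker factory options; the flag help itself notes that enforcement and fallback are left to future wiring. A minimal sketch of how the values might eventually reach the loader, assuming a *db.Loader value named loader is in scope (the wiring below is illustrative, not part of this PR):

	// Hypothetical wiring (not part of this diff): hand the runtime-only flag
	// values to the loader. ConfigurePgInsertBatching is introduced in this PR;
	// the default mode "off" leaves batching disabled, matching the loader defaults.
	loader.ConfigurePgInsertBatching(pgInsertBatchMode, pgInsertBatchSize, pgInsertOnly)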
158 changes: 156 additions & 2 deletions db_changes/db/db.go
@@ -47,6 +47,10 @@ type Loader struct {
testTx *TestTx // used for testing: if non-nil, 'loader.BeginTx()' will return this object instead of a real *sql.Tx
dsn *DSN

// Postgres insert batching (runtime-configurable; defaults imply disabled)
pgInsertBatchMode string // off|values|unnest
pgInsertBatchSize int // >0 when enabled
pgInsertOnly bool // assert-only mode; if true, non-insert ops should be rejected or cause fallback
batchOrdinal uint64 // Counter for ordinals within the current batch, resets on flush
}

@@ -161,6 +165,32 @@ func (l *Loader) FlushNeeded() bool {
return totalRows > l.batchRowFlushInterval
}

// PgInsertBatchMode returns the configured Postgres batch mode ("off" if unset).
func (l *Loader) PgInsertBatchMode() string {
if l.pgInsertBatchMode == "" {
return "off"
}
return l.pgInsertBatchMode
}

// PgInsertBatchSize returns the configured batch size (0 means unset/disabled).
func (l *Loader) PgInsertBatchSize() int {
return l.pgInsertBatchSize
}

// PgInsertOnly indicates whether insert-only assertions are enabled.
func (l *Loader) PgInsertOnly() bool {
return l.pgInsertOnly
}

// ConfigurePgInsertBatching sets Postgres insert batching configuration on the loader.
// mode should be one of: "off", "values", "unnest". size <= 0 disables batching.
func (l *Loader) ConfigurePgInsertBatching(mode string, size int, insertOnly bool) {
l.pgInsertBatchMode = mode
l.pgInsertBatchSize = size
l.pgInsertOnly = insertOnly
}
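For context on the two modes the new accessors can report (neither statement builder is part of this diff): a "values" batch expands one placeholder tuple per buffered row, while an "unnest" batch passes one array parameter per column and lets Postgres expand the rows server-side. A rough sketch of how a caller might branch on the accessors, with the statement shapes shown as comments (table and columns are hypothetical):

	// Sketch only; the real batch writers are not in this PR.
	switch loader.PgInsertBatchMode() {
	case "values":
		// e.g. INSERT INTO "s"."t" ("id","name") VALUES ($1,$2),($3,$4),... up to PgInsertBatchSize() rows
	case "unnest":
		// e.g. INSERT INTO "s"."t" ("id","name") SELECT * FROM unnest($1::int8[], $2::text[])
	default: // "off"
		// keep the existing single-row insert path
	}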

// getTablesFromSchema returns table information similar to schema.Tables()
// but only inspects tables in the specified schema to avoid issues with database extensions
func (l *Loader) getTablesFromSchema(schemaName string) (map[[2]string][]*sql.ColumnType, error) {
@@ -209,6 +239,116 @@ func (l *Loader) getTablesFromSchema(schemaName string) (map[[2]string][]*sql.Co
return result, nil
}

// getTableColumns returns column information for a specific table
func (l *Loader) getTableColumns(schemaName, tableName string) ([]*sql.ColumnType, error) {
// Use a simple query to get column information
query := fmt.Sprintf("SELECT * FROM %s.%s WHERE 1=0",
EscapeIdentifier(schemaName),
EscapeIdentifier(tableName))

rows, err := l.DB.Query(query)
if err != nil {
return nil, fmt.Errorf("querying table structure: %w", err)
}
defer rows.Close()

return rows.ColumnTypes()
}

// fetchColumnPgTypes returns a mapping of column name to PostgreSQL type name (pg_type.typname),
// which properly reflects user-defined types (e.g., enums) and array types (leading underscore).
// Example: bigint[] -> _int8, enum call_type -> call_type, enum[] -> _call_type.
func (l *Loader) fetchColumnPgTypes(schemaName, tableName string) (map[string]string, error) {
query := `
SELECT a.attname, t.typname
FROM pg_attribute a
JOIN pg_class c ON c.oid = a.attrelid
JOIN pg_namespace n ON n.oid = c.relnamespace
JOIN pg_type t ON t.oid = a.atttypid
WHERE n.nspname = $1
AND c.relname = $2
AND a.attnum > 0
AND NOT a.attisdropped
ORDER BY a.attnum`

rows, err := l.DB.Query(query, schemaName, tableName)
if err != nil {
return nil, fmt.Errorf("querying pg types for %s.%s: %w", schemaName, tableName, err)
}
defer rows.Close()

out := make(map[string]string)
for rows.Next() {
var col string
var typ string
if err := rows.Scan(&col, &typ); err != nil {
return nil, fmt.Errorf("scanning pg types row: %w", err)
}
out[col] = typ
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("iterating pg types rows: %w", err)
}
return out, nil
}
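A small helper of the kind a batch writer might want on top of this mapping (not part of this diff): turning a pg_type name back into its SQL spelling, since array types carry a leading underscore in pg_type.typname.

	// sqlTypeFromTypname maps a pg_type.typname to the type's SQL spelling
	// (illustrative helper, not part of this PR).
	// "_int8" -> "int8[]", "_call_type" -> "call_type[]", "call_type" -> "call_type".
	func sqlTypeFromTypname(typname string) string {
		if len(typname) > 1 && typname[0] == '_' {
			return typname[1:] + "[]"
		}
		return typname
	}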

// fetchColumnNullabilityDefaults returns per-column nullability and default expression info.
// defaultExpr is returned as a raw SQL expression via pg_get_expr(adbin, adrelid).
func (l *Loader) fetchColumnNullabilityDefaults(schemaName, tableName string) (map[string]struct {
Nullable bool
HasDefault bool
DefaultExpr string
}, error) {
query := `
SELECT a.attname,
NOT a.attnotnull AS nullable,
(ad.adbin IS NOT NULL) AS has_default,
pg_get_expr(ad.adbin, ad.adrelid) AS default_expr
FROM pg_attribute a
JOIN pg_class c ON c.oid = a.attrelid
JOIN pg_namespace n ON n.oid = c.relnamespace
LEFT JOIN pg_attrdef ad ON ad.adrelid = a.attrelid AND ad.adnum = a.attnum
WHERE n.nspname = $1
AND c.relname = $2
AND a.attnum > 0
AND NOT a.attisdropped
ORDER BY a.attnum`

rows, err := l.DB.Query(query, schemaName, tableName)
if err != nil {
return nil, fmt.Errorf("querying nullability/defaults for %s.%s: %w", schemaName, tableName, err)
}
defer rows.Close()

out := make(map[string]struct {
Nullable bool
HasDefault bool
DefaultExpr string
})
for rows.Next() {
var col string
var nullable bool
var hasDefault bool
var defaultExpr sql.NullString
if err := rows.Scan(&col, &nullable, &hasDefault, &defaultExpr); err != nil {
return nil, fmt.Errorf("scanning nullability/default row: %w", err)
}
expr := ""
if defaultExpr.Valid {
expr = defaultExpr.String
}
out[col] = struct {
Nullable bool
HasDefault bool
DefaultExpr string
}{Nullable: nullable, HasDefault: hasDefault, DefaultExpr: expr}
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("iterating nullability/default rows: %w", err)
}
return out, nil
}
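The LoadTables change further down keeps this metadata on each ColumnInfo for default inlining. How a batch writer would actually consume it is not part of this diff; a hedged sketch of one plausible policy for a column with no buffered value:

	// fallbackSQL sketches one policy (illustrative only, not this PR's
	// implementation): inline the default expression when there is one,
	// use NULL when the column allows it, otherwise report an error.
	func fallbackSQL(nullable, hasDefault bool, defaultExpr string) (string, error) {
		if hasDefault {
			return defaultExpr, nil // raw SQL from pg_get_expr, e.g. nextval('seq')
		}
		if nullable {
			return "NULL", nil
		}
		return "", fmt.Errorf("no value for a NOT NULL column without a default")
	}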

func (l *Loader) LoadTables(schemaName string, cursorTableName string, historyTableName string) error {
schemaTables, err := l.getTablesFromSchema(schemaName)
if err != nil {
@@ -240,13 +380,27 @@ func (l *Loader) LoadTables(schemaName string, cursorTableName string, historyTa
}

columnByName := make(map[string]*ColumnInfo, len(columns))
// Try to enrich with real PostgreSQL type names (handles enums/domains/arrays)
pgTypes, _ := l.fetchColumnPgTypes(schemaName, tableName)
// Also fetch nullability/defaults metadata for 11c default inlining
ndMeta, _ := l.fetchColumnNullabilityDefaults(schemaName, tableName)
for _, f := range columns {
columnByName[f.Name()] = &ColumnInfo{
dbType := f.DatabaseTypeName()
if real, ok := pgTypes[f.Name()]; ok && real != "" {
dbType = real
}
info := &ColumnInfo{
name: f.Name(),
escapedName: EscapeIdentifier(f.Name()),
databaseTypeName: f.DatabaseTypeName(),
databaseTypeName: dbType,
scanType: f.ScanType(),
}
if meta, ok := ndMeta[f.Name()]; ok {
info.nullable = meta.Nullable
info.hasDefault = meta.HasDefault
info.defaultExpr = meta.DefaultExpr
}
columnByName[f.Name()] = info
}

key, err := l.dialect.GetPrimaryKey(l.DB, schemaName, tableName)