diff --git a/cmd/config/config_env.go b/cmd/config/config_env.go
index 056d1edd..ee540103 100644
--- a/cmd/config/config_env.go
+++ b/cmd/config/config_env.go
@@ -49,6 +49,8 @@ func init() {
 	viper.BindEnv("PGSTREAM_POSTGRES_SNAPSHOT_WORKERS")
 	viper.BindEnv("PGSTREAM_POSTGRES_SNAPSHOT_STORE_URL")
 	viper.BindEnv("PGSTREAM_POSTGRES_SNAPSHOT_STORE_REPEATABLE")
+	viper.BindEnv("PGSTREAM_POSTGRES_SNAPSHOT_IGNORE_ROW_PROCESSING_ERRORS")
+	viper.BindEnv("PGSTREAM_POSTGRES_SNAPSHOT_LOG_ROW_ON_ERROR")
 	viper.BindEnv("PGSTREAM_POSTGRES_SNAPSHOT_USE_SCHEMALOG")
 	viper.BindEnv("PGSTREAM_POSTGRES_SNAPSHOT_INCLUDE_GLOBAL_DB_OBJECTS")
 	viper.BindEnv("PGSTREAM_POSTGRES_SNAPSHOT_ROLE")
@@ -219,11 +221,13 @@ func parseSnapshotConfig(pgURL string) (*snapshotbuilder.SnapshotListenerConfig,
 	cfg := &snapshotbuilder.SnapshotListenerConfig{
 		Generator: pgsnapshotgenerator.Config{
-			URL:             pgURL,
-			BatchBytes:      viper.GetUint64("PGSTREAM_POSTGRES_SNAPSHOT_BATCH_BYTES"),
-			SchemaWorkers:   viper.GetUint("PGSTREAM_POSTGRES_SNAPSHOT_SCHEMA_WORKERS"),
-			TableWorkers:    viper.GetUint("PGSTREAM_POSTGRES_SNAPSHOT_TABLE_WORKERS"),
-			SnapshotWorkers: viper.GetUint("PGSTREAM_POSTGRES_SNAPSHOT_WORKERS"),
+			URL:                       pgURL,
+			BatchBytes:                viper.GetUint64("PGSTREAM_POSTGRES_SNAPSHOT_BATCH_BYTES"),
+			SchemaWorkers:             viper.GetUint("PGSTREAM_POSTGRES_SNAPSHOT_SCHEMA_WORKERS"),
+			TableWorkers:              viper.GetUint("PGSTREAM_POSTGRES_SNAPSHOT_TABLE_WORKERS"),
+			SnapshotWorkers:           viper.GetUint("PGSTREAM_POSTGRES_SNAPSHOT_WORKERS"),
+			IgnoreRowProcessingErrors: viper.GetBool("PGSTREAM_POSTGRES_SNAPSHOT_IGNORE_ROW_PROCESSING_ERRORS"),
+			LogRowOnError:             viper.GetBool("PGSTREAM_POSTGRES_SNAPSHOT_LOG_ROW_ON_ERROR"),
 		},
 		Adapter: adapter.SnapshotConfig{
 			Tables: viper.GetStringSlice("PGSTREAM_POSTGRES_SNAPSHOT_TABLES"),
diff --git a/cmd/config/config_yaml.go b/cmd/config/config_yaml.go
index cc745401..048135dd 100644
--- a/cmd/config/config_yaml.go
+++ b/cmd/config/config_yaml.go
@@ -87,9 +87,11 @@ type SnapshotRecorderConfig struct {
 }
 
 type SnapshotDataConfig struct {
-	SchemaWorkers int    `mapstructure:"schema_workers" yaml:"schema_workers"`
-	TableWorkers  int    `mapstructure:"table_workers" yaml:"table_workers"`
-	BatchBytes    uint64 `mapstructure:"batch_bytes" yaml:"batch_bytes"`
+	SchemaWorkers             int    `mapstructure:"schema_workers" yaml:"schema_workers"`
+	TableWorkers              int    `mapstructure:"table_workers" yaml:"table_workers"`
+	BatchBytes                uint64 `mapstructure:"batch_bytes" yaml:"batch_bytes"`
+	IgnoreRowProcessingErrors bool   `mapstructure:"ignore_row_processing_errors" yaml:"ignore_row_processing_errors"`
+	LogRowOnError             bool   `mapstructure:"log_row_on_error" yaml:"log_row_on_error"`
 }
 
 type SnapshotSchemaConfig struct {
@@ -493,6 +495,8 @@ func (c *YAMLConfig) parseDataSnapshotConfig() pgsnapshotgenerator.Config {
 		cfg.BatchBytes = snapshotCfg.Data.BatchBytes
 		cfg.SchemaWorkers = uint(snapshotCfg.Data.SchemaWorkers)
 		cfg.TableWorkers = uint(snapshotCfg.Data.TableWorkers)
+		cfg.IgnoreRowProcessingErrors = snapshotCfg.Data.IgnoreRowProcessingErrors
+		cfg.LogRowOnError = snapshotCfg.Data.LogRowOnError
 	}
 
 	return cfg
diff --git a/config_template.yaml b/config_template.yaml
index 63b32985..b3df423a 100644
--- a/config_template.yaml
+++ b/config_template.yaml
@@ -22,6 +22,8 @@ source:
       schema_workers: 4 # number of schema tables to be snapshotted in parallel. Defaults to 4
       table_workers: 4 # number of workers to snapshot a table in parallel. Defaults to 4
       batch_bytes: 83886080 # bytes to read per batch (defaults to 80MiB)
+      ignore_row_processing_errors: false # if true, log errors during row processing and continue. Defaults to false. Warning: can result in inconsistent data on the target.
+      log_row_on_error: false # if true, log the row data as JSON when an error occurs. Defaults to false. Warning: can result in row data exposure.
     schema: # when mode is full or schema
       mode: pgdump_pgrestore # options are pgdump_pgrestore or schemalog
       pgdump_pgrestore:
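Note: the two new YAML keys above decode into `SnapshotDataConfig` through its `mapstructure`/`yaml` tags. As a hedged, self-contained sketch of that decoding — using `gopkg.in/yaml.v3` directly and a local mirror of the struct, not the actual pgstream loader:

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// Local mirror of SnapshotDataConfig (yaml tags only) — illustrative, not the
// pgstream type itself. Tag names are taken from the diff above.
type snapshotDataConfig struct {
	SchemaWorkers             int    `yaml:"schema_workers"`
	TableWorkers              int    `yaml:"table_workers"`
	BatchBytes                uint64 `yaml:"batch_bytes"`
	IgnoreRowProcessingErrors bool   `yaml:"ignore_row_processing_errors"`
	LogRowOnError             bool   `yaml:"log_row_on_error"`
}

func main() {
	// Fragment of the snapshot data section, nested as in config_template.yaml.
	// Both new flags default to false and are strictly opt-in.
	doc := []byte(`
schema_workers: 4
table_workers: 4
batch_bytes: 83886080
ignore_row_processing_errors: true
log_row_on_error: true
`)
	var cfg snapshotDataConfig
	if err := yaml.Unmarshal(doc, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg)
}
```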
diff --git a/docs/configuration.md b/docs/configuration.md
index 8e6b32b2..09acf327 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -31,6 +31,8 @@ source:
       schema_workers: 4 # number of schema tables to be snapshotted in parallel. Defaults to 4
       table_workers: 4 # number of workers to snapshot a table in parallel. Defaults to 4
       batch_bytes: 83886080 # bytes to read per batch (defaults to 80MiB)
+      ignore_row_processing_errors: false # if true, log errors during row processing and continue. Defaults to false. Warning: can result in inconsistent data on the target.
+      log_row_on_error: false # if true, log the row data as JSON when an error occurs. Defaults to false. Warning: can result in row data exposure.
     schema: # when mode is full or schema
       mode: pgdump_pgrestore # options are pgdump_pgrestore or schemalog
       pgdump_pgrestore:
@@ -160,28 +162,30 @@ Here's a list of all the environment variables that can be used to configure the
 Postgres Listener
 
-| Environment Variable                                 | Default                      | Required | Description |
-| ---------------------------------------------------- | ---------------------------- | -------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| PGSTREAM_POSTGRES_LISTENER_URL                       | N/A                          | Yes      | URL of the Postgres database to connect to for replication purposes. |
-| PGSTREAM_POSTGRES_REPLICATION_SLOT_NAME              | "pgstream_dbname_slot"       | No       | Name of the Postgres replication slot name. |
-| PGSTREAM_POSTGRES_SNAPSHOT_TABLES                    | ""                           | No       | Tables for which there will be an initial snapshot generated. The syntax supports wildcards. Tables without a schema defined will be applied the public schema. Example: for `public.test_table` and all tables in the `test_schema` schema, the value would be the following: `"test_table test_schema.\*"` |
-| PGSTREAM_POSTGRES_SNAPSHOT_EXCLUDED_TABLES           | ""                           | No       | Tables that will be excluded in the snapshot process. The syntax does not support wildcards. Tables without a schema defined will be applied the public schema. |
-| PGSTREAM_POSTGRES_SNAPSHOT_SCHEMA_WORKERS            | 4                            | No       | Number of tables per schema that will be processed in parallel by the snapshotting process. |
-| PGSTREAM_POSTGRES_SNAPSHOT_TABLE_WORKERS             | 4                            | No       | Number of concurrent workers that will be used per table by the snapshotting process. |
-| PGSTREAM_POSTGRES_SNAPSHOT_BATCH_BYTES               | 83886080 (80MiB)             | No       | Max batch size in bytes to be read and processed by each table worker at a time. The number of pages in the select queries will be based on this value. |
-| PGSTREAM_POSTGRES_SNAPSHOT_WORKERS                   | 1                            | No       | Number of schemas that will be processed in parallel by the snapshotting process. |
-| PGSTREAM_POSTGRES_SNAPSHOT_USE_SCHEMALOG             | False                        | No       | Forces the use of the `pgstream.schema_log` for the schema snapshot instead of using `pg_dump`/`pg_restore` for Postgres targets. |
-| PGSTREAM_POSTGRES_SNAPSHOT_CLEAN_TARGET_DB           | False                        | No       | When using `pg_dump`/`pg_restore` to snapshot schema for Postgres targets, option to issue commands to DROP all the objects that will be restored. |
-| PGSTREAM_POSTGRES_SNAPSHOT_INCLUDE_GLOBAL_DB_OBJECTS | False                        | No       | When using `pg_dump`/`pg_restore` to snapshot schema for Postgres targets, option to snapshot all global database objects outside of the selected schema (such as extensions, triggers, etc). |
-| PGSTREAM_POSTGRES_SNAPSHOT_CREATE_TARGET_DB          | False                        | No       | When using `pg_dump`/`pg_restore` to snapshot schema for Postgres targets, option to create the database being restored. |
-| PGSTREAM_POSTGRES_SNAPSHOT_NO_OWNER                  | False                        | No       | When using `pg_dump`/`pg_restore` to snapshot schema for Postgres targets, do not output commands to set ownership of objects to match the original database. |
-| PGSTREAM_POSTGRES_SNAPSHOT_NO_PRIVILEGES             | False                        | No       | When using `pg_dump`/`pg_restore` to snapshot schema for Postgres targets, do not output privilege related commands (grant/revoke). |
-| PGSTREAM_POSTGRES_SNAPSHOT_EXCLUDED_SECURITY_LABELS  | []                           | No       | When using `pg_dump`/`pg_restore` to snapshot schema for Postgres targets, list of providers whose security labels will be excluded. |
-| PGSTREAM_POSTGRES_SNAPSHOT_ROLE                      | ""                           | No       | When using `pg_dump`/`pg_restore` to snapshot schema for Postgres targets, role name to be used to create the dump. |
-| PGSTREAM_POSTGRES_SNAPSHOT_ROLES_SNAPSHOT_MODE       | "no_passwords"               | No       | When using `pg_dump`/`pg_restore` to snapshot schema for Postgres targets, controls how roles are snapshotted. Possible values: "enabled" (snapshot all roles including passwords), "disabled" (do not snapshot roles), "no_passwords" (snapshot roles but exclude passwords). |
-| PGSTREAM_POSTGRES_SNAPSHOT_SCHEMA_DUMP_FILE          | ""                           | No       | When using `pg_dump`/`pg_restore` to snapshot schema for Postgres targets, file where the contents of the schema pg_dump command and output will be written for debugging purposes. |
-| PGSTREAM_POSTGRES_SNAPSHOT_STORE_URL                 | ""                           | No       | Postgres URL for the database where the snapshot requests and their status will be tracked. A table `snapshot_requests` will be created under a `pgstream` schema. |
-| PGSTREAM_POSTGRES_SNAPSHOT_STORE_REPEATABLE          | False (run), True (snapshot) | No       | Allow to repeat snapshots requests that have been already completed succesfully. If using the run command, initial snapshots won't be repeatable by default. If the snapshot command is used instead, the snapshot will be repeatable by default. |
+| Environment Variable                                    | Default                      | Required | Description |
+| ------------------------------------------------------- | ---------------------------- | -------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| PGSTREAM_POSTGRES_LISTENER_URL                          | N/A                          | Yes      | URL of the Postgres database to connect to for replication purposes. |
+| PGSTREAM_POSTGRES_REPLICATION_SLOT_NAME                 | "pgstream_dbname_slot"       | No       | Name of the Postgres replication slot. |
+| PGSTREAM_POSTGRES_SNAPSHOT_TABLES                       | ""                           | No       | Tables for which there will be an initial snapshot generated. The syntax supports wildcards. Tables without a schema defined will default to the public schema. Example: for `public.test_table` and all tables in the `test_schema` schema, the value would be the following: `"test_table test_schema.\*"` |
+| PGSTREAM_POSTGRES_SNAPSHOT_EXCLUDED_TABLES              | ""                           | No       | Tables that will be excluded in the snapshot process. The syntax does not support wildcards. Tables without a schema defined will default to the public schema. |
+| PGSTREAM_POSTGRES_SNAPSHOT_SCHEMA_WORKERS               | 4                            | No       | Number of tables per schema that will be processed in parallel by the snapshotting process. |
+| PGSTREAM_POSTGRES_SNAPSHOT_TABLE_WORKERS                | 4                            | No       | Number of concurrent workers that will be used per table by the snapshotting process. |
+| PGSTREAM_POSTGRES_SNAPSHOT_BATCH_BYTES                  | 83886080 (80MiB)             | No       | Max batch size in bytes to be read and processed by each table worker at a time. The number of pages in the select queries will be based on this value. |
+| PGSTREAM_POSTGRES_SNAPSHOT_WORKERS                      | 1                            | No       | Number of schemas that will be processed in parallel by the snapshotting process. |
+| PGSTREAM_POSTGRES_SNAPSHOT_USE_SCHEMALOG                | False                        | No       | Forces the use of the `pgstream.schema_log` for the schema snapshot instead of using `pg_dump`/`pg_restore` for Postgres targets. |
+| PGSTREAM_POSTGRES_SNAPSHOT_CLEAN_TARGET_DB              | False                        | No       | When using `pg_dump`/`pg_restore` to snapshot schema for Postgres targets, option to issue commands to DROP all the objects that will be restored. |
+| PGSTREAM_POSTGRES_SNAPSHOT_INCLUDE_GLOBAL_DB_OBJECTS    | False                        | No       | When using `pg_dump`/`pg_restore` to snapshot schema for Postgres targets, option to snapshot all global database objects outside of the selected schema (such as extensions, triggers, etc). |
+| PGSTREAM_POSTGRES_SNAPSHOT_CREATE_TARGET_DB             | False                        | No       | When using `pg_dump`/`pg_restore` to snapshot schema for Postgres targets, option to create the database being restored. |
+| PGSTREAM_POSTGRES_SNAPSHOT_NO_OWNER                     | False                        | No       | When using `pg_dump`/`pg_restore` to snapshot schema for Postgres targets, do not output commands to set ownership of objects to match the original database. |
+| PGSTREAM_POSTGRES_SNAPSHOT_NO_PRIVILEGES                | False                        | No       | When using `pg_dump`/`pg_restore` to snapshot schema for Postgres targets, do not output privilege related commands (grant/revoke). |
+| PGSTREAM_POSTGRES_SNAPSHOT_EXCLUDED_SECURITY_LABELS     | []                           | No       | When using `pg_dump`/`pg_restore` to snapshot schema for Postgres targets, list of providers whose security labels will be excluded. |
+| PGSTREAM_POSTGRES_SNAPSHOT_ROLE                         | ""                           | No       | When using `pg_dump`/`pg_restore` to snapshot schema for Postgres targets, role name to be used to create the dump. |
+| PGSTREAM_POSTGRES_SNAPSHOT_ROLES_SNAPSHOT_MODE          | "no_passwords"               | No       | When using `pg_dump`/`pg_restore` to snapshot schema for Postgres targets, controls how roles are snapshotted. Possible values: "enabled" (snapshot all roles including passwords), "disabled" (do not snapshot roles), "no_passwords" (snapshot roles but exclude passwords). |
+| PGSTREAM_POSTGRES_SNAPSHOT_SCHEMA_DUMP_FILE             | ""                           | No       | When using `pg_dump`/`pg_restore` to snapshot schema for Postgres targets, file where the contents of the schema pg_dump command and output will be written for debugging purposes. |
+| PGSTREAM_POSTGRES_SNAPSHOT_STORE_URL                    | ""                           | No       | Postgres URL for the database where the snapshot requests and their status will be tracked. A table `snapshot_requests` will be created under a `pgstream` schema. |
+| PGSTREAM_POSTGRES_SNAPSHOT_STORE_REPEATABLE             | False (run), True (snapshot) | No       | Allows repeating snapshot requests that have already been completed successfully. If using the run command, initial snapshots won't be repeatable by default. If the snapshot command is used instead, the snapshot will be repeatable by default. |
+| PGSTREAM_POSTGRES_SNAPSHOT_IGNORE_ROW_PROCESSING_ERRORS | False                        | No       | Whether to ignore errors encountered while processing the snapshot rows. Can lead to inconsistent data on the target. |
+| PGSTREAM_POSTGRES_SNAPSHOT_LOG_ROW_ON_ERROR             | False                        | No       | Whether to log the row details when the snapshot processing fails. Intended for debugging purposes. Can lead to exposure of sensitive data. |
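For the environment-variable path documented above, a minimal sketch of how the two new variables resolve to booleans via viper. This mirrors, but is not, the `config_env.go` wiring; the `os.Setenv` calls simply stand in for a real environment:

```go
package main

import (
	"fmt"
	"os"

	"github.com/spf13/viper"
)

func main() {
	// Stand-ins for a real environment. viper.BindEnv makes the keys
	// resolvable, and GetBool parses the usual boolean spellings.
	os.Setenv("PGSTREAM_POSTGRES_SNAPSHOT_IGNORE_ROW_PROCESSING_ERRORS", "true")
	os.Setenv("PGSTREAM_POSTGRES_SNAPSHOT_LOG_ROW_ON_ERROR", "false")

	for _, key := range []string{
		"PGSTREAM_POSTGRES_SNAPSHOT_IGNORE_ROW_PROCESSING_ERRORS",
		"PGSTREAM_POSTGRES_SNAPSHOT_LOG_ROW_ON_ERROR",
	} {
		if err := viper.BindEnv(key); err != nil {
			panic(err)
		}
		fmt.Println(key, "=", viper.GetBool(key))
	}
}
```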
diff --git a/pkg/snapshot/generator/postgres/data/config.go b/pkg/snapshot/generator/postgres/data/config.go
index faffa6dd..05add8d9 100644
--- a/pkg/snapshot/generator/postgres/data/config.go
+++ b/pkg/snapshot/generator/postgres/data/config.go
@@ -18,6 +18,12 @@ type Config struct {
 	// TableWorkers represents the number of concurrent workers per table. Each
 	// worker will process a different page range in parallel. Defaults to 4.
 	TableWorkers uint
+	// IgnoreRowProcessingErrors, when set to true, will log errors encountered
+	// during row processing and continue instead of failing. Defaults to false.
+	IgnoreRowProcessingErrors bool
+	// LogRowOnError, when set to true, will log the row data as JSON when a row
+	// processing error occurs. Defaults to false.
+	LogRowOnError bool
 }
 
 const (
diff --git a/pkg/snapshot/generator/postgres/data/pg_snapshot_generator.go b/pkg/snapshot/generator/postgres/data/pg_snapshot_generator.go
index 8768c7a4..8ab01d16 100644
--- a/pkg/snapshot/generator/postgres/data/pg_snapshot_generator.go
+++ b/pkg/snapshot/generator/postgres/data/pg_snapshot_generator.go
@@ -10,6 +10,7 @@ import (
 	"sync"
 
 	"github.com/jackc/pgx/v5/pgconn"
+	jsonlib "github.com/xataio/pgstream/internal/json"
 	pglib "github.com/xataio/pgstream/internal/postgres"
 	pglibinstrumentation "github.com/xataio/pgstream/internal/postgres/instrumentation"
 	"github.com/xataio/pgstream/internal/progress"
@@ -40,6 +41,9 @@ type SnapshotGenerator struct {
 	progressTracking   bool
 	progressBars       *synclib.StringMap[progress.Bar]
 	progressBarBuilder func(totalBytes int64, description string) progress.Bar
+
+	ignoreRowProcessingErrors bool
+	logRowOnError             bool
 }
 
 type mapper interface {
@@ -80,14 +84,16 @@ func NewSnapshotGenerator(ctx context.Context, cfg *Config, rowsProcessor snapsh
 	}
 
 	sg := &SnapshotGenerator{
-		logger:          loglib.NewNoopLogger(),
-		mapper:          pglib.NewMapper(conn),
-		conn:            conn,
-		rowsProcessor:   rowsProcessor,
-		batchBytes:      cfg.batchBytes(),
-		tableWorkers:    cfg.tableWorkers(),
-		schemaWorkers:   cfg.schemaWorkers(),
-		snapshotWorkers: cfg.snapshotWorkers(),
+		logger:                    loglib.NewNoopLogger(),
+		mapper:                    pglib.NewMapper(conn),
+		conn:                      conn,
+		rowsProcessor:             rowsProcessor,
+		batchBytes:                cfg.batchBytes(),
+		tableWorkers:              cfg.tableWorkers(),
+		schemaWorkers:             cfg.schemaWorkers(),
+		snapshotWorkers:           cfg.snapshotWorkers(),
+		ignoreRowProcessingErrors: cfg.IgnoreRowProcessingErrors,
+		logRowOnError:             cfg.LogRowOnError,
 	}
 
 	sg.tableSnapshotGenerator = sg.snapshotTable
@@ -357,11 +363,30 @@ func (sg *SnapshotGenerator) snapshotTableRange(ctx context.Context, snapshotID
 			continue
 		}
 
-		if err := sg.rowsProcessor.ProcessRow(ctx, &snapshot.Row{
+		row := &snapshot.Row{
 			Schema:  table.schema,
 			Table:   table.name,
 			Columns: columns,
-		}); err != nil {
+		}
+		if err := sg.rowsProcessor.ProcessRow(ctx, row); err != nil {
+			logFields := loglib.Fields{
+				"schema": table.schema,
+				"table":  table.name,
+				"error":  err.Error(),
+			}
+
+			if sg.logRowOnError {
+				if rowJSON, jsonErr := jsonlib.Marshal(row); jsonErr == nil {
+					logFields["row"] = string(rowJSON)
+				}
+			}
+
+			sg.logger.Error(err, "processing snapshot row", logFields)
+
+			if sg.ignoreRowProcessingErrors {
+				continue
+			}
+
 			return fmt.Errorf("processing snapshot row: %w", err)
 		}
 	}
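The branch added to `snapshotTableRange` implements a small policy: always log the failure, attach the row as JSON only when `logRowOnError` is set, and swallow the error only when `ignoreRowProcessingErrors` is set. A standalone, stdlib-only approximation of that policy — `handleRowError` and its parameters are illustrative, not pgstream API:

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"log"
)

// handleRowError is a hypothetical, simplified stand-in for the branch above:
// it always logs the failure, optionally includes the row as JSON, and
// returns nil (i.e. "continue with the next row") only when ignore is true.
func handleRowError(err error, row any, ignore, logRow bool) error {
	msg := fmt.Sprintf("processing snapshot row: %v", err)
	if logRow {
		// Best effort: if the row can't be marshalled, log without it.
		if b, jerr := json.Marshal(row); jerr == nil {
			msg += " row=" + string(b)
		}
	}
	log.Println(msg)

	if ignore {
		return nil // caller moves on to the next row
	}
	return fmt.Errorf("processing snapshot row: %w", err)
}

func main() {
	row := map[string]any{"id": 1, "name": "alice"}
	errBoom := errors.New("boom")
	fmt.Println(handleRowError(errBoom, row, true, true))   // <nil>: row skipped
	fmt.Println(handleRowError(errBoom, row, false, false)) // wrapped error: snapshot fails
}
```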
diff --git a/pkg/snapshot/generator/postgres/data/pg_snapshot_generator_test.go b/pkg/snapshot/generator/postgres/data/pg_snapshot_generator_test.go
index 264d824e..4da085ac 100644
--- a/pkg/snapshot/generator/postgres/data/pg_snapshot_generator_test.go
+++ b/pkg/snapshot/generator/postgres/data/pg_snapshot_generator_test.go
@@ -6,6 +6,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"strings"
 	"testing"
 
 	"github.com/google/go-cmp/cmp"
@@ -1245,3 +1246,482 @@ func TestTablePageInfo_calculateBatchPageSize(t *testing.T) {
 		})
 	}
 }
+
+func TestSnapshotGenerator_snapshotTableRange(t *testing.T) {
+	t.Parallel()
+
+	testTable := "test-table"
+	testSchema := "test-schema"
+	testSnapshotID := "test-snapshot-id"
+	testUUID := uuid.New().String()
+	quotedSchemaTable := pglib.QuoteQualifiedIdentifier(testSchema, testTable)
+
+	testColumns := []snapshot.Column{
+		{Name: "id", Type: "uuid", Value: testUUID},
+		{Name: "name", Type: "text", Value: "alice"},
+	}
+
+	testRow := &snapshot.Row{
+		Schema:  testSchema,
+		Table:   testTable,
+		Columns: testColumns,
+	}
+
+	testPageRange := pageRange{start: 0, end: 5}
+	errTest := errors.New("test error")
+
+	tests := []struct {
+		name      string
+		querier   pglib.Querier
+		table     *table
+		pageRange pageRange
+		processor snapshot.RowsProcessor
+
+		wantRows []*snapshot.Row
+		wantErr  error
+	}{
+		{
+			name: "ok - single row",
+			querier: &pgmocks.Querier{
+				ExecInTxWithOptionsFn: func(_ context.Context, i uint, f func(tx pglib.Tx) error, to pglib.TxOptions) error {
+					mockTx := pgmocks.Tx{
+						ExecFn: func(ctx context.Context, _ uint, query string, args ...any) (pglib.CommandTag, error) {
+							require.Equal(t, fmt.Sprintf("SET TRANSACTION SNAPSHOT '%s'", testSnapshotID), query)
+							return pglib.CommandTag{}, nil
+						},
+						QueryFn: func(ctx context.Context, query string, args ...any) (pglib.Rows, error) {
+							require.Equal(t, fmt.Sprintf(pageRangeQuery, quotedSchemaTable, 0, 5), query)
+							return &pgmocks.Rows{
+								CloseFn: func() {},
+								NextFn:  func(i uint) bool { return i == 1 },
+								FieldDescriptionsFn: func() []pgconn.FieldDescription {
+									return []pgconn.FieldDescription{
+										{Name: "id", DataTypeOID: pgtype.UUIDOID},
+										{Name: "name", DataTypeOID: pgtype.TextOID},
+									}
+								},
+								ValuesFn: func() ([]any, error) {
+									return []any{testUUID, "alice"}, nil
+								},
+								ErrFn: func() error { return nil },
+							}, nil
+						},
+					}
+					return f(&mockTx)
+				},
+			},
+			table: &table{
+				schema:  testSchema,
+				name:    testTable,
+				rowSize: 512,
+			},
+			pageRange: testPageRange,
+			processor: &mocks.RowsProcessor{
+				ProcessRowFn: func(ctx context.Context, row *snapshot.Row) error {
+					return nil
+				},
+			},
+			wantRows: []*snapshot.Row{testRow},
+			wantErr:  nil,
+		},
+		{
+			name: "ok - multiple rows",
+			querier: &pgmocks.Querier{
+				ExecInTxWithOptionsFn: func(_ context.Context, i uint, f func(tx pglib.Tx) error, to pglib.TxOptions) error {
+					mockTx := pgmocks.Tx{
+						ExecFn: func(ctx context.Context, _ uint, query string, args ...any) (pglib.CommandTag, error) {
+							require.Equal(t, fmt.Sprintf("SET TRANSACTION SNAPSHOT '%s'", testSnapshotID), query)
+							return pglib.CommandTag{}, nil
+						},
+						QueryFn: func(ctx context.Context, query string, args ...any) (pglib.Rows, error) {
+							return &pgmocks.Rows{
+								CloseFn: func() {},
+								NextFn:  func(i uint) bool { return i <= 2 },
+								FieldDescriptionsFn: func() []pgconn.FieldDescription {
+									return []pgconn.FieldDescription{
+										{Name: "id", DataTypeOID: pgtype.UUIDOID},
+										{Name: "name", DataTypeOID: pgtype.TextOID},
+									}
+								},
+								ValuesFn: func() ([]any, error) {
+									return []any{testUUID, "alice"}, nil
+								},
+								ErrFn: func() error { return nil },
+							}, nil
+						},
+					}
+					return f(&mockTx)
+				},
+			},
+			table: &table{
+				schema:  testSchema,
+				name:    testTable,
+				rowSize: 512,
+			},
+			pageRange: testPageRange,
+			processor: &mocks.RowsProcessor{
+				ProcessRowFn: func(ctx context.Context, row *snapshot.Row) error {
+					return nil
+				},
+			},
+			wantRows: []*snapshot.Row{testRow, testRow},
+			wantErr:  nil,
+		},
+		{
+			name: "ok - no rows",
+			querier: &pgmocks.Querier{
+				ExecInTxWithOptionsFn: func(_ context.Context, i uint, f func(tx pglib.Tx) error, to pglib.TxOptions) error {
+					mockTx := pgmocks.Tx{
+						ExecFn: func(ctx context.Context, _ uint, query string, args ...any) (pglib.CommandTag, error) {
+							require.Equal(t, fmt.Sprintf("SET TRANSACTION SNAPSHOT '%s'", testSnapshotID), query)
+							return pglib.CommandTag{}, nil
+						},
+						QueryFn: func(ctx context.Context, query string, args ...any) (pglib.Rows, error) {
+							return &pgmocks.Rows{
+								CloseFn: func() {},
+								NextFn:  func(i uint) bool { return false },
+								FieldDescriptionsFn: func() []pgconn.FieldDescription {
+									return []pgconn.FieldDescription{}
+								},
+								ValuesFn: func() ([]any, error) {
+									return []any{}, nil
+								},
+								ErrFn: func() error { return nil },
+							}, nil
+						},
+					}
+					return f(&mockTx)
+				},
+			},
+			table: &table{
+				schema:  testSchema,
+				name:    testTable,
+				rowSize: 512,
+			},
+			pageRange: testPageRange,
+			processor: &mocks.RowsProcessor{
+				ProcessRowFn: func(ctx context.Context, row *snapshot.Row) error {
+					return nil
+				},
+			},
+			wantRows: []*snapshot.Row{},
+			wantErr:  nil,
+		},
+		{
+			name: "ok - with progress tracking",
+			querier: &pgmocks.Querier{
+				ExecInTxWithOptionsFn: func(_ context.Context, i uint, f func(tx pglib.Tx) error, to pglib.TxOptions) error {
+					mockTx := pgmocks.Tx{
+						ExecFn: func(ctx context.Context, _ uint, query string, args ...any) (pglib.CommandTag, error) {
+							require.Equal(t, fmt.Sprintf("SET TRANSACTION SNAPSHOT '%s'", testSnapshotID), query)
+							return pglib.CommandTag{}, nil
+						},
+						QueryFn: func(ctx context.Context, query string, args ...any) (pglib.Rows, error) {
+							return &pgmocks.Rows{
+								CloseFn: func() {},
+								NextFn:  func(i uint) bool { return i == 1 },
+								FieldDescriptionsFn: func() []pgconn.FieldDescription {
+									return []pgconn.FieldDescription{
+										{Name: "id", DataTypeOID: pgtype.UUIDOID},
+										{Name: "name", DataTypeOID: pgtype.TextOID},
+									}
+								},
+								ValuesFn: func() ([]any, error) {
+									return []any{testUUID, "alice"}, nil
+								},
+								ErrFn: func() error { return nil },
+							}, nil
+						},
+					}
+					return f(&mockTx)
+				},
+			},
+			table: &table{
+				schema:  testSchema,
+				name:    testTable,
+				rowSize: 512,
+			},
+			pageRange: testPageRange,
+			processor: &mocks.RowsProcessor{
+				ProcessRowFn: func(ctx context.Context, row *snapshot.Row) error {
+					return nil
+				},
+			},
+			wantRows: []*snapshot.Row{testRow},
+			wantErr:  nil,
+		},
+		{
+			name: "error - setting transaction snapshot",
+			querier: &pgmocks.Querier{
+				ExecInTxWithOptionsFn: func(_ context.Context, i uint, f func(tx pglib.Tx) error, to pglib.TxOptions) error {
+					mockTx := pgmocks.Tx{
+						ExecFn: func(ctx context.Context, _ uint, query string, args ...any) (pglib.CommandTag, error) {
+							require.Equal(t, fmt.Sprintf("SET TRANSACTION SNAPSHOT '%s'", testSnapshotID), query)
+							return pglib.CommandTag{}, errTest
+						},
+					}
+					return f(&mockTx)
+				},
+			},
+			table: &table{
+				schema:  testSchema,
+				name:    testTable,
+				rowSize: 512,
+			},
+			pageRange: testPageRange,
+			processor: &mocks.RowsProcessor{
+				ProcessRowFn: func(ctx context.Context, row *snapshot.Row) error {
+					return nil
+				},
+			},
+			wantRows: []*snapshot.Row{},
+			wantErr:  fmt.Errorf("setting transaction snapshot: %w", errTest),
+		},
+		{
+			name: "error - querying table rows",
+			querier: &pgmocks.Querier{
+				ExecInTxWithOptionsFn: func(_ context.Context, i uint, f func(tx pglib.Tx) error, to pglib.TxOptions) error {
+					mockTx := pgmocks.Tx{
+						ExecFn: func(ctx context.Context, _ uint, query string, args ...any) (pglib.CommandTag, error) {
+							require.Equal(t, fmt.Sprintf("SET TRANSACTION SNAPSHOT '%s'", testSnapshotID), query)
+							return pglib.CommandTag{}, nil
+						},
+						QueryFn: func(ctx context.Context, query string, args ...any) (pglib.Rows, error) {
+							return nil, errTest
+						},
+					}
+					return f(&mockTx)
+				},
+			},
+			table: &table{
+				schema:  testSchema,
+				name:    testTable,
+				rowSize: 512,
+			},
+			pageRange: testPageRange,
+			processor: &mocks.RowsProcessor{
+				ProcessRowFn: func(ctx context.Context, row *snapshot.Row) error {
+					return nil
+				},
+			},
+			wantRows: []*snapshot.Row{},
+			wantErr:  fmt.Errorf("querying table rows: %w", errTest),
+		},
+		{
+			name: "error - retrieving row values",
+			querier: &pgmocks.Querier{
+				ExecInTxWithOptionsFn: func(_ context.Context, i uint, f func(tx pglib.Tx) error, to pglib.TxOptions) error {
+					mockTx := pgmocks.Tx{
+						ExecFn: func(ctx context.Context, _ uint, query string, args ...any) (pglib.CommandTag, error) {
+							require.Equal(t, fmt.Sprintf("SET TRANSACTION SNAPSHOT '%s'", testSnapshotID), query)
+							return pglib.CommandTag{}, nil
+						},
+						QueryFn: func(ctx context.Context, query string, args ...any) (pglib.Rows, error) {
+							return &pgmocks.Rows{
+								CloseFn: func() {},
+								NextFn:  func(i uint) bool { return i == 1 },
+								FieldDescriptionsFn: func() []pgconn.FieldDescription {
+									return []pgconn.FieldDescription{}
+								},
+								ValuesFn: func() ([]any, error) {
+									return nil, errTest
+								},
+								ErrFn: func() error { return nil },
+							}, nil
+						},
+					}
+					return f(&mockTx)
+				},
+			},
+			table: &table{
+				schema:  testSchema,
+				name:    testTable,
+				rowSize: 512,
+			},
+			pageRange: testPageRange,
+			processor: &mocks.RowsProcessor{
+				ProcessRowFn: func(ctx context.Context, row *snapshot.Row) error {
+					return nil
+				},
+			},
+			wantRows: []*snapshot.Row{},
+			wantErr:  fmt.Errorf("retrieving rows values: %w", errTest),
+		},
+		{
+			name: "error - rows error",
+			querier: &pgmocks.Querier{
+				ExecInTxWithOptionsFn: func(_ context.Context, i uint, f func(tx pglib.Tx) error, to pglib.TxOptions) error {
+					mockTx := pgmocks.Tx{
+						ExecFn: func(ctx context.Context, _ uint, query string, args ...any) (pglib.CommandTag, error) {
+							require.Equal(t, fmt.Sprintf("SET TRANSACTION SNAPSHOT '%s'", testSnapshotID), query)
+							return pglib.CommandTag{}, nil
+						},
+						QueryFn: func(ctx context.Context, query string, args ...any) (pglib.Rows, error) {
+							return &pgmocks.Rows{
+								CloseFn: func() {},
+								NextFn:  func(i uint) bool { return false },
+								FieldDescriptionsFn: func() []pgconn.FieldDescription {
+									return []pgconn.FieldDescription{}
+								},
+								ValuesFn: func() ([]any, error) {
+									return []any{}, nil
+								},
+								ErrFn: func() error { return errTest },
+							}, nil
+						},
+					}
+					return f(&mockTx)
+				},
+			},
+			table: &table{
+				schema:  testSchema,
+				name:    testTable,
+				rowSize: 512,
+			},
+			pageRange: testPageRange,
+			processor: &mocks.RowsProcessor{
+				ProcessRowFn: func(ctx context.Context, row *snapshot.Row) error {
+					return nil
+				},
+			},
+			wantRows: []*snapshot.Row{},
+			wantErr:  errTest,
+		},
+		{
+			name: "error - processing row fails and ignoreRowProcessingErrors is false",
+			querier: &pgmocks.Querier{
+				ExecInTxWithOptionsFn: func(_ context.Context, i uint, f func(tx pglib.Tx) error, to pglib.TxOptions) error {
+					mockTx := pgmocks.Tx{
+						ExecFn: func(ctx context.Context, _ uint, query string, args ...any) (pglib.CommandTag, error) {
+							require.Equal(t, fmt.Sprintf("SET TRANSACTION SNAPSHOT '%s'", testSnapshotID), query)
+							return pglib.CommandTag{}, nil
+						},
+						QueryFn: func(ctx context.Context, query string, args ...any) (pglib.Rows, error) {
+							return &pgmocks.Rows{
+								CloseFn: func() {},
+								NextFn:  func(i uint) bool { return i == 1 },
+								FieldDescriptionsFn: func() []pgconn.FieldDescription {
+									return []pgconn.FieldDescription{
+										{Name: "id", DataTypeOID: pgtype.UUIDOID},
+									}
+								},
+								ValuesFn: func() ([]any, error) {
+									return []any{testUUID}, nil
+								},
+								ErrFn: func() error { return nil },
+							}, nil
+						},
+					}
+					return f(&mockTx)
+				},
+			},
+			table: &table{
+				schema:  testSchema,
+				name:    testTable,
+				rowSize: 512,
+			},
+			pageRange: testPageRange,
+			processor: &mocks.RowsProcessor{
+				ProcessRowFn: func(ctx context.Context, row *snapshot.Row) error {
+					return errTest
+				},
+			},
+			wantRows: []*snapshot.Row{},
+			wantErr:  fmt.Errorf("processing snapshot row: %w", errTest),
+		},
+		{
+			name: "ok - processing row fails but ignoreRowProcessingErrors is true",
+			querier: &pgmocks.Querier{
+				ExecInTxWithOptionsFn: func(_ context.Context, i uint, f func(tx pglib.Tx) error, to pglib.TxOptions) error {
+					mockTx := pgmocks.Tx{
+						ExecFn: func(ctx context.Context, _ uint, query string, args ...any) (pglib.CommandTag, error) {
+							require.Equal(t, fmt.Sprintf("SET TRANSACTION SNAPSHOT '%s'", testSnapshotID), query)
+							return pglib.CommandTag{}, nil
+						},
+						QueryFn: func(ctx context.Context, query string, args ...any) (pglib.Rows, error) {
+							return &pgmocks.Rows{
+								CloseFn: func() {},
+								NextFn:  func(i uint) bool { return i <= 2 },
+								FieldDescriptionsFn: func() []pgconn.FieldDescription {
+									return []pgconn.FieldDescription{
+										{Name: "id", DataTypeOID: pgtype.UUIDOID},
+									}
+								},
+								ValuesFn: func() ([]any, error) {
+									return []any{testUUID}, nil
+								},
+								ErrFn: func() error { return nil },
+							}, nil
+						},
+					}
+					return f(&mockTx)
+				},
+			},
+			table: &table{
+				schema:  testSchema,
+				name:    testTable,
+				rowSize: 512,
+			},
+			pageRange: testPageRange,
+			processor: &mocks.RowsProcessor{
+				ProcessRowFn: func(ctx context.Context, row *snapshot.Row) error {
+					return errTest
+				},
+			},
+			wantRows: []*snapshot.Row{},
+			wantErr:  nil,
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+
+			rowChan := make(chan *snapshot.Row, 10)
+			progressBar := &progressmocks.Bar{
+				Add64Fn: func(n int64) error {
+					require.Equal(t, tc.table.rowSize, n)
+					return nil
+				},
+			}
+
+			sg := SnapshotGenerator{
+				logger: zerolog.NewStdLogger(zerolog.NewLogger(&zerolog.Config{
+					LogLevel: "debug",
+				})),
+				conn:   tc.querier,
+				mapper: pglib.NewMapper(tc.querier),
+				rowsProcessor: &mocks.RowsProcessor{
+					ProcessRowFn: func(ctx context.Context, row *snapshot.Row) error {
+						if tc.processor != nil {
+							if err := tc.processor.ProcessRow(ctx, row); err != nil {
+								return err
+							}
+						}
+						rowChan <- row
+						return nil
+					},
+				},
+				progressTracking:          tc.name == "ok - with progress tracking",
+				progressBars:              synclib.NewStringMap[progress.Bar](),
+				ignoreRowProcessingErrors: strings.Contains(tc.name, "ignoreRowProcessingErrors is true"),
+				logRowOnError:             true,
+			}
+
+			if sg.progressTracking {
+				sg.progressBars.Set(tc.table.schema, progressBar)
+			}
+
+			err := sg.snapshotTableRange(context.Background(), testSnapshotID, tc.table, tc.pageRange)
+			require.Equal(t, tc.wantErr, err)
+			close(rowChan)
+
+			rows := []*snapshot.Row{}
+			for row := range rowChan {
+				rows = append(rows, row)
+			}
+			diff := cmp.Diff(rows, tc.wantRows)
+			require.Empty(t, diff, fmt.Sprintf("got: \n%v, \nwant \n%v, \ndiff: \n%s", rows, tc.wantRows, diff))
+		})
+	}
+}
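Finally, for anyone wiring this up programmatically rather than via env vars or YAML: the two flags are plain fields on the generator config. A sketch only — the field names come from `config.go` above, the import path is inferred from the package location, and the URL is a placeholder:

```go
package main

import (
	"fmt"

	pgsnapshotgenerator "github.com/xataio/pgstream/pkg/snapshot/generator/postgres/data"
)

func main() {
	// Sketch: a generator config with the new behavior enabled. In pgstream
	// this struct is normally populated from env vars or YAML, as shown in
	// the cmd/config changes above.
	cfg := pgsnapshotgenerator.Config{
		URL:                       "postgres://user:pass@localhost:5432/mydb", // placeholder
		IgnoreRowProcessingErrors: true, // log and skip failing rows
		LogRowOnError:             true, // include the failing row as JSON in error logs
	}
	fmt.Printf("%+v\n", cfg)
}
```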