diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 7ae1b6e5ed..351fca11eb 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -19,7 +19,6 @@ import ( "go.temporal.io/sdk/client" "go.temporal.io/sdk/log" "golang.org/x/sync/errgroup" - "google.golang.org/protobuf/proto" "github.com/PeerDB-io/peerdb/flow/alerting" "github.com/PeerDB-io/peerdb/flow/connectors" @@ -60,7 +59,7 @@ func (a *FlowableActivity) Alert( ctx context.Context, alert *protos.AlertInput, ) error { - _ = a.Alerter.LogFlowError(ctx, alert.FlowName, errors.New(alert.Message)) + _ = a.Alerter.LogFlowErrorNoStatus(ctx, alert.FlowName, errors.New(alert.Message)) return nil } @@ -74,7 +73,7 @@ func (a *FlowableActivity) CheckConnection( if errors.Is(err, errors.ErrUnsupported) { return nil } - return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to get connector: %w", err)) + return a.Alerter.LogFlowErrorNoStatus(ctx, config.FlowName, fmt.Errorf("failed to get connector: %w", err)) } defer connectors.CloseConnector(ctx, conn) @@ -88,7 +87,7 @@ func (a *FlowableActivity) CheckMetadataTables( ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName) conn, err := connectors.GetByNameAs[connectors.CDCSyncConnector](ctx, config.Env, a.CatalogPool, config.PeerName) if err != nil { - return nil, a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to get connector: %w", err)) + return nil, a.Alerter.LogFlowErrorNoStatus(ctx, config.FlowName, fmt.Errorf("failed to get connector: %w", err)) } defer connectors.CloseConnector(ctx, conn) @@ -106,12 +105,12 @@ func (a *FlowableActivity) SetupMetadataTables(ctx context.Context, config *prot ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName) dstConn, err := connectors.GetByNameAs[connectors.CDCSyncConnector](ctx, config.Env, a.CatalogPool, config.PeerName) if err != nil { - return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to get connector: %w", err)) + return a.Alerter.LogFlowErrorNoStatus(ctx, config.FlowName, fmt.Errorf("failed to get connector: %w", err)) } defer connectors.CloseConnector(ctx, dstConn) if err := dstConn.SetupMetadataTables(ctx); err != nil { - return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to setup metadata tables: %w", err)) + return a.Alerter.LogFlowErrorNoStatus(ctx, config.FlowName, fmt.Errorf("failed to setup metadata tables: %w", err)) } return nil @@ -124,13 +123,13 @@ func (a *FlowableActivity) EnsurePullability( ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, nil, a.CatalogPool, config.PeerName) if err != nil { - return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get connector: %w", err)) + return nil, a.Alerter.LogFlowErrorNoStatus(ctx, config.FlowJobName, fmt.Errorf("failed to get connector: %w", err)) } defer connectors.CloseConnector(ctx, srcConn) output, err := srcConn.EnsurePullability(ctx, config) if err != nil { - return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to ensure pullability: %w", err)) + return nil, a.Alerter.LogFlowErrorNoStatus(ctx, config.FlowJobName, fmt.Errorf("failed to ensure pullability: %w", err)) } return output, nil @@ -144,13 +143,13 @@ func (a *FlowableActivity) CreateRawTable( ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) dstConn, err := connectors.GetByNameAs[connectors.CDCSyncConnector](ctx, nil, a.CatalogPool, 
config.PeerName) if err != nil { - return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get connector: %w", err)) + return nil, a.Alerter.LogFlowErrorNoStatus(ctx, config.FlowJobName, fmt.Errorf("failed to get connector: %w", err)) } defer connectors.CloseConnector(ctx, dstConn) res, err := dstConn.CreateRawTable(ctx, config) if err != nil { - return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + return nil, a.Alerter.LogFlowErrorNoStatus(ctx, config.FlowJobName, err) } if err := monitoring.InitializeCDCFlow(ctx, a.CatalogPool, config.FlowJobName); err != nil { return nil, err @@ -159,9 +158,42 @@ func (a *FlowableActivity) CreateRawTable( return res, nil } +func (a *FlowableActivity) UpdateFlowStatusInCatalogActivity( + ctx context.Context, + workflowID string, + status protos.FlowStatus, +) (protos.FlowStatus, error) { + return internal.UpdateFlowStatusInCatalog(ctx, a.CatalogPool, workflowID, status) +} + +func (a *FlowableActivity) SetupTableSchemaActivity( + ctx context.Context, + config *protos.SetupTableSchemaBatchInput, +) error { + if err := setupTableSchema(ctx, a.CatalogPool, config); err != nil { + setupErr := fmt.Errorf("failed to setup table schema: %w", err) + return a.Alerter.LogFlowErrorNoStatus(ctx, config.FlowName, setupErr) + } + return nil +} + +func (a *FlowableActivity) SetupQRepTableSchemaActivity( + ctx context.Context, + config *protos.SetupTableSchemaBatchInput, + snapshotID int32, + runUUID string, +) error { + if err := setupTableSchema(ctx, a.CatalogPool, config); err != nil { + setupErr := fmt.Errorf("failed to setup table schema: %w", err) + return a.Alerter.LogFlowSnapshotQRepError(ctx, config.FlowName, snapshotID, runUUID, setupErr) + } + return nil +} + // SetupTableSchema populates table_schema_mapping -func (a *FlowableActivity) SetupTableSchema( +func setupTableSchema( ctx context.Context, + pool shared.CatalogPool, config *protos.SetupTableSchemaBatchInput, ) error { shutdown := heartbeatRoutine(ctx, func() string { @@ -171,26 +203,26 @@ func (a *FlowableActivity) SetupTableSchema( logger := internal.LoggerFromCtx(ctx) ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName) - srcConn, err := connectors.GetByNameAs[connectors.GetTableSchemaConnector](ctx, config.Env, a.CatalogPool, config.PeerName) + srcConn, err := connectors.GetByNameAs[connectors.GetTableSchemaConnector](ctx, config.Env, pool, config.PeerName) if err != nil { - return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to get GetTableSchemaConnector: %w", err)) + return fmt.Errorf("failed to get table schema connector: %w", err) } defer connectors.CloseConnector(ctx, srcConn) tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, config.TableMappings) if err != nil { - return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to get GetTableSchemaConnector: %w", err)) + return fmt.Errorf("failed to get table schema: %w", err) } processed := internal.BuildProcessedSchemaMapping(config.TableMappings, tableNameSchemaMapping, logger) - tx, err := a.CatalogPool.BeginTx(ctx, pgx.TxOptions{}) + tx, err := pool.BeginTx(ctx, pgx.TxOptions{}) if err != nil { return err } defer shared.RollbackTx(tx, logger) for k, v := range processed { - processedBytes, err := proto.Marshal(v) + processedBytes, err := internal.ProtoMarshal(v) if err != nil { return err } @@ -202,7 +234,7 @@ func (a *FlowableActivity) SetupTableSchema( k, processedBytes, ); err != nil { - return err + 
return fmt.Errorf("failed to set table schema: %w", err) } } @@ -210,9 +242,38 @@ func (a *FlowableActivity) SetupTableSchema( } // CreateNormalizedTable creates normalized tables in destination. -func (a *FlowableActivity) CreateNormalizedTable( +func (a *FlowableActivity) CreateSetupNormalizedTable( ctx context.Context, config *protos.SetupNormalizedTableBatchInput, +) (*protos.SetupNormalizedTableBatchOutput, error) { + res, err := createNormalizedTable(ctx, a.CatalogPool, config, a.Alerter.LogFlowInfo) + if err != nil { + createErr := fmt.Errorf("failed to create normalized table: %w", err) + return nil, a.Alerter.LogFlowErrorNoStatus(ctx, config.FlowName, createErr) + } + return res, nil +} + +// CreateQRepNormalizedTable creates normalized tables in destination. +func (a *FlowableActivity) CreateQRepNormalizedTable( + ctx context.Context, + config *protos.SetupNormalizedTableBatchInput, + snapshotID int32, + runUUID string, +) (*protos.SetupNormalizedTableBatchOutput, error) { + res, err := createNormalizedTable(ctx, a.CatalogPool, config, a.Alerter.LogFlowInfo) + if err != nil { + createErr := fmt.Errorf("failed to create normalized table: %w", err) + return nil, a.Alerter.LogFlowSnapshotQRepError(ctx, config.FlowName, snapshotID, runUUID, createErr) + } + return res, nil +} + +func createNormalizedTable( + ctx context.Context, + pool shared.CatalogPool, + config *protos.SetupNormalizedTableBatchInput, + logFlowInfo func(ctx context.Context, flowName string, info string), ) (*protos.SetupNormalizedTableBatchOutput, error) { numTablesSetup := atomic.Uint32{} numTablesToSetup := atomic.Int32{} @@ -224,14 +285,14 @@ func (a *FlowableActivity) CreateNormalizedTable( logger := internal.LoggerFromCtx(ctx) ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName) - a.Alerter.LogFlowInfo(ctx, config.FlowName, "Setting up destination tables") - conn, err := connectors.GetByNameAs[connectors.NormalizedTablesConnector](ctx, config.Env, a.CatalogPool, config.PeerName) + logFlowInfo(ctx, config.FlowName, "Setting up destination tables") + conn, err := connectors.GetByNameAs[connectors.NormalizedTablesConnector](ctx, config.Env, pool, config.PeerName) if err != nil { if errors.Is(err, errors.ErrUnsupported) { logger.Info("Connector does not implement normalized tables") return nil, nil } - return nil, a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to get connector: %w", err)) + return nil, fmt.Errorf("failed to get connector: %w", err) } defer connectors.CloseConnector(ctx, conn) @@ -241,7 +302,7 @@ func (a *FlowableActivity) CreateNormalizedTable( } defer conn.CleanupSetupNormalizedTables(ctx, tx) - tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, config.FlowName) + tableNameSchemaMapping, err := getTableNameSchemaMapping(ctx, pool, config.FlowName) if err != nil { return nil, err } @@ -259,15 +320,13 @@ func (a *FlowableActivity) CreateNormalizedTable( tableSchema, ) if err != nil { - return nil, a.Alerter.LogFlowError(ctx, config.FlowName, - fmt.Errorf("failed to setup normalized table %s: %w", tableIdentifier, err), - ) + return nil, fmt.Errorf("failed to setup normalized table %s: %w", tableIdentifier, err) } tableExistsMapping[tableIdentifier] = existing numTablesSetup.Add(1) if !existing { - a.Alerter.LogFlowInfo(ctx, config.FlowName, "created table "+tableIdentifier+" in destination") + logFlowInfo(ctx, config.FlowName, "created table "+tableIdentifier+" in destination") } else { logger.Info("table already exists " + tableIdentifier) } @@ -276,8 
+335,7 @@ func (a *FlowableActivity) CreateNormalizedTable( if err := conn.FinishSetupNormalizedTables(ctx, tx); err != nil { return nil, fmt.Errorf("failed to commit normalized tables tx: %w", err) } - - a.Alerter.LogFlowInfo(ctx, config.FlowName, "All destination tables have been setup") + logFlowInfo(ctx, config.FlowName, "All destination tables have been setup") return &protos.SetupNormalizedTableBatchOutput{ TableExistsMapping: tableExistsMapping, @@ -319,24 +377,28 @@ func (a *FlowableActivity) SyncFlow( srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnectorCore](ctx, config.Env, a.CatalogPool, config.SourceName) if err != nil { - return a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + getErr := fmt.Errorf("failed to get pull connector: %w", err) + return a.Alerter.LogFlowSyncError(ctx, config.FlowJobName, syncingBatchID.Load(), getErr) } if err := srcConn.SetupReplConn(ctx); err != nil { connectors.CloseConnector(ctx, srcConn) - return a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + replConnErr := fmt.Errorf("failed to setup repl connection: %w", err) + return a.Alerter.LogFlowSyncError(ctx, config.FlowJobName, syncingBatchID.Load(), replConnErr) } normalizeBufferSize, err := internal.PeerDBNormalizeChannelBufferSize(ctx, config.Env) if err != nil { connectors.CloseConnector(ctx, srcConn) - return a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + cfgErr := fmt.Errorf("failed to get normalize channel buffer size config: %w", err) + return a.Alerter.LogFlowSyncError(ctx, config.FlowJobName, syncingBatchID.Load(), cfgErr) } reconnectAfterBatches, err := internal.PeerDBReconnectAfterBatches(ctx, config.Env) if err != nil { connectors.CloseConnector(ctx, srcConn) - return a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + cfgErr := fmt.Errorf("failed to get reconnect after batches config: %w", err) + return a.Alerter.LogFlowSyncError(ctx, config.FlowJobName, syncingBatchID.Load(), cfgErr) } // syncDone will be closed by SyncFlow, @@ -354,8 +416,8 @@ func (a *FlowableActivity) SyncFlow( }) group.Go(func() error { defer connectors.CloseConnector(groupCtx, srcConn) - if err := a.maintainReplConn(groupCtx, config.FlowJobName, srcConn, syncDone); err != nil { - return a.Alerter.LogFlowError(groupCtx, config.FlowJobName, err) + if err := a.maintainReplConn(groupCtx, srcConn, syncDone); err != nil { + return a.Alerter.LogFlowErrorNoStatus(groupCtx, config.FlowJobName, err) } return nil }) @@ -365,21 +427,27 @@ func (a *FlowableActivity) SyncFlow( logger.Info("executing sync flow", slog.Int64("count", int64(syncNum))) var syncResponse *model.SyncResponse - var syncErr error + var err error if config.System == protos.TypeSystem_Q { - syncResponse, syncErr = a.syncRecords(groupCtx, config, options, srcConn.(connectors.CDCPullConnector), + syncResponse, err = a.syncRecords(groupCtx, config, options, srcConn.(connectors.CDCPullConnector), normRequests, &syncingBatchID, &syncState) } else { - syncResponse, syncErr = a.syncPg(groupCtx, config, options, srcConn.(connectors.CDCPullPgConnector), + syncResponse, err = a.syncPg(groupCtx, config, options, srcConn.(connectors.CDCPullPgConnector), normRequests, &syncingBatchID, &syncState) } - if syncErr != nil { + if err != nil { if groupCtx.Err() != nil { // need to return ctx.Err(), avoid returning syncErr that's wrapped context canceled break } - logger.Error("failed to sync records", slog.Any("error", syncErr)) + syncErr := fmt.Errorf("failed to sync records: %w", err) + var skipLogFlowError *exceptions.SkipLogFlowError 
+ if !errors.As(syncErr, &skipLogFlowError) { + _ = a.Alerter.LogFlowSyncError(ctx, config.FlowJobName, syncingBatchID.Load(), syncErr) + } else { + logger.Error(syncErr.Error()) + } syncState.Store(shared.Ptr("cleanup")) close(syncDone) return errors.Join(syncErr, group.Wait()) @@ -428,7 +496,7 @@ func (a *FlowableActivity) syncRecords( a.Alerter.LogFlowInfo(ctx, config.FlowJobName, s) })) if err != nil { - return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + return nil, fmt.Errorf("failed to load script: %w", err) } if fn, ok := ls.Env.RawGetString("transformRecord").(*lua.LFunction); ok { return pua.AttachToCdcStream(ctx, ls, fn, stream, onErr), nil @@ -470,15 +538,17 @@ func (a *FlowableActivity) syncPg( } // SetupQRepMetadataTables sets up the metadata tables for QReplication. -func (a *FlowableActivity) SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error { +func (a *FlowableActivity) SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig, runUUID string) error { conn, err := connectors.GetByNameAs[connectors.QRepSyncConnector](ctx, config.Env, a.CatalogPool, config.DestinationName) if err != nil { - return a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get connector: %w", err)) + getErr := fmt.Errorf("failed to get connector: %w", err) + return a.Alerter.LogFlowSnapshotQRepError(ctx, config.FlowJobName, config.SnapshotId, runUUID, getErr) } defer connectors.CloseConnector(ctx, conn) if err := conn.SetupQRepMetadataTables(ctx, config); err != nil { - return a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to setup metadata tables: %w", err)) + setupErr := fmt.Errorf("failed to setup metadata tables: %w", err) + return a.Alerter.LogFlowSnapshotQRepError(ctx, config.FlowJobName, config.SnapshotId, runUUID, setupErr) } return nil @@ -502,13 +572,15 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context, } srcConn, err := connectors.GetByNameAs[connectors.QRepPullConnector](ctx, config.Env, a.CatalogPool, config.SourceName) if err != nil { - return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get qrep pull connector: %w", err)) + getErr := fmt.Errorf("failed to get qrep pull connector: %w", err) + return nil, a.Alerter.LogFlowSnapshotQRepError(ctx, config.FlowJobName, config.SnapshotId, runUUID, getErr) } defer connectors.CloseConnector(ctx, srcConn) partitions, err := srcConn.GetQRepPartitions(ctx, config, last) if err != nil { - return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get partitions from source: %w", err)) + getErr := fmt.Errorf("failed to get partitions from source: %w", err) + return nil, a.Alerter.LogFlowSnapshotQRepError(ctx, config.FlowJobName, config.SnapshotId, runUUID, getErr) } if len(partitions) > 0 { if err := monitoring.InitializeQRepRun( @@ -520,7 +592,8 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context, partitions, config.ParentMirrorName, ); err != nil { - return nil, err + initErr := fmt.Errorf("failed to initialize qrep run: %w", err) + return nil, a.Alerter.LogFlowSnapshotQRepError(ctx, config.FlowJobName, config.SnapshotId, runUUID, initErr) } } @@ -547,7 +620,11 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context, err := monitoring.UpdateStartTimeForQRepRun(ctx, a.CatalogPool, runUUID) if err != nil { - return fmt.Errorf("failed to update start time for qrep run: %w", err) + // Ideally we should have granular status for partition batches. 
+ // However, those are identified by run uuid + batch id and would need to be stored in an array, + // and this is the only place a failure can happen at the batch level. + // Let's assume a catalog failure will bubble up elsewhere too. + return shared.LogError(logger, fmt.Errorf("failed to update start time for qrep run: %w", err)) } numPartitions := len(partitions.Partitions) @@ -566,7 +643,10 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context, a.Alerter.LogFlowInfo(ctx, config.FlowJobName, s) })) if err != nil { - return a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + loadErr := fmt.Errorf("failed to load script: %w", err) + return a.Alerter.LogFlowSnapshotPartitionError( + ctx, config.FlowJobName, config.SnapshotId, p.PartitionId, loadErr, + ) } if fn, ok := ls.Env.RawGetString("transformRow").(*lua.LFunction); ok { outstream = pua.AttachToStream(ls, fn, stream) @@ -587,8 +667,10 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context, } if err != nil { - logger.Error("failed to replicate partition", slog.Any("error", err)) - return a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + replErr := fmt.Errorf("failed to replicate partition: %w", err) + return a.Alerter.LogFlowSnapshotPartitionError( + ctx, config.FlowJobName, config.SnapshotId, p.PartitionId, replErr, + ) } } @@ -607,47 +689,77 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) dstConn, err := connectors.GetByNameAs[connectors.QRepConsolidateConnector](ctx, config.Env, a.CatalogPool, config.DestinationName) if errors.Is(err, errors.ErrUnsupported) { - return monitoring.UpdateEndTimeForQRepRun(ctx, a.CatalogPool, runUUID) + if err := monitoring.UpdateEndTimeForQRepRun(ctx, a.CatalogPool, runUUID); err != nil { + updateErr := fmt.Errorf("failed to update end time for qrep run: %w", err) + return a.Alerter.LogFlowSnapshotQRepError(ctx, config.FlowJobName, config.SnapshotId, runUUID, updateErr) + } + return nil } else if err != nil { - return err + getErr := fmt.Errorf("failed to get consolidate connector for qrep run: %w", err) + return a.Alerter.LogFlowSnapshotQRepError(ctx, config.FlowJobName, config.SnapshotId, runUUID, getErr) } defer connectors.CloseConnector(ctx, dstConn) if err := dstConn.ConsolidateQRepPartitions(ctx, config); err != nil { - return a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + consErr := fmt.Errorf("failed to consolidate partitions for qrep run: %w", err) + return a.Alerter.LogFlowSnapshotQRepError(ctx, config.FlowJobName, config.SnapshotId, runUUID, consErr) } - return monitoring.UpdateEndTimeForQRepRun(ctx, a.CatalogPool, runUUID) + if err := monitoring.UpdateEndTimeForQRepRun(ctx, a.CatalogPool, runUUID); err != nil { + updateErr := fmt.Errorf("failed to update end time for qrep run: %w", err) + return a.Alerter.LogFlowSnapshotQRepError(ctx, config.FlowJobName, config.SnapshotId, runUUID, updateErr) + } + return nil } -func (a *FlowableActivity) CleanupQRepFlow(ctx context.Context, config *protos.QRepConfig) error { +func (a *FlowableActivity) CleanupQRepFlow(ctx context.Context, config *protos.QRepConfig, runUUID string) error { ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) dst, err := connectors.GetByNameAs[connectors.QRepConsolidateConnector](ctx, config.Env, a.CatalogPool, config.DestinationName) if errors.Is(err, errors.ErrUnsupported) { return nil } else if err != nil { - return a.Alerter.LogFlowError(ctx,
config.FlowJobName, err) + getErr := fmt.Errorf("failed to get consolidate connector for qrep run: %w", err) + return a.Alerter.LogFlowSnapshotQRepError(ctx, config.FlowJobName, config.SnapshotId, runUUID, getErr) } defer connectors.CloseConnector(ctx, dst) - return dst.CleanupQRepFlow(ctx, config) + if err := dst.CleanupQRepFlow(ctx, config); err != nil { + cleanupErr := fmt.Errorf("failed to cleanup qrep flow: %w", err) + return a.Alerter.LogFlowSnapshotQRepError(ctx, config.FlowJobName, config.SnapshotId, runUUID, cleanupErr) + } + return nil +} + +func (a *FlowableActivity) UpdateQRepStatusSuccess( + ctx context.Context, config *protos.QRepConfig, runUUID string, +) error { + ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) + if err := monitoring.UpdateQRepStatusSuccess( + ctx, a.CatalogPool, config.FlowJobName, config.SnapshotId, runUUID, + ); err != nil { + updateErr := fmt.Errorf("failed to update qrep status: %w", err) + return a.Alerter.LogFlowSnapshotQRepError(ctx, config.FlowJobName, config.SnapshotId, runUUID, updateErr) + } + return nil } func (a *FlowableActivity) DropFlowSource(ctx context.Context, req *protos.DropFlowActivityInput) error { ctx = context.WithValue(ctx, shared.FlowNameKey, req.FlowJobName) srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, nil, a.CatalogPool, req.PeerName) if err != nil { - return a.Alerter.LogFlowError(ctx, req.FlowJobName, - exceptions.NewDropFlowError(fmt.Errorf("[DropFlowSource] failed to get source connector: %w", err)), - ) + getErr := exceptions.NewDropFlowError("[DropFlowSource] failed to get source connector: %w", err) + return a.Alerter.LogFlowErrorNoStatus(ctx, req.FlowJobName, getErr) } defer connectors.CloseConnector(ctx, srcConn) if err := srcConn.PullFlowCleanup(ctx, req.FlowJobName); err != nil { - pullCleanupErr := exceptions.NewDropFlowError(fmt.Errorf("[DropFlowSource] failed to clean up source: %w", err)) + pullCleanupErr := exceptions.NewDropFlowError("[DropFlowSource] failed to clean up source: %w", err) if !shared.IsSQLStateError(err, pgerrcode.ObjectInUse) { // don't alert when PID active - _ = a.Alerter.LogFlowError(ctx, req.FlowJobName, pullCleanupErr) + _ = a.Alerter.LogFlowErrorNoStatus(ctx, req.FlowJobName, pullCleanupErr) + } else { + logger := internal.LoggerFromCtx(ctx) + logger.Error(pullCleanupErr.Error()) } return pullCleanupErr } @@ -661,16 +773,14 @@ func (a *FlowableActivity) DropFlowDestination(ctx context.Context, req *protos. 
ctx = context.WithValue(ctx, shared.FlowNameKey, req.FlowJobName) dstConn, err := connectors.GetByNameAs[connectors.CDCSyncConnector](ctx, nil, a.CatalogPool, req.PeerName) if err != nil { - return a.Alerter.LogFlowError(ctx, req.FlowJobName, - exceptions.NewDropFlowError(fmt.Errorf("[DropFlowDestination] failed to get destination connector: %w", err)), - ) + getErr := exceptions.NewDropFlowError("[DropFlowDestination] failed to get destination connector: %w", err) + return a.Alerter.LogFlowErrorNoStatus(ctx, req.FlowJobName, getErr) } defer connectors.CloseConnector(ctx, dstConn) if err := dstConn.SyncFlowCleanup(ctx, req.FlowJobName); err != nil { - return a.Alerter.LogFlowError(ctx, req.FlowJobName, - exceptions.NewDropFlowError(fmt.Errorf("[DropFlowDestination] failed to clean up destination: %w", err)), - ) + cleanupErr := exceptions.NewDropFlowError("[DropFlowDestination] failed to clean up destination: %w", err) + return a.Alerter.LogFlowErrorNoStatus(ctx, req.FlowJobName, cleanupErr) } a.Alerter.LogFlowInfo(ctx, req.FlowJobName, @@ -772,7 +882,7 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error { } var config protos.FlowConnectionConfigs - if err := proto.Unmarshal(configProto, &config); err != nil { + if err := internal.ProtoUnmarshal(configProto, &config); err != nil { return nil, err } @@ -947,7 +1057,8 @@ func (a *FlowableActivity) QRepHasNewRows(ctx context.Context, if errors.Is(err, errors.ErrUnsupported) { return true, nil } - return false, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get qrep source connector: %w", err)) + getErr := fmt.Errorf("failed to get qrep source connector: %w", err) + return false, a.Alerter.LogFlowErrorNoStatus(ctx, config.FlowJobName, getErr) } defer connectors.CloseConnector(ctx, srcConn) @@ -955,7 +1066,8 @@ func (a *FlowableActivity) QRepHasNewRows(ctx context.Context, maxValue, err := srcConn.GetMaxValue(ctx, config, last) if err != nil { - return false, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to check for new rows: %w", err)) + checkErr := fmt.Errorf("failed to check for new rows: %w", err) + return false, a.Alerter.LogFlowErrorNoStatus(ctx, config.FlowJobName, checkErr) } if maxValue == nil || last == nil || last.Range == nil { @@ -983,15 +1095,37 @@ func (a *FlowableActivity) QRepHasNewRows(ctx context.Context, } func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) { + res, err := renameTables(ctx, config, a.CatalogPool) + if err != nil { + renameErr := fmt.Errorf("failed to rename tables: %w", err) + return nil, a.Alerter.LogFlowErrorNoStatus(ctx, config.FlowJobName, renameErr) + } + a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "Resync completed for all tables") + return res, nil +} + +func (a *FlowableActivity) QRepRenameTables( + ctx context.Context, config *protos.RenameTablesInput, snapshotID int32, runUUID string, +) (*protos.RenameTablesOutput, error) { + res, err := renameTables(ctx, config, a.CatalogPool) + if err != nil { + renameErr := fmt.Errorf("failed to rename tables: %w", err) + return nil, a.Alerter.LogFlowSnapshotQRepError(ctx, config.FlowJobName, snapshotID, runUUID, renameErr) + } + a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "Resync completed for all tables") + return res, nil +} + +func renameTables(ctx context.Context, config *protos.RenameTablesInput, pool shared.CatalogPool) (*protos.RenameTablesOutput, error) { shutdown := heartbeatRoutine(ctx, func() string { 
return "renaming tables for job" }) defer shutdown() ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) - conn, err := connectors.GetByNameAs[connectors.RenameTablesConnector](ctx, nil, a.CatalogPool, config.PeerName) + conn, err := connectors.GetByNameAs[connectors.RenameTablesConnector](ctx, nil, pool, config.PeerName) if err != nil { - return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get connector: %w", err)) + return nil, fmt.Errorf("failed to get connector: %w", err) } defer connectors.CloseConnector(ctx, conn) @@ -999,7 +1133,7 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena for _, option := range config.RenameTableOptions { schema, err := internal.LoadTableSchemaFromCatalog( ctx, - a.CatalogPool, + pool, config.FlowJobName, option.CurrentName, ) @@ -1011,10 +1145,10 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena renameOutput, err := conn.RenameTables(ctx, config, tableNameSchemaMapping) if err != nil { - return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to rename tables: %w", err)) + return nil, fmt.Errorf("failed to rename tables: %w", err) } - tx, err := a.CatalogPool.Begin(ctx) + tx, err := pool.Begin(ctx) if err != nil { return nil, fmt.Errorf("failed to begin updating table_schema_mapping: %w", err) } @@ -1043,8 +1177,6 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena } } - a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "Resync completed for all tables") - return renameOutput, tx.Commit(ctx) } @@ -1070,11 +1202,17 @@ func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *pr ctx = context.WithValue(ctx, shared.FlowNameKey, req.FlowJobName) dstConn, err := connectors.GetByNameAs[connectors.CreateTablesFromExistingConnector](ctx, nil, a.CatalogPool, req.PeerName) if err != nil { - return nil, a.Alerter.LogFlowError(ctx, req.FlowJobName, fmt.Errorf("failed to get connector: %w", err)) + getErr := fmt.Errorf("failed to get connector: %w", err) + return nil, a.Alerter.LogFlowSnapshotQRepError(ctx, req.FlowJobName, req.SnapshotId, req.RunUuid, getErr) } defer connectors.CloseConnector(ctx, dstConn) - return dstConn.CreateTablesFromExisting(ctx, req) + output, err := dstConn.CreateTablesFromExisting(ctx, req) + if err != nil { + createErr := fmt.Errorf("failed to create tables: %w", err) + return nil, a.Alerter.LogFlowSnapshotQRepError(ctx, req.FlowJobName, req.SnapshotId, req.RunUuid, createErr) + } + return output, nil } func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, @@ -1087,22 +1225,30 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, }) defer shutdown() + var lastPartition int64 + var err error switch config.System { case protos.TypeSystem_Q: stream := model.NewQRecordStream(shared.FetchAndChannelSize) - return replicateXminPartition(ctx, a, config, partition, runUUID, + lastPartition, err = replicateXminPartition(ctx, a, config, partition, runUUID, stream, stream, (*connpostgres.PostgresConnector).PullXminRecordStream, connectors.QRepSyncConnector.SyncQRepRecords) case protos.TypeSystem_PG: pgread, pgwrite := connpostgres.NewPgCopyPipe() - return replicateXminPartition(ctx, a, config, partition, runUUID, + lastPartition, err = replicateXminPartition(ctx, a, config, partition, runUUID, pgwrite, pgread, (*connpostgres.PostgresConnector).PullXminPgRecordStream, connectors.QRepSyncPgConnector.SyncPgQRepRecords) default: return 0, 
fmt.Errorf("unknown type system %d", config.System) } + + if err != nil { + replErr := fmt.Errorf("failed to replicate partition: %w", err) + return 0, a.Alerter.LogFlowSnapshotPartitionError(ctx, config.FlowJobName, config.SnapshotId, partition.PartitionId, replErr) + } + return lastPartition, nil } func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, cfg *protos.FlowConnectionConfigs, @@ -1123,7 +1269,7 @@ func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, cfg *prot PublicationName: cfg.PublicationName, AdditionalTables: additionalTableMappings, }); err != nil { - return a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err) + return a.Alerter.LogFlowErrorNoStatus(ctx, cfg.FlowJobName, err) } a.Alerter.LogFlowInfo(ctx, cfg.FlowJobName, fmt.Sprintf("ensured %d tables exist in publication %s", @@ -1142,7 +1288,7 @@ func (a *FlowableActivity) RemoveTablesFromPublication( if errors.Is(err, errors.ErrUnsupported) { return nil } - return a.Alerter.LogFlowError(ctx, cfg.FlowJobName, fmt.Errorf("failed to get source connector: %w", err)) + return a.Alerter.LogFlowErrorNoStatus(ctx, cfg.FlowJobName, fmt.Errorf("failed to get source connector: %w", err)) } defer connectors.CloseConnector(ctx, srcConn) @@ -1151,7 +1297,7 @@ func (a *FlowableActivity) RemoveTablesFromPublication( PublicationName: cfg.PublicationName, TablesToRemove: removedTablesMapping, }); err != nil { - return a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err) + return a.Alerter.LogFlowErrorNoStatus(ctx, cfg.FlowJobName, err) } a.Alerter.LogFlowInfo(ctx, cfg.FlowJobName, fmt.Sprintf("removed %d tables from publication %s", @@ -1186,7 +1332,7 @@ func (a *FlowableActivity) RemoveTablesFromRawTable( // we can ignore the error return nil } - return a.Alerter.LogFlowError(ctx, cfg.FlowJobName, + return a.Alerter.LogFlowErrorNoStatus(ctx, cfg.FlowJobName, fmt.Errorf("[RemoveTablesFromRawTable] failed to get destination connector: %w", err), ) } @@ -1202,7 +1348,7 @@ func (a *FlowableActivity) RemoveTablesFromRawTable( SyncBatchId: syncBatchID, NormalizeBatchId: normBatchID, }); err != nil { - return a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err) + return a.Alerter.LogFlowErrorNoStatus(ctx, cfg.FlowJobName, err) } return nil } @@ -1279,7 +1425,7 @@ func (a *FlowableActivity) GetFlowMetadata( logger := log.With(internal.LoggerFromCtx(ctx), slog.String(string(shared.FlowNameKey), input.FlowName)) peerTypes, err := connectors.LoadPeerTypes(ctx, a.CatalogPool, []string{input.SourceName, input.DestinationName}) if err != nil { - return nil, a.Alerter.LogFlowError(ctx, input.FlowName, err) + return nil, a.Alerter.LogFlowErrorNoStatus(ctx, input.FlowName, err) } logger.Debug("loaded peer types for flow", slog.String("flowName", input.FlowName), slog.String("sourceName", input.SourceName), slog.String("destinationName", input.DestinationName), diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index cb622b8484..79844745b3 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -18,7 +18,6 @@ import ( "go.temporal.io/sdk/log" "go.temporal.io/sdk/temporal" "golang.org/x/sync/errgroup" - "google.golang.org/protobuf/proto" "github.com/PeerDB-io/peerdb/flow/connectors" connmysql "github.com/PeerDB-io/peerdb/flow/connectors/mysql" @@ -54,8 +53,8 @@ func heartbeatRoutine( ) } -func (a *FlowableActivity) getTableNameSchemaMapping(ctx context.Context, flowName string) (map[string]*protos.TableSchema, error) { - rows, err := a.CatalogPool.Query(ctx, "select 
table_name, table_schema from table_schema_mapping where flow_name = $1", flowName) +func getTableNameSchemaMapping(ctx context.Context, pool shared.CatalogPool, flowName string) (map[string]*protos.TableSchema, error) { + rows, err := pool.Query(ctx, "select table_name, table_schema from table_schema_mapping where flow_name = $1", flowName) if err != nil { return nil, err } @@ -65,7 +64,7 @@ func (a *FlowableActivity) getTableNameSchemaMapping(ctx context.Context, flowNa tableNameSchemaMapping := make(map[string]*protos.TableSchema) if _, err := pgx.ForEachRow(rows, []any{&tableName, &tableSchemaBytes}, func() error { tableSchema := &protos.TableSchema{} - if err := proto.Unmarshal(tableSchemaBytes, tableSchema); err != nil { + if err := internal.ProtoUnmarshal(tableSchemaBytes, tableSchema); err != nil { return err } tableNameSchemaMapping[tableName] = tableSchema @@ -93,7 +92,7 @@ func (a *FlowableActivity) applySchemaDeltas( } if len(schemaDeltas) > 0 { - if err := a.SetupTableSchema(ctx, &protos.SetupTableSchemaBatchInput{ + if err := setupTableSchema(ctx, a.CatalogPool, &protos.SetupTableSchemaBatchInput{ PeerName: config.SourceName, TableMappings: filteredTableMappings, FlowName: config.FlowJobName, @@ -101,7 +100,7 @@ func (a *FlowableActivity) applySchemaDeltas( Env: config.Env, Version: config.Version, }); err != nil { - return a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to execute schema update at source: %w", err)) + return fmt.Errorf("failed to execute schema update at source: %w", err) } } return nil @@ -152,7 +151,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon } }() if err != nil { - return nil, a.Alerter.LogFlowError(ctx, flowName, err) + return nil, fmt.Errorf("failed to get last offset: %w", err) } logger.Info("pulling records...", slog.Any("LastOffset", lastOffset)) @@ -168,13 +167,13 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon if adaptStream != nil { var err error if recordBatchSync, err = adaptStream(recordBatchPull); err != nil { - return nil, err + return nil, fmt.Errorf("failed to adapt stream: %w", err) } } - tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, flowName) + tableNameSchemaMapping, err := getTableNameSchemaMapping(ctx, a.CatalogPool, flowName) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get table name schema mapping: %w", err) } startTime := time.Now() @@ -207,13 +206,15 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon // wait for the pull goroutine to finish if err := errGroup.Wait(); err != nil { // don't log flow error for "replState changed" and "slot is already active" - if !(temporal.IsApplicationError(err) || shared.IsSQLStateError(err, pgerrcode.ObjectInUse)) { - _ = a.Alerter.LogFlowError(ctx, flowName, err) - } if temporal.IsApplicationError(err) { - return nil, err + return nil, exceptions.NewSkipLogFlowError(shared.LogError(logger, err)) } else { - return nil, fmt.Errorf("failed in pull records when: %w", err) + pullErr := fmt.Errorf("failed in pull records when: %w", err) + if shared.IsSQLStateError(err, pgerrcode.ObjectInUse) { + return nil, exceptions.NewSkipLogFlowError(pullErr) + } else { + return nil, pullErr + } } } logger.Info("no records to push") @@ -229,7 +230,10 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon return nil, fmt.Errorf("failed to sync schema: %w", err) } - return nil, a.applySchemaDeltas(ctx, config, 
options, recordBatchSync.SchemaDeltas) + if err := a.applySchemaDeltas(ctx, config, options, recordBatchSync.SchemaDeltas); err != nil { + return nil, fmt.Errorf("failed to apply schema deltas: %w", err) + } + return nil, nil } var res *model.SyncResponse @@ -242,7 +246,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon syncBatchID, err := dstConn.GetLastSyncBatchID(errCtx, flowName) if err != nil { - return err + return fmt.Errorf("failed to get last sync batch id: %w", err) } syncBatchID += 1 syncingBatchID.Store(syncBatchID) @@ -254,7 +258,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon BatchEndlSN: 0, StartTime: startTime, }); err != nil { - return a.Alerter.LogFlowError(ctx, flowName, err) + return fmt.Errorf("failed to add cdc batch for flow: %w", err) } res, err = sync(dstConn, errCtx, &model.SyncRecordsRequest[Items]{ @@ -270,7 +274,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon Version: config.Version, }) if err != nil { - return a.Alerter.LogFlowError(ctx, flowName, fmt.Errorf("failed to push records: %w", err)) + return fmt.Errorf("failed to push records: %w", err) } for _, warning := range res.Warnings { a.Alerter.LogFlowWarning(ctx, flowName, warning) @@ -282,15 +286,13 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon syncStartTime := time.Now() if err := errGroup.Wait(); err != nil { + cdcErr := fmt.Errorf("[cdc] failed to pull records: %w", err) // don't log flow error for "replState changed" and "slot is already active" var applicationError *temporal.ApplicationError if !((errors.As(err, &applicationError) && applicationError.Type() == "desync") || shared.IsSQLStateError(err, pgerrcode.ObjectInUse)) { - _ = a.Alerter.LogFlowError(ctx, flowName, err) - } - if temporal.IsApplicationError(err) { - return nil, err + return nil, exceptions.NewSkipLogFlowError(cdcErr) } else { - return nil, fmt.Errorf("[cdc] failed to pull records: %w", err) + return nil, cdcErr } } syncState.Store(shared.Ptr("bookkeeping")) @@ -299,23 +301,23 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon lastCheckpoint := recordBatchSync.GetLastCheckpoint() logger.Info("batch synced", slog.Any("checkpoint", lastCheckpoint)) if err := srcConn.UpdateReplStateLastOffset(ctx, lastCheckpoint); err != nil { - return nil, a.Alerter.LogFlowError(ctx, flowName, err) + return nil, fmt.Errorf("failed to update last offset: %w", err) } if err := monitoring.UpdateNumRowsAndEndLSNForCDCBatch( ctx, a.CatalogPool, flowName, res.CurrentSyncBatchID, uint32(res.NumRecordsSynced), lastCheckpoint, ); err != nil { - return nil, a.Alerter.LogFlowError(ctx, flowName, err) + return nil, fmt.Errorf("failed to update num_rows and end_lsn: %w", err) } if err := monitoring.UpdateLatestLSNAtTargetForCDCFlow(ctx, a.CatalogPool, flowName, lastCheckpoint.ID); err != nil { - return nil, a.Alerter.LogFlowError(ctx, flowName, err) + return nil, fmt.Errorf("failed to update latest lsn at target: %w", err) } if res.TableNameRowsMapping != nil { if err := monitoring.AddCDCBatchTablesForFlow( ctx, a.CatalogPool, flowName, res.CurrentSyncBatchID, res.TableNameRowsMapping, ); err != nil { - return nil, err + return nil, fmt.Errorf("failed to add cdc batch tables: %w", err) } } @@ -326,13 +328,19 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon syncState.Store(shared.Ptr("updating schema")) if err := a.applySchemaDeltas(ctx, config,
options, res.TableSchemaDeltas); err != nil { - return nil, err + return nil, fmt.Errorf("failed to apply schema deltas: %w", err) + } + + if err := monitoring.UpdateSyncStatusSuccess( + ctx, a.CatalogPool, config.FlowJobName, res.CurrentSyncBatchID, + ); err != nil { + return nil, fmt.Errorf("failed to update sync status: %w", err) } if recordBatchSync.NeedsNormalize() { parallel, err := internal.PeerDBEnableParallelSyncNormalize(ctx, config.Env) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get parallel sync normalize config: %w", err) } var done chan struct{} if !parallel { @@ -379,7 +387,7 @@ func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*proto } var pgPeerConfig protos.PostgresConfig - unmarshalErr := proto.Unmarshal(peerOptions, &pgPeerConfig) + unmarshalErr := internal.ProtoUnmarshal(peerOptions, &pgPeerConfig) if unmarshalErr != nil { return nil, unmarshalErr } @@ -415,7 +423,7 @@ func replicateQRepPartition[TRead any, TWrite StreamCloser, TSync connectors.QRe dstConn, err := connectors.GetByNameAs[TSync](ctx, config.Env, a.CatalogPool, config.DestinationName) if err != nil { - return a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get qrep destination connector: %w", err)) + return fmt.Errorf("failed to get qrep destination connector: %w", err) } defer connectors.CloseConnector(ctx, dstConn) @@ -424,7 +432,7 @@ func replicateQRepPartition[TRead any, TWrite StreamCloser, TSync connectors.QRe PartitionId: partition.PartitionId, }) if err != nil { - return a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get fetch status of partition: %w", err)) + return fmt.Errorf("failed to get fetch status of partition: %w", err) } if done { logger.Info("no records to push for partition " + partition.PartitionId) @@ -433,7 +441,7 @@ func replicateQRepPartition[TRead any, TWrite StreamCloser, TSync connectors.QRe } if err := monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, time.Now()); err != nil { - return a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to update start time for partition: %w", err)) + return fmt.Errorf("failed to update start time for partition: %w", err) } logger.Info("replicating partition", slog.String("partitionId", partition.PartitionId)) @@ -444,13 +452,13 @@ func replicateQRepPartition[TRead any, TWrite StreamCloser, TSync connectors.QRe srcConn, err := connectors.GetByNameAs[TPull](ctx, config.Env, a.CatalogPool, config.SourceName) if err != nil { stream.Close(err) - return a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get qrep source connector: %w", err)) + return fmt.Errorf("failed to get qrep source connector: %w", err) } defer connectors.CloseConnector(ctx, srcConn) numRecords, numBytes, err := pullRecords(srcConn, errCtx, a.OtelManager, config, partition, stream) if err != nil { - return a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("[qrep] failed to pull records: %w", err)) + return fmt.Errorf("[qrep] failed to pull records: %w", err) } // for Postgres source, reports all bytes fetched from source @@ -461,7 +469,7 @@ func replicateQRepPartition[TRead any, TWrite StreamCloser, TSync connectors.QRe if err := monitoring.UpdatePullEndTimeAndRowsForPartition( errCtx, a.CatalogPool, runUUID, partition, numRecords, ); err != nil { - logger.Error(err.Error()) + return fmt.Errorf("failed to update pull end time and rows for partition: %w", err) } return nil }) @@ -471,7 +479,7 @@ func 
replicateQRepPartition[TRead any, TWrite StreamCloser, TSync connectors.QRe var err error rowsSynced, warnings, err = syncRecords(dstConn, errCtx, config, partition, outstream) if err != nil { - return a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to sync records: %w", err)) + return fmt.Errorf("failed to sync records: %w", err) } for _, warning := range warnings { a.Alerter.LogFlowWarning(ctx, config.FlowJobName, warning) @@ -480,17 +488,20 @@ func replicateQRepPartition[TRead any, TWrite StreamCloser, TSync connectors.QRe }) if err := errGroup.Wait(); err != nil && err != context.Canceled { - return a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + return err } if rowsSynced > 0 { logger.Info(fmt.Sprintf("pushed %d records", rowsSynced)) if err := monitoring.UpdateRowsSyncedForPartition(ctx, a.CatalogPool, rowsSynced, runUUID, partition); err != nil { - return err + return fmt.Errorf("failed to update rows synced for partition: %w", err) } } - return monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition) + if err := monitoring.FinishPartition(ctx, logger, a.CatalogPool, config.FlowJobName, runUUID, partition); err != nil { + return fmt.Errorf("failed to finish partition: %w", err) + } + return nil } // replicateXminPartition replicates a XminPartition from the source to the destination. @@ -532,7 +543,7 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn numRecords, numBytes, currentSnapshotXmin, pullErr = pullRecords(srcConn, ctx, config, partition, stream) if pullErr != nil { logger.Warn(fmt.Sprintf("[xmin] failed to pull records: %v", pullErr)) - return a.Alerter.LogFlowError(ctx, config.FlowJobName, pullErr) + return pullErr } // The first sync of an XMIN mirror will have a partition without a range @@ -552,7 +563,7 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn if err := monitoring.InitializeQRepRun( ctx, logger, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics}, config.ParentMirrorName, ); err != nil { - return err + return fmt.Errorf("failed to initialize qrep run: %w", err) } if err := monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, startTime); err != nil { @@ -567,8 +578,7 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn if err := monitoring.UpdatePullEndTimeAndRowsForPartition( errCtx, a.CatalogPool, runUUID, partition, numRecords, ); err != nil { - logger.Error(err.Error()) - return err + return fmt.Errorf("failed to update pull end time and rows for partition: %w", err) } return nil @@ -584,7 +594,7 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn var warnings shared.QRepWarnings rowsSynced, warnings, err = syncRecords(dstConn, ctx, config, partition, outstream) if err != nil { - return a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to sync records: %w", err)) + return fmt.Errorf("failed to sync records: %w", err) } for _, warning := range warnings { a.Alerter.LogFlowWarning(ctx, config.FlowJobName, warning) @@ -593,19 +603,19 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn }) if err := errGroup.Wait(); err != nil && err != context.Canceled { - return 0, a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + return 0, err } if rowsSynced > 0 { err := monitoring.UpdateRowsSyncedForPartition(ctx, a.CatalogPool, rowsSynced, runUUID, partition) if err != nil { - return 0, err +
return 0, fmt.Errorf("failed to update rows synced for partition: %w", err) } logger.Info(fmt.Sprintf("pushed %d records", rowsSynced)) } - if err := monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition); err != nil { + if err := monitoring.FinishPartition(ctx, logger, a.CatalogPool, config.FlowJobName, runUUID, partition); err != nil { return 0, err } @@ -613,7 +623,7 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn } func (a *FlowableActivity) maintainReplConn( - ctx context.Context, flowName string, srcConn connectors.CDCPullConnectorCore, syncDone <-chan struct{}, + ctx context.Context, srcConn connectors.CDCPullConnectorCore, syncDone <-chan struct{}, ) error { ticker := time.NewTicker(15 * time.Second) defer ticker.Stop() @@ -622,7 +632,7 @@ func (a *FlowableActivity) maintainReplConn( select { case <-ticker.C: if err := srcConn.ReplPing(ctx); err != nil { - return a.Alerter.LogFlowError(ctx, flowName, fmt.Errorf("connection to source down: %w", err)) + return fmt.Errorf("connection to source down: %w", err) } case <-syncDone: return nil @@ -646,13 +656,16 @@ func (a *FlowableActivity) startNormalize( config.DestinationName, ) if errors.Is(err, errors.ErrUnsupported) { - return monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, config.FlowJobName, batchID) + if err := monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, config.FlowJobName, batchID); err != nil { + return fmt.Errorf("failed to update end time for cdc batch: %w", err) + } + return nil } else if err != nil { - return a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get normalize connector: %w", err)) + return fmt.Errorf("failed to get normalize connector: %w", err) } defer connectors.CloseConnector(ctx, dstConn) - tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, config.FlowJobName) + tableNameSchemaMapping, err := getTableNameSchemaMapping(ctx, a.CatalogPool, config.FlowJobName) if err != nil { return fmt.Errorf("failed to get table name schema mapping: %w", err) } @@ -669,14 +682,16 @@ func (a *FlowableActivity) startNormalize( Version: config.Version, }) if err != nil { - return a.Alerter.LogFlowError(ctx, config.FlowJobName, - exceptions.NewNormalizationError(fmt.Errorf("failed to normalize records: %w", err))) + return exceptions.NewNormalizationError("failed to normalize records: %w", err) } if _, dstPg := dstConn.(*connpostgres.PostgresConnector); dstPg { if err := monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, config.FlowJobName, batchID); err != nil { return fmt.Errorf("failed to update end time for cdc batch: %w", err) } } + if err := monitoring.UpdateNormalizeStatusSuccess(ctx, a.CatalogPool, config.FlowJobName, batchID); err != nil { + return fmt.Errorf("failed to update normalize status: %w", err) + } logger.Info("normalized batches", slog.Int64("StartBatchID", res.StartBatchID), slog.Int64("EndBatchID", res.EndBatchID)) @@ -705,7 +720,7 @@ func (a *FlowableActivity) normalizeLoop( for { normalizingBatchID.Store(req.BatchID) if err := a.startNormalize(ctx, config, req.BatchID); err != nil { - _ = a.Alerter.LogFlowError(ctx, config.FlowJobName, err) + a.Alerter.LogFlowNormalizeError(ctx, config.FlowJobName, req.BatchID, err) for { // update req to latest normalize request & retry select { diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index afa1ee3363..d6f8f919a0 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ 
-11,6 +11,8 @@ import ( "github.com/PeerDB-io/peerdb/flow/alerting" "github.com/PeerDB-io/peerdb/flow/connectors" + "github.com/PeerDB-io/peerdb/flow/connectors/utils" + "github.com/PeerDB-io/peerdb/flow/connectors/utils/monitoring" "github.com/PeerDB-io/peerdb/flow/generated/protos" "github.com/PeerDB-io/peerdb/flow/internal" "github.com/PeerDB-io/peerdb/flow/shared" @@ -48,7 +50,30 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName s delete(a.SlotSnapshotStates, flowJobName) } a.Alerter.LogFlowEvent(ctx, flowJobName, "Ended Snapshot Flow Job") + return nil +} + +func (a *SnapshotActivity) InitializeSnapshot( + ctx context.Context, flowName string, +) (int32, error) { + ctx = context.WithValue(ctx, shared.FlowNameKey, flowName) + logger := internal.LoggerFromCtx(ctx) + snapshotID, err := monitoring.InitializeSnapshot(ctx, logger, a.CatalogPool, flowName) + if err != nil { + return -1, a.Alerter.LogFlowErrorNoStatus(ctx, flowName, err) + } + return snapshotID, nil +} + +func (a *SnapshotActivity) FinishSnapshot( + ctx context.Context, flowName string, snapshotID int32, +) error { + ctx = context.WithValue(ctx, shared.FlowNameKey, flowName) + logger := internal.LoggerFromCtx(ctx) + if err := monitoring.FinishSnapshot(ctx, logger, a.CatalogPool, flowName, snapshotID); err != nil { + return a.Alerter.LogFlowSnapshotError(ctx, flowName, snapshotID, err) + } return nil } @@ -63,7 +88,8 @@ func (a *SnapshotActivity) SetupReplication( conn, err := connectors.GetByNameAs[connectors.CDCPullConnectorCore](ctx, nil, a.CatalogPool, config.PeerName) if err != nil { - return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get connector: %w", err)) + conErr := fmt.Errorf("failed to get connector: %w", err) + return nil, a.Alerter.LogFlowSnapshotError(ctx, config.FlowJobName, config.SnapshotId, conErr) } logger.Info("waiting for slot to be created...") @@ -72,7 +98,8 @@ func (a *SnapshotActivity) SetupReplication( if err != nil { connectors.CloseConnector(ctx, conn) // it is important to close the connection here as it is not closed in CloseSlotKeepAlive - return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("slot error: %w", err)) + slotErr := fmt.Errorf("slot error: %w", err) + return nil, a.Alerter.LogFlowSnapshotError(ctx, config.FlowJobName, config.SnapshotId, slotErr) } else if slotInfo.Conn == nil && slotInfo.SlotName == "" { connectors.CloseConnector(ctx, conn) logger.Info("replication setup without slot") @@ -99,20 +126,22 @@ func (a *SnapshotActivity) SetupReplication( }, nil } -func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, peer string, env map[string]string) error { +func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, peer string, env map[string]string, snapshotID int32) error { shutdown := heartbeatRoutine(ctx, func() string { return "maintaining transaction snapshot" }) defer shutdown() conn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, nil, a.CatalogPool, peer) if err != nil { - return a.Alerter.LogFlowError(ctx, sessionID, err) + getErr := fmt.Errorf("failed to get connector: %w", err) + return a.Alerter.LogFlowSnapshotError(ctx, sessionID, snapshotID, getErr) } defer connectors.CloseConnector(ctx, conn) exportSnapshotOutput, tx, err := conn.ExportTxSnapshot(ctx, env) if err != nil { - return err + exportErr := fmt.Errorf("failed to export tx snapshot: %w", err) + return a.Alerter.LogFlowSnapshotError(ctx, sessionID, snapshotID, exportErr) } 
a.SnapshotStatesMutex.Lock() @@ -134,7 +163,11 @@ func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, pee a.SnapshotStatesMutex.Lock() delete(a.TxSnapshotStates, sessionID) a.SnapshotStatesMutex.Unlock() - return conn.FinishExport(tx) + if err := conn.FinishExport(tx); err != nil { + finishErr := fmt.Errorf("failed to finish export: %w", err) + return a.Alerter.LogFlowSnapshotError(ctx, sessionID, snapshotID, finishErr) + } + return nil } time.Sleep(time.Minute) } @@ -166,6 +199,40 @@ func (a *SnapshotActivity) LoadTableSchema( ctx context.Context, flowName string, tableName string, + snapshotID int32, ) (*protos.TableSchema, error) { - return internal.LoadTableSchemaFromCatalog(ctx, a.CatalogPool, flowName, tableName) + schema, err := internal.LoadTableSchemaFromCatalog(ctx, a.CatalogPool, flowName, tableName) + if err != nil { + loadErr := fmt.Errorf("failed to load schema from catalog: %w", err) + return nil, a.Alerter.LogFlowSnapshotError(ctx, flowName, snapshotID, loadErr) + } + return schema, nil +} + +func (a *SnapshotActivity) ParseSchemaTable( + ctx context.Context, + flowName string, + tableName string, + snapshotID int32, +) (*utils.SchemaTable, error) { + parsedTable, err := utils.ParseSchemaTable(tableName) + if err != nil { + parseErr := fmt.Errorf("failed to parse schema table: %w", err) + return nil, a.Alerter.LogFlowSnapshotError(ctx, flowName, snapshotID, parseErr) + } + return parsedTable, nil +} + +func (a *SnapshotActivity) GetPeerType( + ctx context.Context, + flowName string, + peerName string, + snapshotID int32, +) (protos.DBType, error) { + dbtype, err := connectors.LoadPeerType(ctx, a.CatalogPool, peerName) + if err != nil { + peerErr := fmt.Errorf("failed to get peer type: %w", err) + return 0, a.Alerter.LogFlowSnapshotError(ctx, flowName, snapshotID, peerErr) + } + return dbtype, nil } diff --git a/flow/alerting/alerting.go b/flow/alerting/alerting.go index d2e39acb88..694792a33d 100644 --- a/flow/alerting/alerting.go +++ b/flow/alerting/alerting.go @@ -444,25 +444,41 @@ const ( flowErrorTypeError flowErrorType = "error" ) -// logFlowErrorInternal pushes the error to the errors table and emits a metric as well as a telemetry message -func (a *Alerter) logFlowErrorInternal( +// logFlowErrorImpl pushes the error to the errors table and emits a metric as well as a telemetry message +func (a *Alerter) logFlowErrorImpl( ctx context.Context, flowName string, errorType flowErrorType, inErr error, + logger log.Logger, loggerFunc func(string, ...any), ) { - logger := internal.LoggerFromCtx(ctx) inErrWithStack := fmt.Sprintf("%+v", inErr) loggerFunc(inErr.Error(), slog.String("stack", inErrWithStack)) - if _, err := a.CatalogPool.Exec( - ctx, "INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)", - flowName, inErrWithStack, errorType.String(), - ); err != nil { - logger.Error("failed to insert flow error", slog.Any("error", err)) - return + retryInterval := time.Second + for { + if _, err := a.CatalogPool.Exec( + ctx, "INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)", + flowName, inErrWithStack, errorType, + ); err != nil { + insertErr := shared.LogError(logger, fmt.Errorf("failed to insert flow error: %w", err)) + errInfo := ErrorInfo{Source: ErrorSourcePostgresCatalog, Code: "UNKNOWN"} + a.emitClassifiedError(ctx, logger, flowName, errorType, ErrorInternal, errInfo, insertErr, insertErr.Error(), loggerFunc) + time.Sleep(retryInterval) + retryInterval = 
min(retryInterval*2, time.Minute) + continue + } + break } + errorClass, errInfo := GetErrorClass(ctx, inErr) + a.emitClassifiedError(ctx, logger, flowName, errorType, errorClass, errInfo, inErr, inErrWithStack, loggerFunc) +} + +func (a *Alerter) emitClassifiedError( + ctx context.Context, logger log.Logger, flowName string, errorType flowErrorType, errorClass ErrorClass, errInfo ErrorInfo, + inErr error, inErrWithStack string, loggerFunc func(string, ...any), +) { var tags []string if errors.Is(inErr, context.Canceled) { tags = append(tags, string(shared.ErrTypeCanceled)) @@ -494,14 +510,11 @@ func (a *Alerter) logFlowErrorInternal( if errors.As(inErr, &sshErr) { tags = append(tags, string(shared.ErrTypeNet)) } - - errorClass, errInfo := GetErrorClass(ctx, inErr) tags = append(tags, "errorClass:"+errorClass.String(), "errorAction:"+errorClass.ErrorAction().String()) - if !internal.PeerDBTelemetryErrorActionBasedAlertingEnabled() || errorClass.ErrorAction() == NotifyTelemetry { - // Warnings alert us just like errors until there's a customer warning system a.sendTelemetryMessage(ctx, logger, flowName, inErrWithStack, telemetry.ERROR, tags...) } + loggerFunc(fmt.Sprintf("Emitting classified error '%s'", inErr.Error()), slog.Any("error", inErr), slog.Any("errorClass", errorClass), @@ -530,15 +543,54 @@ func (a *Alerter) logFlowErrorInternal( gauge.Record(ctx, 1, errorAttributeSet) } -func (a *Alerter) LogFlowError(ctx context.Context, flowName string, inErr error) error { +func (a *Alerter) LogFlowErrorNoStatus(ctx context.Context, flowName string, inErr error) error { + logger := internal.LoggerFromCtx(ctx) + // TODO check that this one just logs without updating status + a.logFlowErrorImpl(ctx, flowName, flowErrorTypeError, inErr, logger, logger.Error) + return inErr +} + +func (a *Alerter) LogFlowSyncError(ctx context.Context, flowName string, batchID int64, inErr error) error { + logger := internal.LoggerFromCtx(ctx) + // TODO use batchID + a.logFlowErrorImpl(ctx, flowName, flowErrorTypeError, inErr, logger, logger.Error) + return inErr +} + +func (a *Alerter) LogFlowNormalizeError(ctx context.Context, flowName string, batchID int64, inErr error) { + logger := internal.LoggerFromCtx(ctx) + // TODO use batchID + a.logFlowErrorImpl(ctx, flowName, flowErrorTypeError, inErr, logger, logger.Error) +} + +func (a *Alerter) LogFlowSnapshotError(ctx context.Context, flowName string, snapshotID int32, inErr error) error { + logger := internal.LoggerFromCtx(ctx) + // TODO use snapshotID + a.logFlowErrorImpl(ctx, flowName, flowErrorTypeError, inErr, logger, logger.Error) + return inErr +} + +func (a *Alerter) LogFlowSnapshotQRepError( + ctx context.Context, flowName string, snapshotID int32, qRepRunID string, inErr error, +) error { + logger := internal.LoggerFromCtx(ctx) + // TODO use snapshotID, qRepRunID + a.logFlowErrorImpl(ctx, flowName, flowErrorTypeError, inErr, logger, logger.Error) + return inErr +} + +func (a *Alerter) LogFlowSnapshotPartitionError( + ctx context.Context, flowName string, snapshotID int32, partitionID string, inErr error, +) error { logger := internal.LoggerFromCtx(ctx) - a.logFlowErrorInternal(ctx, flowName, flowErrorTypeError, inErr, logger.Error) + // TODO use snapshotID, partitionID + a.logFlowErrorImpl(ctx, flowName, flowErrorTypeError, inErr, logger, logger.Error) return inErr } func (a *Alerter) LogFlowWarning(ctx context.Context, flowName string, inErr error) { logger := internal.LoggerFromCtx(ctx) - a.logFlowErrorInternal(ctx, flowName, flowErrorTypeWarn, inErr, 
logger.Warn) + a.logFlowErrorImpl(ctx, flowName, flowErrorTypeWarn, inErr, logger, logger.Warn) } func (a *Alerter) LogFlowEvent(ctx context.Context, flowName string, info string) { diff --git a/flow/alerting/classifier.go b/flow/alerting/classifier.go index 95a340a562..7c32281702 100644 --- a/flow/alerting/classifier.go +++ b/flow/alerting/classifier.go @@ -215,6 +215,30 @@ func GetErrorClass(ctx context.Context, err error) (ErrorClass, ErrorInfo) { } } + var dynamicConfError *exceptions.DynamicConfError + if errors.As(err, &dynamicConfError) { + return ErrorInternal, ErrorInfo{ + Source: ErrorSourceOther, + Code: "UNKNOWN", + } + } + + var protoMarshalError *exceptions.ProtoMarshalError + if errors.As(err, &protoMarshalError) { + return ErrorInternal, ErrorInfo{ + Source: ErrorSourceOther, + Code: "UNKNOWN", + } + } + + var protoUnmarshalError *exceptions.ProtoUnmarshalError + if errors.As(err, &protoUnmarshalError) { + return ErrorInternal, ErrorInfo{ + Source: ErrorSourceOther, + Code: "UNKNOWN", + } + } + if errors.Is(err, context.Canceled) { // Generally happens during workflow cancellation return ErrorIgnoreContextCancelled, ErrorInfo{ diff --git a/flow/alerting/classifier_test.go b/flow/alerting/classifier_test.go index 429fd0a5f5..03e8c61ede 100644 --- a/flow/alerting/classifier_test.go +++ b/flow/alerting/classifier_test.go @@ -163,7 +163,7 @@ func TestClickHousePushingToViewShouldBeMvError(t *testing.T) { is not supported: while converting source column created_at to destination column created_at: while pushing to view db_name.hello_mv`, } - errorClass, errInfo := GetErrorClass(t.Context(), exceptions.NewNormalizationError(fmt.Errorf("error in WAL: %w", err))) + errorClass, errInfo := GetErrorClass(t.Context(), exceptions.NewNormalizationError("error in WAL: %w", err)) assert.Equal(t, ErrorNotifyMVOrView, errorClass, "Unexpected error class") assert.Equal(t, ErrorInfo{ Source: ErrorSourceClickHouse, @@ -199,8 +199,8 @@ func TestClickHouseChaoticNormalizeErrorShouldBeNotifyMVNow(t *testing.T) { Left key __table1.column_2 type String. Right key __table2.column_1 type Int64`, } errorClass, errInfo := GetErrorClass(t.Context(), - exceptions.NewNormalizationError(fmt.Errorf(`Normalization Error: failed to normalize records: - error while inserting into normalized table table_A: %w`, err))) + exceptions.NewNormalizationError(`Normalization Error: failed to normalize records: + error while inserting into normalized table table_A: %w`, err)) assert.Equal(t, ErrorNotifyMVOrView, errorClass, "Unexpected error class") assert.Equal(t, ErrorInfo{ Source: ErrorSourceClickHouse, @@ -405,7 +405,7 @@ func TestClickHouseUnknownTableShouldBeDestinationModified(t *testing.T) { Message: "Table abc does not exist.", } errorClass, errInfo := GetErrorClass(t.Context(), - exceptions.NewNormalizationError(fmt.Errorf("failed to normalize records: %w", err))) + exceptions.NewNormalizationError("failed to normalize records: %w", err)) assert.Equal(t, ErrorNotifyDestinationModified, errorClass, "Unexpected error class") assert.Equal(t, ErrorInfo{ Source: ErrorSourceClickHouse, @@ -421,7 +421,7 @@ func TestClickHouseUnkownTableWhilePushingToViewShouldBeNotifyMVNow(t *testing.T Message: "Table abc does not exist. 
Maybe you meant abc2?: while executing 'FUNCTION func()': while pushing to view some_mv (some-uuid-here)", } errorClass, errInfo := GetErrorClass(t.Context(), - exceptions.NewNormalizationError(fmt.Errorf("failed to normalize records: %w", err))) + exceptions.NewNormalizationError("failed to normalize records: %w", err)) assert.Equal(t, ErrorNotifyMVOrView, errorClass, "Unexpected error class") assert.Equal(t, ErrorInfo{ Source: ErrorSourceClickHouse, @@ -436,7 +436,7 @@ func TestNonClassifiedNormalizeErrorShouldBeNotifyMVNow(t *testing.T) { Message: "JOIN ANY LEFT JOIN ... ON a.id = b.b_id ambiguous identifier 'c_id'. In scope SELECT ...", } errorClass, errInfo := GetErrorClass(t.Context(), - exceptions.NewNormalizationError(fmt.Errorf("failed to normalize records: %w", err))) + exceptions.NewNormalizationError("failed to normalize records: %w", err)) assert.Equal(t, ErrorNotifyMVOrView, errorClass, "Unexpected error class") assert.Equal(t, ErrorInfo{ Source: ErrorSourceClickHouse, diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 361a1bd12b..91ffc8b705 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -8,11 +8,12 @@ import ( "time" "github.com/google/uuid" + "github.com/jackc/pgerrcode" + "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgtype" tEnums "go.temporal.io/api/enums/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" - "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/durationpb" "github.com/PeerDB-io/peerdb/flow/alerting" @@ -146,6 +147,11 @@ func (h *FlowRequestHandler) CreateCDCFlow( return nil, fmt.Errorf("unable to update flow config in catalog: %w", err) } + if err := h.createGranularStatusEntry(ctx, cfg); err != nil { + slog.Error("unable to create granular status entry", slog.Any("error", err)) + return nil, fmt.Errorf("unable to create granular status entry: %w", err) + } + if _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.CDCFlowWorkflow, cfg, nil); err != nil { slog.Error("unable to start PeerFlow workflow", slog.Any("error", err)) return nil, fmt.Errorf("unable to start PeerFlow workflow: %w", err) @@ -163,6 +169,44 @@ func (h *FlowRequestHandler) updateFlowConfigInCatalog( return internal.UpdateCDCConfigInCatalog(ctx, h.pool, slog.Default(), cfg) } +func (h *FlowRequestHandler) createGranularStatusEntry( + ctx context.Context, + cfg *protos.FlowConnectionConfigs, +) error { + var pgErr *pgconn.PgError + if _, err := h.pool.Exec(ctx, ` + INSERT INTO peerdb_stats.granular_status (flow_name, snapshot_succeeding, + sync_succeeding, normalize_succeeding, slot_lag_low) VALUES ($1, true, true, true, true) + `, cfg.FlowJobName, + ); errors.As(err, &pgErr) && pgErr.Code == pgerrcode.UniqueViolation { + tx, err := h.pool.Begin(ctx) + if err != nil { + return fmt.Errorf("unable to begin transaction for resetting granular status: %w", err) + } + defer shared.RollbackTx(tx, internal.LoggerFromCtx(ctx)) + + if _, err := tx.Exec(ctx, ` + DELETE FROM peerdb_stats.granular_status where flow_name = $1 + `, cfg.FlowJobName, + ); err != nil { + return fmt.Errorf("unable to reset granular status: %w", err) + } + if _, err := tx.Exec(ctx, ` + INSERT INTO peerdb_stats.granular_status (flow_name, snapshot_succeeding, + sync_succeeding, normalize_succeeding, slot_lag_low) VALUES ($1, true, true, true, true) + `, cfg.FlowJobName, + ); err != nil { + return fmt.Errorf("unable to replace granular status: %w", err) + } + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("unable to 
commit transaction for resetting granular status: %w", err) + } + } else if err != nil { + return fmt.Errorf("unable to insert into granular status table: %w", err) + } + return nil +} + func (h *FlowRequestHandler) CreateQRepFlow( ctx context.Context, req *protos.CreateQRepFlowRequest, ) (*protos.CreateQRepFlowResponse, error) { @@ -220,7 +264,7 @@ func (h *FlowRequestHandler) updateQRepConfigInCatalog( ctx context.Context, cfg *protos.QRepConfig, ) error { - cfgBytes, err := proto.Marshal(cfg) + cfgBytes, err := internal.ProtoMarshal(cfg) if err != nil { return fmt.Errorf("unable to marshal qrep config: %w", err) } diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index fed688f960..6d08c7645c 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -11,7 +11,6 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" - "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" "github.com/PeerDB-io/peerdb/flow/connectors" @@ -425,7 +424,7 @@ func (h *FlowRequestHandler) getFlowConfigFromCatalog( } var config protos.FlowConnectionConfigs - if err := proto.Unmarshal(configBytes, &config); err != nil { + if err := internal.ProtoUnmarshal(configBytes, &config); err != nil { slog.Error("unable to unmarshal flow config", slog.Any("error", err)) return nil, fmt.Errorf("unable to unmarshal flow config: %w", err) } diff --git a/flow/connectors/core.go b/flow/connectors/core.go index e169b1417f..f048e5693c 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -6,8 +6,6 @@ import ( "fmt" "log/slog" - "google.golang.org/protobuf/proto" - connbigquery "github.com/PeerDB-io/peerdb/flow/connectors/bigquery" connclickhouse "github.com/PeerDB-io/peerdb/flow/connectors/clickhouse" connelasticsearch "github.com/PeerDB-io/peerdb/flow/connectors/elasticsearch" @@ -352,73 +350,73 @@ func LoadPeer(ctx context.Context, catalogPool shared.CatalogPool, peerName stri switch peer.Type { case protos.DBType_BIGQUERY: var config protos.BigqueryConfig - if err := proto.Unmarshal(peerOptions, &config); err != nil { + if err := internal.ProtoUnmarshal(peerOptions, &config); err != nil { return nil, fmt.Errorf("failed to unmarshal BigQuery config: %w", err) } peer.Config = &protos.Peer_BigqueryConfig{BigqueryConfig: &config} case protos.DBType_SNOWFLAKE: var config protos.SnowflakeConfig - if err := proto.Unmarshal(peerOptions, &config); err != nil { + if err := internal.ProtoUnmarshal(peerOptions, &config); err != nil { return nil, fmt.Errorf("failed to unmarshal Snowflake config: %w", err) } peer.Config = &protos.Peer_SnowflakeConfig{SnowflakeConfig: &config} case protos.DBType_POSTGRES: var config protos.PostgresConfig - if err := proto.Unmarshal(peerOptions, &config); err != nil { + if err := internal.ProtoUnmarshal(peerOptions, &config); err != nil { return nil, fmt.Errorf("failed to unmarshal Postgres config: %w", err) } peer.Config = &protos.Peer_PostgresConfig{PostgresConfig: &config} case protos.DBType_S3: var config protos.S3Config - if err := proto.Unmarshal(peerOptions, &config); err != nil { + if err := internal.ProtoUnmarshal(peerOptions, &config); err != nil { return nil, fmt.Errorf("failed to unmarshal S3 config: %w", err) } peer.Config = &protos.Peer_S3Config{S3Config: &config} case protos.DBType_SQLSERVER: var config protos.SqlServerConfig - if err := proto.Unmarshal(peerOptions, &config); err != nil { + if err := internal.ProtoUnmarshal(peerOptions, &config); err != nil { return nil, fmt.Errorf("failed to 
unmarshal SQL Server config: %w", err) } peer.Config = &protos.Peer_SqlserverConfig{SqlserverConfig: &config} case protos.DBType_MONGO: var config protos.MongoConfig - if err := proto.Unmarshal(peerOptions, &config); err != nil { + if err := internal.ProtoUnmarshal(peerOptions, &config); err != nil { return nil, fmt.Errorf("failed to unmarshal MongoDB config: %w", err) } peer.Config = &protos.Peer_MongoConfig{MongoConfig: &config} case protos.DBType_MYSQL: var config protos.MySqlConfig - if err := proto.Unmarshal(peerOptions, &config); err != nil { + if err := internal.ProtoUnmarshal(peerOptions, &config); err != nil { return nil, fmt.Errorf("failed to unmarshal MySQL config: %w", err) } peer.Config = &protos.Peer_MysqlConfig{MysqlConfig: &config} case protos.DBType_CLICKHOUSE: var config protos.ClickhouseConfig - if err := proto.Unmarshal(peerOptions, &config); err != nil { + if err := internal.ProtoUnmarshal(peerOptions, &config); err != nil { return nil, fmt.Errorf("failed to unmarshal ClickHouse config: %w", err) } peer.Config = &protos.Peer_ClickhouseConfig{ClickhouseConfig: &config} case protos.DBType_KAFKA: var config protos.KafkaConfig - if err := proto.Unmarshal(peerOptions, &config); err != nil { + if err := internal.ProtoUnmarshal(peerOptions, &config); err != nil { return nil, fmt.Errorf("failed to unmarshal Kafka config: %w", err) } peer.Config = &protos.Peer_KafkaConfig{KafkaConfig: &config} case protos.DBType_PUBSUB: var config protos.PubSubConfig - if err := proto.Unmarshal(peerOptions, &config); err != nil { + if err := internal.ProtoUnmarshal(peerOptions, &config); err != nil { return nil, fmt.Errorf("failed to unmarshal Pub/Sub config: %w", err) } peer.Config = &protos.Peer_PubsubConfig{PubsubConfig: &config} case protos.DBType_EVENTHUBS: var config protos.EventHubGroupConfig - if err := proto.Unmarshal(peerOptions, &config); err != nil { + if err := internal.ProtoUnmarshal(peerOptions, &config); err != nil { return nil, fmt.Errorf("failed to unmarshal Event Hubs config: %w", err) } peer.Config = &protos.Peer_EventhubGroupConfig{EventhubGroupConfig: &config} case protos.DBType_ELASTICSEARCH: var config protos.ElasticsearchConfig - if err := proto.Unmarshal(peerOptions, &config); err != nil { + if err := internal.ProtoUnmarshal(peerOptions, &config); err != nil { return nil, fmt.Errorf("failed to unmarshal Elasticsearch config: %w", err) } peer.Config = &protos.Peer_ElasticsearchConfig{ElasticsearchConfig: &config} diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 2c4fe2398d..64d947d66e 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -1389,6 +1389,15 @@ func (c *PostgresConnector) HandleSlotInfo( slog.Float64("LagInMB", float64(slotInfo[0].LagInMb))) alerter.AlertIfSlotLag(ctx, alertKeys, slotInfo[0]) + if _, err := catalogPool.Exec(ctx, `update peerdb_stats.granular_status + set slot_lag_low = $1, slot_lag_mib = $2, slot_lag_updated_at = utc_now() + where flow_name = $3 + `, slotInfo[0].LagInMb < float32(shared.SlotLagThresholdMiB), slotInfo[0].LagInMb, + alertKeys.FlowName, + ); err != nil { + logger.Warn("failed to update granular status slot lag", slog.Any("error", err)) + } + attributeSet := metric.WithAttributeSet(attribute.NewSet( attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), diff --git a/flow/connectors/utils/monitoring/monitoring.go b/flow/connectors/utils/monitoring/monitoring.go index
ff2e57b703..d00fba7d91 100644 --- a/flow/connectors/utils/monitoring/monitoring.go +++ b/flow/connectors/utils/monitoring/monitoring.go @@ -189,6 +189,166 @@ func AddCDCBatchTablesForFlow(ctx context.Context, pool shared.CatalogPool, flow return nil } +func UpdateSyncStatusSuccess( + ctx context.Context, pool shared.CatalogPool, flowJobName string, batchID int64, +) error { + if _, err := pool.Exec(ctx, ` + UPDATE peerdb_stats.granular_status + SET sync_succeeding = true, sync_is_internal_error = false, + sync_last_successful_batch_id = $1, sync_updated_at = utc_now() + WHERE flow_name = $2 + `, batchID, flowJobName, + ); err != nil { + return fmt.Errorf("failed to update granular status for sync batch: %w", err) + } + return nil +} + +func UpdateNormalizeStatusSuccess( + ctx context.Context, pool shared.CatalogPool, flowJobName string, batchID int64, +) error { + if _, err := pool.Exec(ctx, ` + UPDATE peerdb_stats.granular_status + SET normalize_succeeding = true, normalize_is_internal_error = false, + normalize_last_successful_batch_id = $1, normalize_updated_at = utc_now() + WHERE flow_name = $2 + `, batchID, flowJobName, + ); err != nil { + return fmt.Errorf("failed to update granular status for normalize batch: %w", err) + } + return nil +} + +func UpdateSyncStatusError( + ctx context.Context, pool shared.CatalogPool, flowJobName string, isInternalError bool, +) error { + syncIsInternalError := "sync_is_internal_error" // don't modify the value + if isInternalError { + syncIsInternalError = "true" + } + if _, err := pool.Exec(ctx, ` + UPDATE peerdb_stats.granular_status + SET sync_succeeding = false, sync_is_internal_error = `+syncIsInternalError+`, + sync_updated_at = utc_now() + WHERE flow_name = $1 + `, flowJobName, + ); err != nil { + return fmt.Errorf("failed to update granular status for sync batch: %w", err) + } + return nil +} + +func UpdateNormalizeStatusError( + ctx context.Context, pool shared.CatalogPool, flowJobName string, isInternalError bool, +) error { + normalizeIsInternalError := "normalize_is_internal_error" // don't modify the value + if isInternalError { + normalizeIsInternalError = "true" + } + if _, err := pool.Exec(ctx, ` + UPDATE peerdb_stats.granular_status + SET normalize_succeeding = false, normalize_is_internal_error = `+normalizeIsInternalError+`, + normalize_updated_at = utc_now() + WHERE flow_name = $1 + `, flowJobName, + ); err != nil { + return fmt.Errorf("failed to update granular status for normalize batch: %w", err) + } + return nil +} + +func InitializeSnapshot( + ctx context.Context, + logger log.Logger, + pool shared.CatalogPool, + flowName string, +) (int32, error) { + tx, err := pool.Begin(ctx) + if err != nil { + return -1, fmt.Errorf("error while starting transaction to initialize snapshot: %w", err) + } + defer shared.RollbackTx(tx, logger) + + row := tx.QueryRow(ctx, + "INSERT INTO peerdb_stats.snapshots(flow_name,start_time)"+ + " VALUES($1,utc_now()) ON CONFLICT DO NOTHING RETURNING snapshot_id", + flowName) + var snapshotId int32 + if err := row.Scan(&snapshotId); err != nil { + return -1, fmt.Errorf("error while inserting snapshot in peerdb_stats: %w", err) + } + + if _, err := tx.Exec(ctx, + "UPDATE peerdb_stats.granular_status"+ + " SET snapshot_current_id = $1, snapshot_succeeding = true, snapshot_failing_qrep_run_ids = '{}',"+ + " snapshot_failing_partition_ids = '{}', snapshot_is_internal_error = false,"+ + " snapshot_updated_at = utc_now()"+ + " WHERE flow_name = $2", + snapshotId, flowName, + ); err != nil { + return -1, 
fmt.Errorf("error while initializing granular status for snapshot: %w", err) + } + + if err := tx.Commit(ctx); err != nil { + return -1, fmt.Errorf("error while committing transaction to initialize snapshot: %w", err) + } + + return snapshotId, nil +} + +func FinishSnapshot( + ctx context.Context, + logger log.Logger, + pool shared.CatalogPool, + flowName string, + snapshotID int32, +) error { + tx, err := pool.Begin(ctx) + if err != nil { + return fmt.Errorf("error while starting transaction to finish snapshot: %w", err) + } + defer shared.RollbackTx(tx, logger) + + if _, err := tx.Exec(ctx, + "UPDATE peerdb_stats.snapshots SET end_time = utc_now() where snapshot_id = $1", + snapshotID, + ); err != nil { + return fmt.Errorf("error while updating end time for snapshot: %w", err) + } + + if _, err := tx.Exec(ctx, ` + UPDATE peerdb_stats.granular_status + SET snapshot_succeeding = true, snapshot_failing_qrep_run_ids = '{}', + snapshot_failing_partition_ids = '{}', snapshot_is_internal_error = false, + snapshot_updated_at = utc_now() + WHERE flow_name = $1 and snapshot_current_id = $2 + `, flowName, snapshotID, + ); err != nil { + return fmt.Errorf("error while updating granular status for snapshot: %w", err) + } + + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("error while committing transaction to finish snapshot: %w", err) + } + + return nil +} + +func UpdateQRepStatusSuccess( + ctx context.Context, pool shared.CatalogPool, flowJobName string, snapshotID int32, runUUID string, +) error { + if _, err := pool.Exec(ctx, ` + UPDATE peerdb_stats.granular_status + SET snapshot_failing_qrep_run_ids = array_remove(snapshot_failing_qrep_run_ids, $1), + snapshot_updated_at = utc_now() + WHERE flow_name = $2 and snapshot_current_id = $3 + `, runUUID, flowJobName, snapshotID, + ); err != nil { + return fmt.Errorf("failed to update granular status for qrep run: %w", err) + } + return nil +} + func InitializeQRepRun( ctx context.Context, logger log.Logger, @@ -374,15 +534,35 @@ func UpdatePullEndTimeAndRowsForPartition(ctx context.Context, pool shared.Catal return nil } -func UpdateEndTimeForPartition(ctx context.Context, pool shared.CatalogPool, runUUID string, +func FinishPartition(ctx context.Context, logger log.Logger, pool shared.CatalogPool, flowName string, runUUID string, partition *protos.QRepPartition, ) error { - if _, err := pool.Exec(ctx, + tx, err := pool.Begin(ctx) + if err != nil { + return fmt.Errorf("error while starting transaction to finish partition: %w", err) + } + defer shared.RollbackTx(tx, logger) + + if _, err := tx.Exec(ctx, `UPDATE peerdb_stats.qrep_partitions SET end_time=$1 WHERE run_uuid=$2 AND partition_uuid=$3`, time.Now(), runUUID, partition.PartitionId, ); err != nil { return fmt.Errorf("error while updating qrep partition in qrep_partitions: %w", err) } + + if _, err := tx.Exec(ctx, ` + UPDATE peerdb_stats.granular_status + SET snapshot_failing_partition_ids = array_remove(snapshot_failing_partition_ids, $1), + snapshot_updated_at = utc_now() + WHERE flow_name = $2 + `, partition.PartitionId, flowName, + ); err != nil { + return fmt.Errorf("failed to update granular status for snapshot partition: %w", err) + } + + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("error while committing transaction to finish partition: %w", err) + } return nil } diff --git a/flow/connectors/utils/peers.go b/flow/connectors/utils/peers.go index c4527bdf84..7fb47bb0f9 100644 --- a/flow/connectors/utils/peers.go +++ b/flow/connectors/utils/peers.go @@ -97,7 +97,7
@@ func CreatePeerNoValidate( return wrongConfigResponse, nil } - encodedConfig, encodingErr := proto.Marshal(innerConfig) + encodedConfig, encodingErr := internal.ProtoMarshal(innerConfig) if encodingErr != nil { slog.Error(fmt.Sprintf("failed to encode peer configuration for %s peer %s : %v", peer.Type, peer.Name, encodingErr)) diff --git a/flow/e2e/api/api_test.go b/flow/e2e/api/api_test.go index 4e9c0481c5..0066490146 100644 --- a/flow/e2e/api/api_test.go +++ b/flow/e2e/api/api_test.go @@ -16,7 +16,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" - "google.golang.org/protobuf/proto" connpostgres "github.com/PeerDB-io/peerdb/flow/connectors/postgres" "github.com/PeerDB-io/peerdb/flow/e2e" @@ -132,8 +131,7 @@ func (s Suite) checkCatalogTableMapping( } var config protos.FlowConnectionConfigs - err = proto.Unmarshal(configBytes, &config) - if err != nil { + if err := internal.ProtoUnmarshal(configBytes, &config); err != nil { return false, err } diff --git a/flow/internal/catalog.go b/flow/internal/catalog.go index 79c11b5d9b..0a12cd7115 100644 --- a/flow/internal/catalog.go +++ b/flow/internal/catalog.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/jackc/pgx/v5/pgxpool" @@ -20,6 +21,8 @@ var ( func GetCatalogConnectionPoolFromEnv(ctx context.Context) (shared.CatalogPool, error) { poolMutex.Lock() defer poolMutex.Unlock() + ctx, cancelCtx := context.WithTimeout(ctx, time.Minute) + defer cancelCtx() if pool == nil { var err error catalogConnectionString := GetCatalogConnectionStringFromEnv(ctx) diff --git a/flow/internal/crypt.go b/flow/internal/crypt.go index 1d6c97136d..38716ec971 100644 --- a/flow/internal/crypt.go +++ b/flow/internal/crypt.go @@ -2,6 +2,8 @@ package internal import ( "context" + + "github.com/PeerDB-io/peerdb/flow/shared/exceptions" ) func Decrypt(ctx context.Context, encKeyID string, payload []byte) ([]byte, error) { @@ -12,8 +14,12 @@ func Decrypt(ctx context.Context, encKeyID string, payload []byte) ([]byte, erro keys := PeerDBEncKeys(ctx) key, err := keys.Get(encKeyID) if err != nil { - return nil, err + return nil, exceptions.NewDecryptError(err) } - return key.Decrypt(payload) + if plaintext, err := key.Decrypt(payload); err != nil { + return nil, exceptions.NewDecryptError(err) + } else { + return plaintext, nil + } } diff --git a/flow/internal/dynamicconf.go b/flow/internal/dynamicconf.go index 6945fef4ef..53e2386de0 100644 --- a/flow/internal/dynamicconf.go +++ b/flow/internal/dynamicconf.go @@ -17,6 +17,7 @@ import ( "github.com/PeerDB-io/peerdb/flow/generated/protos" "github.com/PeerDB-io/peerdb/flow/shared" + "github.com/PeerDB-io/peerdb/flow/shared/exceptions" ) const ( @@ -440,7 +441,7 @@ func dynamicConfSigned[T constraints.Signed](ctx context.Context, env map[string }) if err != nil { LoggerFromCtx(ctx).Error("Failed to parse as int64", slog.String("key", key), slog.Any("error", err)) - return 0, fmt.Errorf("failed to parse %s as int64: %w", key, err) + return 0, exceptions.NewDynamicConfError("failed to parse %s as int64: %w", key, err) } return T(value), nil @@ -452,7 +453,7 @@ func dynamicConfUnsigned[T constraints.Unsigned](ctx context.Context, env map[st }) if err != nil { LoggerFromCtx(ctx).Error("Failed to parse as uint64", slog.String("key", key), slog.Any("error", err)) - return 0, fmt.Errorf("failed to parse %s as uint64: %w", key, err) + return 0, exceptions.NewDynamicConfError("failed to parse %s as uint64: %w", key, err) } return T(value), nil 
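The typed wrappers added in flow/shared/exceptions (DynamicConfError, ProtoMarshalError, DecryptError, and so on) all lean on the same standard-library mechanics: embed the cause, prefix Error(), and expose Unwrap so errors.As and errors.Is still reach the original error, which is what GetErrorClass depends on. A minimal sketch of that mechanic follows, assuming a toy classify function in place of the real classifier and simplified "internal"/"unknown" labels.

```go
package main

import (
	"errors"
	"fmt"
)

// DynamicConfError mirrors the wrapper-error shape used in flow/shared/exceptions:
// embed the underlying error, prefix Error(), and expose Unwrap so that
// errors.As/errors.Is can still reach the cause.
type DynamicConfError struct {
	error
}

func NewDynamicConfError(format string, a ...any) *DynamicConfError {
	return &DynamicConfError{fmt.Errorf(format, a...)}
}

func (e *DynamicConfError) Error() string { return "Dynamic Conf Error: " + e.error.Error() }
func (e *DynamicConfError) Unwrap() error { return e.error }

// classify is a toy version of GetErrorClass: typed wrappers are detected with
// errors.As and mapped to a coarse class.
func classify(err error) string {
	var dynErr *DynamicConfError
	if errors.As(err, &dynErr) {
		return "internal"
	}
	return "unknown"
}

func main() {
	cause := errors.New(`strconv.ParseBool: parsing "maybe": invalid syntax`)
	err := NewDynamicConfError("failed to parse %s as bool: %w", "PEERDB_SOME_FLAG", cause)

	fmt.Println(classify(err))         // internal
	fmt.Println(errors.Is(err, cause)) // true, thanks to Unwrap
}
```

Keeping Unwrap on every wrapper is what lets existing checks such as errors.Is(err, context.Canceled) keep working after the error has been decorated.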
@@ -462,7 +463,7 @@ func dynamicConfBool(ctx context.Context, env map[string]string, key string) (bo value, err := dynLookupConvert(ctx, env, key, strconv.ParseBool) if err != nil { LoggerFromCtx(ctx).Error("Failed to parse bool", slog.String("key", key), slog.Any("error", err)) - return false, fmt.Errorf("failed to parse %s as bool: %w", key, err) + return false, exceptions.NewDynamicConfError("failed to parse %s as bool: %w", key, err) } return value, nil diff --git a/flow/internal/postgres.go b/flow/internal/postgres.go index 48c5425674..5dce0f19af 100644 --- a/flow/internal/postgres.go +++ b/flow/internal/postgres.go @@ -7,7 +7,6 @@ import ( "net/url" "go.temporal.io/sdk/log" - "google.golang.org/protobuf/proto" "github.com/PeerDB-io/peerdb/flow/generated/protos" "github.com/PeerDB-io/peerdb/flow/shared" @@ -40,7 +39,7 @@ func UpdateCDCConfigInCatalog(ctx context.Context, pool shared.CatalogPool, ) error { logger.Info("syncing state to catalog: updating config_proto in flows", slog.String("flowName", cfg.FlowJobName)) - cfgBytes, err := proto.Marshal(cfg) + cfgBytes, err := ProtoMarshal(cfg) if err != nil { return fmt.Errorf("unable to marshal flow config: %w", err) } @@ -70,5 +69,5 @@ func LoadTableSchemaFromCatalog( return nil, err } tableSchema := &protos.TableSchema{} - return tableSchema, proto.Unmarshal(tableSchemaBytes, tableSchema) + return tableSchema, ProtoUnmarshal(tableSchemaBytes, tableSchema) } diff --git a/flow/internal/proto.go b/flow/internal/proto.go new file mode 100644 index 0000000000..2599f3ce7f --- /dev/null +++ b/flow/internal/proto.go @@ -0,0 +1,22 @@ +package internal + +import ( + "google.golang.org/protobuf/proto" + + "github.com/PeerDB-io/peerdb/flow/shared/exceptions" +) + +func ProtoUnmarshal(b []byte, m proto.Message) error { + if err := proto.Unmarshal(b, m); err != nil { + return exceptions.NewProtoUnmarshalError(err) + } + return nil +} + +func ProtoMarshal(m proto.Message) ([]byte, error) { + if bytes, err := proto.Marshal(m); err != nil { + return nil, exceptions.NewProtoMarshalError(err) + } else { + return bytes, nil + } +} diff --git a/flow/internal/workflow.go b/flow/internal/workflow.go index 549aec373b..5f40261517 100644 --- a/flow/internal/workflow.go +++ b/flow/internal/workflow.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log/slog" + "time" "github.com/jackc/pgx/v5" "go.temporal.io/sdk/client" @@ -75,6 +76,8 @@ func GetWorkflowStatus(ctx context.Context, pool shared.CatalogPool, func UpdateFlowStatusInCatalog(ctx context.Context, pool shared.CatalogPool, workflowID string, status protos.FlowStatus, ) (protos.FlowStatus, error) { + ctx, cancelCtx := context.WithTimeout(ctx, 30*time.Second) + defer cancelCtx() if _, err := pool.Exec(ctx, "UPDATE flows SET status=$1,updated_at=now() WHERE workflow_id=$2", status, workflowID); err != nil { slog.Error("failed to update flow status", slog.Any("error", err), slog.String("flowID", workflowID)) return status, fmt.Errorf("failed to update flow status: %w", err) diff --git a/flow/shared/constants.go b/flow/shared/constants.go index b297c1e019..3c62b59cb5 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -57,3 +57,5 @@ const FetchAndChannelSize = 256 * 1024 func Ptr[T any](x T) *T { return &x } + +const SlotLagThresholdMiB = 102400 // 100GiB diff --git a/flow/shared/exceptions/flow.go b/flow/shared/exceptions/flow.go index de2f95a670..7e9a0e8968 100644 --- a/flow/shared/exceptions/flow.go +++ b/flow/shared/exceptions/flow.go @@ -1,11 +1,13 @@ package exceptions +import "fmt" + type 
DropFlowError struct { error } -func NewDropFlowError(err error) *DropFlowError { - return &DropFlowError{err} +func NewDropFlowError(format string, a ...any) *DropFlowError { + return &DropFlowError{fmt.Errorf(format, a...)} } func (e *DropFlowError) Error() string { diff --git a/flow/shared/exceptions/internal.go b/flow/shared/exceptions/internal.go new file mode 100644 index 0000000000..4b45bd7312 --- /dev/null +++ b/flow/shared/exceptions/internal.go @@ -0,0 +1,75 @@ +package exceptions + +import "fmt" + +type CatalogError struct { + error +} + +func NewCatalogError(err error) *CatalogError { + return &CatalogError{err} +} + +func (e *CatalogError) Unwrap() error { + return e.error +} + +type DynamicConfError struct { + error +} + +func NewDynamicConfError(format string, a ...any) *DynamicConfError { + return &DynamicConfError{fmt.Errorf(format, a...)} +} + +func (e *DynamicConfError) Error() string { + return "Dynamic Conf Error: " + e.error.Error() +} + +func (e *DynamicConfError) Unwrap() error { + return e.error +} + +type ProtoMarshalError struct { + error +} + +func NewProtoMarshalError(err error) *ProtoMarshalError { + return &ProtoMarshalError{err} +} + +func (e *ProtoMarshalError) Error() string { + return "Proto Marshal Error: " + e.error.Error() +} + +type ProtoUnmarshalError struct { + error +} + +func NewProtoUnmarshalError(err error) *ProtoUnmarshalError { + return &ProtoUnmarshalError{err} +} + +func (e *ProtoUnmarshalError) Error() string { + return "Proto Unmarshal Error: " + e.error.Error() +} + +func (e *ProtoUnmarshalError) Unwrap() error { + return e.error +} + +type DecryptError struct { + error +} + +func NewDecryptError(err error) *DecryptError { + return &DecryptError{err} +} + +func (e *DecryptError) Error() string { + return "Decrypt Error: " + e.error.Error() +} + +func (e *DecryptError) Unwrap() error { + return e.error +} diff --git a/flow/shared/exceptions/normalize.go b/flow/shared/exceptions/normalize.go index 44a0c62a7f..d208818717 100644 --- a/flow/shared/exceptions/normalize.go +++ b/flow/shared/exceptions/normalize.go @@ -1,11 +1,13 @@ package exceptions +import "fmt" + type NormalizationError struct { error } -func NewNormalizationError(err error) *NormalizationError { - return &NormalizationError{err} +func NewNormalizationError(format string, a ...any) *NormalizationError { + return &NormalizationError{fmt.Errorf(format, a...)} } func (e *NormalizationError) Error() string { diff --git a/flow/shared/exceptions/postgres.go b/flow/shared/exceptions/postgres.go index 948d80f2af..17f3c3f8a8 100644 --- a/flow/shared/exceptions/postgres.go +++ b/flow/shared/exceptions/postgres.go @@ -23,18 +23,6 @@ func (e *PostgresSetupError) Unwrap() error { return e.error } -type CatalogError struct { - error -} - -func NewCatalogError(err error) *CatalogError { - return &CatalogError{err} -} - -func (e *CatalogError) Unwrap() error { - return e.error -} - type PostgresWalError struct { error Msg *pgproto3.ErrorResponse diff --git a/flow/shared/exceptions/skip_alerting.go b/flow/shared/exceptions/skip_alerting.go new file mode 100644 index 0000000000..e4589dd8ce --- /dev/null +++ b/flow/shared/exceptions/skip_alerting.go @@ -0,0 +1,17 @@ +package exceptions + +type SkipLogFlowError struct { + error +} + +func NewSkipLogFlowError(err error) *SkipLogFlowError { + return &SkipLogFlowError{err} +} + +func (e *SkipLogFlowError) Error() string { + return "Skip Log Flow Error: " + e.error.Error() +} + +func (e *SkipLogFlowError) Unwrap() error { + return e.error +} diff 
--git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 7cd3b417d7..7b42b742a7 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -66,21 +66,19 @@ func NewCDCFlowWorkflowState(ctx workflow.Context, logger log.Logger, cfg *proto } func syncStatusToCatalog(ctx workflow.Context, logger log.Logger, status protos.FlowStatus) { - updateCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ - StartToCloseTimeout: 1 * time.Minute, + workflowID := workflow.GetInfo(ctx).WorkflowExecution.ID + updateCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 4 * 24 * time.Hour, }) - - updateFuture := workflow.ExecuteLocalActivity(updateCtx, updateFlowStatusInCatalogActivity, - workflow.GetInfo(ctx).WorkflowExecution.ID, status) + updateFuture := workflow.ExecuteActivity(updateCtx, flowable.UpdateFlowStatusInCatalogActivity, workflowID, status) if err := updateFuture.Get(updateCtx, nil); err != nil { logger.Warn("Failed to update flow status in catalog", slog.Any("error", err), slog.String("flowStatus", status.String())) } } func (s *CDCFlowWorkflowState) updateStatus(ctx workflow.Context, logger log.Logger, newStatus protos.FlowStatus) { - s.CurrentFlowStatus = newStatus - // update the status in the catalog - syncStatusToCatalog(ctx, logger, s.CurrentFlowStatus) + syncStatusToCatalog(ctx, logger, newStatus) + s.CurrentFlowStatus = newStatus } func GetUUID(ctx workflow.Context) string { diff --git a/flow/workflows/local_activities.go b/flow/workflows/local_activities.go index 6418a825a3..9fbd7c85fa 100644 --- a/flow/workflows/local_activities.go +++ b/flow/workflows/local_activities.go @@ -9,7 +9,6 @@ import ( "go.temporal.io/sdk/log" "go.temporal.io/sdk/workflow" - "github.com/PeerDB-io/peerdb/flow/connectors" "github.com/PeerDB-io/peerdb/flow/generated/protos" "github.com/PeerDB-io/peerdb/flow/internal" ) @@ -28,25 +27,6 @@ func getQRepOverwriteFullRefreshMode(wCtx workflow.Context, logger log.Logger, e return fullRefreshEnabled } -func localPeerType(ctx context.Context, name string) (protos.DBType, error) { - pool, err := internal.GetCatalogConnectionPoolFromEnv(ctx) - if err != nil { - return 0, err - } - return connectors.LoadPeerType(ctx, pool, name) -} - -func getPeerType(wCtx workflow.Context, name string) (protos.DBType, error) { - checkCtx := workflow.WithLocalActivityOptions(wCtx, workflow.LocalActivityOptions{ - StartToCloseTimeout: time.Minute, - }) - - getFuture := workflow.ExecuteLocalActivity(checkCtx, localPeerType, name) - var dbtype protos.DBType - err := getFuture.Get(checkCtx, &dbtype) - return dbtype, err -} - func updateCDCConfigInCatalogActivity(ctx context.Context, logger log.Logger, cfg *protos.FlowConnectionConfigs) error { pool, err := internal.GetCatalogConnectionPoolFromEnv(ctx) if err != nil { @@ -54,15 +34,3 @@ func updateCDCConfigInCatalogActivity(ctx context.Context, logger log.Logger, cf } return internal.UpdateCDCConfigInCatalog(ctx, pool, logger, cfg) } - -func updateFlowStatusInCatalogActivity( - ctx context.Context, - workflowID string, - status protos.FlowStatus, -) (protos.FlowStatus, error) { - pool, err := internal.GetCatalogConnectionPoolFromEnv(ctx) - if err != nil { - return status, fmt.Errorf("failed to get catalog connection pool: %w", err) - } - return internal.UpdateFlowStatusInCatalog(ctx, pool, workflowID, status) -} diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index d3ab2fdcb7..9fa2601709 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -83,7 +83,9
func (q *QRepFlowExecution) SetupMetadataTables(ctx workflow.Context) error { }, }) - if err := workflow.ExecuteActivity(ctx, flowable.SetupQRepMetadataTables, q.config).Get(ctx, nil); err != nil { + if err := workflow.ExecuteActivity( + ctx, flowable.SetupQRepMetadataTables, q.config, q.runUUID, + ).Get(ctx, nil); err != nil { return fmt.Errorf("failed to setup metadata tables: %w", err) } @@ -119,7 +121,9 @@ func (q *QRepFlowExecution) setupTableSchema(ctx workflow.Context, tableName str Version: q.config.Version, } - return workflow.ExecuteActivity(ctx, flowable.SetupTableSchema, tableSchemaInput).Get(ctx, nil) + return workflow.ExecuteActivity( + ctx, flowable.SetupQRepTableSchemaActivity, tableSchemaInput, q.config.SnapshotId, q.runUUID, + ).Get(ctx, nil) } func (q *QRepFlowExecution) setupWatermarkTableOnDestination(ctx workflow.Context) error { @@ -161,7 +165,9 @@ func (q *QRepFlowExecution) setupWatermarkTableOnDestination(ctx workflow.Contex IsResync: q.config.DstTableFullResync, } - if err := workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig).Get(ctx, nil); err != nil { + if err := workflow.ExecuteActivity( + ctx, flowable.CreateQRepNormalizedTable, setupConfig, q.config.SnapshotId, q.runUUID, + ).Get(ctx, nil); err != nil { q.logger.Error("failed to create watermark table", slog.Any("error", err)) return fmt.Errorf("failed to create watermark table: %w", err) } @@ -319,6 +325,19 @@ func (q *QRepFlowExecution) consolidatePartitions(ctx workflow.Context) error { return nil } +func (q *QRepFlowExecution) updateStatusSuccess(ctx workflow.Context) error { + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 4 * 24 * time.Hour, + }) + + if err := workflow.ExecuteActivity( + ctx, flowable.UpdateQRepStatusSuccess, q.config, q.runUUID, + ).Get(ctx, nil); err != nil { + return fmt.Errorf("failed to update qrep status: %w", err) + } + return nil +} + func (q *QRepFlowExecution) waitForNewRows( ctx workflow.Context, signalChan model.TypedReceiveChannel[model.CDCFlowSignal], @@ -373,6 +392,8 @@ func (q *QRepFlowExecution) handleTableCreationForResync(ctx workflow.Context, s NewToExistingTableMapping: map[string]string{ renamedTableIdentifier: q.config.DestinationTableIdentifier, }, + SnapshotId: q.config.SnapshotId, + RunUuid: q.runUUID, }) if err := createTablesFromExistingFuture.Get(createTablesFromExistingCtx, nil); err != nil { return fmt.Errorf("failed to create table for mirror resync: %w", err) @@ -412,7 +433,9 @@ func (q *QRepFlowExecution) handleTableRenameForResync(ctx workflow.Context, sta NonRetryableErrorTypes: nil, }, }) - renameTablesFuture := workflow.ExecuteActivity(renameTablesCtx, flowable.RenameTables, renameOpts) + renameTablesFuture := workflow.ExecuteActivity( + renameTablesCtx, flowable.QRepRenameTables, renameOpts, q.config.SnapshotId, q.runUUID, + ) if err := renameTablesFuture.Get(renameTablesCtx, nil); err != nil { return fmt.Errorf("failed to execute rename tables activity: %w", err) } @@ -594,6 +617,10 @@ func QRepFlowWorkflow( } if config.InitialCopyOnly { + if err := q.updateStatusSuccess(ctx); err != nil { + return state, err + } + q.logger.Info("initial copy completed for peer flow") updateStatus(ctx, q.logger, state, protos.FlowStatus_STATUS_COMPLETED) return state, workflow.NewContinueAsNewError(ctx, QRepFlowWorkflow, config, state) @@ -603,6 +630,10 @@ func QRepFlowWorkflow( return state, err } + if err := q.updateStatusSuccess(ctx); err != nil { + return state, err + } + 
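The updateStatusSuccess helper above follows the usual Temporal shape: bind ActivityOptions to the workflow context, execute the activity, and block on the returned Future. A compressed sketch of that shape, assuming hypothetical names (statusActivities, MarkQRepRunSucceeded) and a shorter timeout than the diff uses:

```go
// Package sketch shows, under assumptions, the pattern updateStatusSuccess uses.
package sketch

import (
	"context"
	"fmt"
	"time"

	"go.temporal.io/sdk/workflow"
)

type statusActivities struct{}

// MarkQRepRunSucceeded stands in for flowable.UpdateQRepStatusSuccess; the real
// activity updates peerdb_stats.granular_status in the catalog.
func (statusActivities) MarkQRepRunSucceeded(ctx context.Context, flowName string, runUUID string) error {
	return nil
}

func ReportQRepSuccess(ctx workflow.Context, flowName string, runUUID string) error {
	var a statusActivities
	// Bind options once, then execute the activity and block on its Future.
	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: time.Minute, // the diff uses a much longer timeout
	})
	if err := workflow.ExecuteActivity(ctx, a.MarkQRepRunSucceeded, flowName, runUUID).Get(ctx, nil); err != nil {
		return fmt.Errorf("failed to update qrep status: %w", err)
	}
	return nil
}
```

Doing the catalog write in an activity keeps the side effect out of workflow code, where it would not be safe under replay.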
q.logger.Info(fmt.Sprintf("%d partitions processed", len(partitions.Partitions))) state.NumPartitionsProcessed += uint64(len(partitions.Partitions)) diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index 0beefbe657..a9c9e3a03b 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -199,7 +199,7 @@ func (s *SetupFlowExecution) setupNormalizedTables( Version: flowConnectionConfigs.Version, } - if err := workflow.ExecuteActivity(ctx, flowable.SetupTableSchema, tableSchemaInput).Get(ctx, nil); err != nil { + if err := workflow.ExecuteActivity(ctx, flowable.SetupTableSchemaActivity, tableSchemaInput).Get(ctx, nil); err != nil { s.Error("failed to fetch schema for source tables", slog.Any("error", err)) return fmt.Errorf("failed to fetch schema for source tables: %w", err) } @@ -215,7 +215,7 @@ func (s *SetupFlowExecution) setupNormalizedTables( IsResync: flowConnectionConfigs.Resync, } - if err := workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig).Get(ctx, nil); err != nil { + if err := workflow.ExecuteActivity(ctx, flowable.CreateSetupNormalizedTable, setupConfig).Get(ctx, nil); err != nil { s.Error("failed to create normalized tables", slog.Any("error", err)) return fmt.Errorf("failed to create normalized tables: %w", err) } diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index fb8b62b921..68c2f9ed6b 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -31,8 +31,39 @@ type SnapshotFlowExecution struct { logger log.Logger } +func (s *SnapshotFlowExecution) initializeSnapshot(ctx workflow.Context) (int32, error) { + flowName := s.config.FlowJobName + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 4 * 24 * time.Hour, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Second, + }, + }) + var snapshotID int32 + if err := workflow.ExecuteActivity(ctx, snapshot.InitializeSnapshot, flowName).Get(ctx, &snapshotID); err != nil { + return -1, fmt.Errorf("failed to initialize snapshot: %w", err) + } + return snapshotID, nil +} + +func (s *SnapshotFlowExecution) finishSnapshot(ctx workflow.Context, snapshotId int32) error { + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 4 * 24 * time.Hour, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Second, + }, + }) + if err := workflow.ExecuteActivity( + ctx, snapshot.FinishSnapshot, s.config.FlowJobName, snapshotId, + ).Get(ctx, nil); err != nil { + return fmt.Errorf("failed to update end time for snapshot %d: %w", snapshotId, err) + } + return nil +} + func (s *SnapshotFlowExecution) setupReplication( ctx workflow.Context, + snapshotID int32, ) (*protos.SetupReplicationOutput, error) { flowName := s.config.FlowJobName s.logger.Info("setting up replication on source for peer flow") @@ -58,6 +89,7 @@ func (s *SnapshotFlowExecution) setupReplication( ExistingPublicationName: s.config.PublicationName, ExistingReplicationSlotName: s.config.ReplicationSlotName, Env: s.config.Env, + SnapshotId: snapshotID, } var res *protos.SetupReplicationOutput @@ -96,6 +128,7 @@ func (s *SnapshotFlowExecution) cloneTable( boundSelector *shared.BoundSelector, snapshotName string, mapping *protos.TableMapping, + snapshotID int32, ) error { flowName := s.config.FlowJobName cloneLog := slog.Group("clone-log", @@ -126,8 +159,8 @@ func (s *SnapshotFlowExecution) cloneTable( } schemaCtx := workflow.WithActivityOptions(ctx, 
workflow.ActivityOptions{ - StartToCloseTimeout: time.Minute, WaitForCancellation: true, + StartToCloseTimeout: 5 * time.Minute, RetryPolicy: &temporal.RetryPolicy{ InitialInterval: 1 * time.Minute, }, @@ -137,13 +170,21 @@ func (s *SnapshotFlowExecution) cloneTable( snapshot.LoadTableSchema, s.config.FlowJobName, dstName, + snapshotID, ).Get(ctx, &tableSchema) } - parsedSrcTable, err := utils.ParseSchemaTable(srcName) - if err != nil { - s.logger.Error("unable to parse source table", slog.Any("error", err), cloneLog) - return fmt.Errorf("unable to parse source table: %w", err) + parseTableCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Second, + }, + }) + var parsedSrcTable *utils.SchemaTable + if err := workflow.ExecuteActivity( + parseTableCtx, snapshot.ParseSchemaTable, s.config.FlowJobName, srcName, snapshotID, + ).Get(ctx, &parsedSrcTable); err != nil { + return err } from := "*" if len(mapping.Exclude) != 0 { @@ -162,9 +203,19 @@ func (s *SnapshotFlowExecution) cloneTable( // usually MySQL supports double quotes with ANSI_QUOTES, but Vitess doesn't // Vitess currently only supports initial load so change here is enough srcTableEscaped := parsedSrcTable.String() - if dbtype, err := getPeerType(ctx, s.config.SourceName); err != nil { + peerTypeCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Second, + }, + }) + var srcDbtype protos.DBType + if err := workflow.ExecuteActivity( + peerTypeCtx, snapshot.GetPeerType, s.config.FlowJobName, s.config.SourceName, snapshotID, + ).Get(ctx, &srcDbtype); err != nil { return err - } else if dbtype == protos.DBType_MYSQL { + } + if srcDbtype == protos.DBType_MYSQL { srcTableEscaped = parsedSrcTable.MySQL() } @@ -192,9 +243,13 @@ func (s *SnapshotFlowExecution) cloneTable( // ensure document IDs are synchronized across initial load and CDC // for the same document - if dbtype, err := getPeerType(ctx, s.config.DestinationName); err != nil { + var destDbtype protos.DBType + if err := workflow.ExecuteActivity( + peerTypeCtx, snapshot.GetPeerType, s.config.FlowJobName, s.config.DestinationName, snapshotID, + ).Get(ctx, &destDbtype); err != nil { return err - } else if dbtype == protos.DBType_ELASTICSEARCH { + } + if destDbtype == protos.DBType_ELASTICSEARCH { if err := initTableSchema(); err != nil { return err } @@ -227,6 +282,7 @@ func (s *SnapshotFlowExecution) cloneTable( Exclude: mapping.Exclude, Columns: mapping.Columns, Version: s.config.Version, + SnapshotId: snapshotID, } boundSelector.SpawnChild(childCtx, QRepFlowWorkflow, nil, config, nil) @@ -240,6 +296,7 @@ func (s *SnapshotFlowExecution) cloneTables( snapshotName string, supportsTIDScans bool, maxParallelClones int, + snapshotID int32, ) error { if snapshotType == SNAPSHOT_TYPE_SLOT { s.logger.Info("cloning tables for slot", slog.String("slot", slotName), slog.String("snapshot", snapshotName)) @@ -265,7 +322,7 @@ func (s *SnapshotFlowExecution) cloneTables( if v.PartitionKey == "" { v.PartitionKey = defaultPartitionCol } - if err := s.cloneTable(ctx, boundSelector, snapshotName, v); err != nil { + if err := s.cloneTable(ctx, boundSelector, snapshotName, v, snapshotID); err != nil { s.logger.Error("failed to start clone child workflow", slog.Any("error", err)) continue } @@ -284,8 +341,9 @@ func (s *SnapshotFlowExecution) cloneTablesWithSlot( ctx 
workflow.Context, sessionCtx workflow.Context, numTablesInParallel int, + snapshotID int32, ) error { - slotInfo, err := s.setupReplication(sessionCtx) + slotInfo, err := s.setupReplication(sessionCtx, snapshotID) if err != nil { return fmt.Errorf("failed to setup replication: %w", err) } @@ -312,6 +370,7 @@ func (s *SnapshotFlowExecution) cloneTablesWithSlot( snapshotName, supportsTidScans, numTablesInParallel, + snapshotID, ); err != nil { s.logger.Error("failed to clone tables", slog.Any("error", err)) return fmt.Errorf("failed to clone tables: %w", err) @@ -344,8 +403,13 @@ func SnapshotFlowWorkflow( } defer workflow.CompleteSession(sessionCtx) + snapshotID, err := se.initializeSnapshot(ctx) + if err != nil { + return fmt.Errorf("failed to initialize snapshot: %w", err) + } + if !config.DoInitialSnapshot { - if _, err := se.setupReplication(sessionCtx); err != nil { + if _, err := se.setupReplication(sessionCtx, snapshotID); err != nil { return fmt.Errorf("failed to setup replication: %w", err) } @@ -353,6 +417,10 @@ func SnapshotFlowWorkflow( return fmt.Errorf("failed to close slot keep alive: %w", err) } + if err := se.finishSnapshot(sessionCtx, snapshotID); err != nil { + return fmt.Errorf("failed to finish snapshot: %w", err) + } + return nil } @@ -374,6 +442,7 @@ func SnapshotFlowWorkflow( sessionInfo.SessionID, config.SourceName, config.Env, + snapshotID, ) fExportSnapshot := workflow.ExecuteActivity( @@ -407,12 +476,17 @@ func SnapshotFlowWorkflow( txnSnapshotState.SnapshotName, txnSnapshotState.SupportsTIDScans, numTablesInParallel, + snapshotID, ); err != nil { return fmt.Errorf("failed to clone tables: %w", err) } - } else if err := se.cloneTablesWithSlot(ctx, sessionCtx, numTablesInParallel); err != nil { + } else if err := se.cloneTablesWithSlot(ctx, sessionCtx, numTablesInParallel, snapshotID); err != nil { return fmt.Errorf("failed to clone slots and create replication slot: %w", err) } + if err := se.finishSnapshot(sessionCtx, snapshotID); err != nil { + return fmt.Errorf("failed to finish snapshot: %w", err) + } + return nil } diff --git a/nexus/catalog/migrations/V47__granular_status.sql b/nexus/catalog/migrations/V47__granular_status.sql new file mode 100644 index 0000000000..9e406f027d --- /dev/null +++ b/nexus/catalog/migrations/V47__granular_status.sql @@ -0,0 +1,32 @@ +create function utc_now() returns timestamp as $$ + select now() at time zone 'utc'; +$$ language sql; + +CREATE TABLE IF NOT EXISTS peerdb_stats.snapshots ( + flow_name TEXT NOT NULL, + snapshot_id SERIAL PRIMARY KEY, + start_time TIMESTAMP NOT NULL, + end_time TIMESTAMP +); + +CREATE TABLE IF NOT EXISTS peerdb_stats.granular_status ( + -- ideally a foreign key but flows.name is not guaranteed to be unique + flow_name text PRIMARY KEY, + snapshot_current_id INTEGER, + snapshot_succeeding BOOLEAN NOT NULL, + snapshot_failing_qrep_run_ids TEXT[] NOT NULL DEFAULT '{}', + snapshot_failing_partition_ids TEXT[] NOT NULL DEFAULT '{}', + snapshot_is_internal_error BOOLEAN NOT NULL DEFAULT false, + snapshot_updated_at TIMESTAMP NOT NULL DEFAULT utc_now(), + sync_succeeding BOOLEAN NOT NULL, + sync_is_internal_error BOOLEAN NOT NULL DEFAULT false, + sync_last_successful_batch_id BIGINT, + sync_updated_at TIMESTAMP NOT NULL DEFAULT utc_now(), + normalize_succeeding BOOLEAN NOT NULL, + normalize_is_internal_error BOOLEAN NOT NULL DEFAULT false, + normalize_last_successful_batch_id BIGINT, + normalize_updated_at TIMESTAMP NOT NULL DEFAULT utc_now(), + slot_lag_low BOOLEAN NOT NULL, + slot_lag_mib FLOAT, + 
slot_lag_updated_at TIMESTAMP NOT NULL DEFAULT utc_now() +); diff --git a/protos/flow.proto b/protos/flow.proto index 9574233aac..f079440998 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -111,6 +111,8 @@ message CreateTablesFromExistingInput { string flow_job_name = 1; map new_to_existing_table_mapping = 3; string peer_name = 4; + int32 snapshot_id = 5; + string run_uuid = 6; } message CreateTablesFromExistingOutput { @@ -152,6 +154,7 @@ message SetupReplicationInput { string existing_replication_slot_name = 7; string peer_name = 8; string destination_name = 9; + int32 snapshot_id = 10; } message SetupReplicationOutput { @@ -342,6 +345,8 @@ message QRepConfig { repeated ColumnSetting columns = 27; uint32 version = 28; + + int32 snapshot_id = 29; } message QRepPartition { diff --git a/ui/app/mirrors/create/helpers/common.ts b/ui/app/mirrors/create/helpers/common.ts index 597f34e806..f25374d97e 100644 --- a/ui/app/mirrors/create/helpers/common.ts +++ b/ui/app/mirrors/create/helpers/common.ts @@ -73,4 +73,5 @@ export const blankQRepSetting: QRepConfig = { parentMirrorName: '', exclude: [], columns: [], + snapshotId: 0, };
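For context on how the new table might be consumed, here is a hedged sketch of a read path over peerdb_stats.granular_status as defined in V47__granular_status.sql. The GranularStatus struct and getGranularStatus are assumptions for illustration; the column names come from the migration, and pgxpool.Pool.QueryRow with Row.Scan are the pgx v5 calls already used throughout this diff.

```go
// Package sketch reads the per-mirror granular status row added by V47.
package sketch

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5/pgxpool"
)

// GranularStatus is an assumed view over a subset of the new columns.
type GranularStatus struct {
	SnapshotSucceeding  bool
	SyncSucceeding      bool
	NormalizeSucceeding bool
	SlotLagLow          bool
	SlotLagMiB          *float64 // slot_lag_mib is nullable
}

func getGranularStatus(ctx context.Context, pool *pgxpool.Pool, flowName string) (*GranularStatus, error) {
	var gs GranularStatus
	if err := pool.QueryRow(ctx, `
		SELECT snapshot_succeeding, sync_succeeding, normalize_succeeding, slot_lag_low, slot_lag_mib
		FROM peerdb_stats.granular_status
		WHERE flow_name = $1
	`, flowName).Scan(
		&gs.SnapshotSucceeding, &gs.SyncSucceeding, &gs.NormalizeSucceeding, &gs.SlotLagLow, &gs.SlotLagMiB,
	); err != nil {
		return nil, fmt.Errorf("failed to load granular status for %s: %w", flowName, err)
	}
	return &gs, nil
}
```

Because flow_name is the table's primary key, this stays a single-row lookup per mirror.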