Skip to content
Draft
322 changes: 234 additions & 88 deletions flow/activities/flowable.go

Large diffs are not rendered by default.

129 changes: 72 additions & 57 deletions flow/activities/flowable_core.go

Large diffs are not rendered by default.

81 changes: 74 additions & 7 deletions flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (

"github.com/PeerDB-io/peerdb/flow/alerting"
"github.com/PeerDB-io/peerdb/flow/connectors"
"github.com/PeerDB-io/peerdb/flow/connectors/utils"
"github.com/PeerDB-io/peerdb/flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peerdb/flow/generated/protos"
"github.com/PeerDB-io/peerdb/flow/internal"
"github.com/PeerDB-io/peerdb/flow/shared"
Expand Down Expand Up @@ -48,7 +50,30 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName s
delete(a.SlotSnapshotStates, flowJobName)
}
a.Alerter.LogFlowEvent(ctx, flowJobName, "Ended Snapshot Flow Job")
return nil
}

// InitializeSnapshot tags ctx with the flow name and asks the monitoring
// package to initialize a snapshot for flowName using the catalog pool.
//
// It returns the snapshot ID reported by monitoring.InitializeSnapshot. When
// that call fails, the error is reported through the alerter via
// LogFlowErrorNoStatus and returned together with -1.
func (a *SnapshotActivity) InitializeSnapshot(
	ctx context.Context, flowName string,
) (int32, error) {
	ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)

	id, initErr := monitoring.InitializeSnapshot(ctx, internal.LoggerFromCtx(ctx), a.CatalogPool, flowName)
	if initErr == nil {
		return id, nil
	}
	return -1, a.Alerter.LogFlowErrorNoStatus(ctx, flowName, initErr)
}

// FinishSnapshot tags ctx with the flow name and asks the monitoring package
// to finish the snapshot identified by snapshotID for flowName.
//
// A failure from monitoring.FinishSnapshot is reported through the alerter via
// LogFlowSnapshotError and returned; otherwise nil is returned.
func (a *SnapshotActivity) FinishSnapshot(
	ctx context.Context, flowName string, snapshotID int32,
) error {
	ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)

	finishErr := monitoring.FinishSnapshot(ctx, internal.LoggerFromCtx(ctx), a.CatalogPool, flowName, snapshotID)
	if finishErr == nil {
		return nil
	}
	return a.Alerter.LogFlowSnapshotError(ctx, flowName, snapshotID, finishErr)
}

Expand All @@ -63,7 +88,8 @@ func (a *SnapshotActivity) SetupReplication(

conn, err := connectors.GetByNameAs[connectors.CDCPullConnectorCore](ctx, nil, a.CatalogPool, config.PeerName)
if err != nil {
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get connector: %w", err))
conErr := fmt.Errorf("failed to get connector: %w", err)
return nil, a.Alerter.LogFlowSnapshotError(ctx, config.FlowJobName, config.SnapshotId, conErr)
}

logger.Info("waiting for slot to be created...")
Expand All @@ -72,7 +98,8 @@ func (a *SnapshotActivity) SetupReplication(
if err != nil {
connectors.CloseConnector(ctx, conn)
// it is important to close the connection here as it is not closed in CloseSlotKeepAlive
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("slot error: %w", err))
slotErr := fmt.Errorf("slot error: %w", err)
return nil, a.Alerter.LogFlowSnapshotError(ctx, config.FlowJobName, config.SnapshotId, slotErr)
} else if slotInfo.Conn == nil && slotInfo.SlotName == "" {
connectors.CloseConnector(ctx, conn)
logger.Info("replication setup without slot")
Expand All @@ -99,20 +126,22 @@ func (a *SnapshotActivity) SetupReplication(
}, nil
}

func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, peer string, env map[string]string) error {
func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, peer string, env map[string]string, snapshotID int32) error {
shutdown := heartbeatRoutine(ctx, func() string {
return "maintaining transaction snapshot"
})
defer shutdown()
conn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, nil, a.CatalogPool, peer)
if err != nil {
return a.Alerter.LogFlowError(ctx, sessionID, err)
getErr := fmt.Errorf("failed to get connector: %w", err)
return a.Alerter.LogFlowSnapshotError(ctx, sessionID, snapshotID, getErr)
}
defer connectors.CloseConnector(ctx, conn)

exportSnapshotOutput, tx, err := conn.ExportTxSnapshot(ctx, env)
if err != nil {
return err
exportErr := fmt.Errorf("failed to export tx snapshot: %w", err)
return a.Alerter.LogFlowSnapshotError(ctx, sessionID, snapshotID, exportErr)
}

a.SnapshotStatesMutex.Lock()
Expand All @@ -134,7 +163,11 @@ func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, pee
a.SnapshotStatesMutex.Lock()
delete(a.TxSnapshotStates, sessionID)
a.SnapshotStatesMutex.Unlock()
return conn.FinishExport(tx)
if err := conn.FinishExport(tx); err != nil {
finishErr := fmt.Errorf("failed to finish export: %w", err)
return a.Alerter.LogFlowSnapshotError(ctx, sessionID, snapshotID, finishErr)
}
return nil
}
time.Sleep(time.Minute)
}
Expand Down Expand Up @@ -166,6 +199,40 @@ func (a *SnapshotActivity) LoadTableSchema(
ctx context.Context,
flowName string,
tableName string,
snapshotID int32,
) (*protos.TableSchema, error) {
return internal.LoadTableSchemaFromCatalog(ctx, a.CatalogPool, flowName, tableName)
schema, err := internal.LoadTableSchemaFromCatalog(ctx, a.CatalogPool, flowName, tableName)
if err != nil {
loadErr := fmt.Errorf("failed to load schema from catalog: %w", err)
return nil, a.Alerter.LogFlowSnapshotError(ctx, flowName, snapshotID, loadErr)
}
return schema, nil
}

// ParseSchemaTable parses tableName with utils.ParseSchemaTable.
//
// On a parse failure the error is wrapped, reported through the alerter via
// LogFlowSnapshotError for the given flowName and snapshotID, and returned
// with a nil table.
func (a *SnapshotActivity) ParseSchemaTable(
	ctx context.Context,
	flowName string,
	tableName string,
	snapshotID int32,
) (*utils.SchemaTable, error) {
	table, err := utils.ParseSchemaTable(tableName)
	if err == nil {
		return table, nil
	}
	wrapped := fmt.Errorf("failed to parse schema table: %w", err)
	return nil, a.Alerter.LogFlowSnapshotError(ctx, flowName, snapshotID, wrapped)
}

// GetPeerType looks up the database type of peerName through
// connectors.LoadPeerType using the catalog pool.
//
// On failure the error is wrapped, reported through the alerter via
// LogFlowSnapshotError for the given flowName and snapshotID, and returned
// with the zero protos.DBType value.
func (a *SnapshotActivity) GetPeerType(
	ctx context.Context,
	flowName string,
	peerName string,
	snapshotID int32,
) (protos.DBType, error) {
	peerType, err := connectors.LoadPeerType(ctx, a.CatalogPool, peerName)
	if err == nil {
		return peerType, nil
	}
	wrapped := fmt.Errorf("failed to get peer type: %w", err)
	return 0, a.Alerter.LogFlowSnapshotError(ctx, flowName, snapshotID, wrapped)
}
84 changes: 68 additions & 16 deletions flow/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,25 +444,41 @@ const (
flowErrorTypeError flowErrorType = "error"
)

// logFlowErrorInternal pushes the error to the errors table and emits a metric as well as a telemetry message
func (a *Alerter) logFlowErrorInternal(
// logFlowErrorImpl pushes the error to the errors table and emits a metric as well as a telemetry message
func (a *Alerter) logFlowErrorImpl(
ctx context.Context,
flowName string,
errorType flowErrorType,
inErr error,
logger log.Logger,
loggerFunc func(string, ...any),
) {
logger := internal.LoggerFromCtx(ctx)
inErrWithStack := fmt.Sprintf("%+v", inErr)
loggerFunc(inErr.Error(), slog.String("stack", inErrWithStack))
if _, err := a.CatalogPool.Exec(
ctx, "INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)",
flowName, inErrWithStack, errorType.String(),
); err != nil {
logger.Error("failed to insert flow error", slog.Any("error", err))
return
retryInterval := time.Second
for {
if _, err := a.CatalogPool.Exec(
ctx, "INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)",
flowName, inErrWithStack, errorType,
); err != nil {
insertErr := shared.LogError(logger, fmt.Errorf("failed to insert flow error: %w", err))
errInfo := ErrorInfo{Source: ErrorSourcePostgresCatalog, Code: "UNKNOWN"}
a.emitClassifiedError(ctx, logger, flowName, errorType, ErrorInternal, errInfo, insertErr, insertErr.Error(), loggerFunc)
time.Sleep(retryInterval)
retryInterval = min(retryInterval*2, time.Minute)
continue
}
break
}

errorClass, errInfo := GetErrorClass(ctx, inErr)
a.emitClassifiedError(ctx, logger, flowName, errorType, errorClass, errInfo, inErr, inErrWithStack, loggerFunc)
}

func (a *Alerter) emitClassifiedError(
ctx context.Context, logger log.Logger, flowName string, errorType flowErrorType, errorClass ErrorClass, errInfo ErrorInfo,
inErr error, inErrWithStack string, loggerFunc func(string, ...any),
) {
var tags []string
if errors.Is(inErr, context.Canceled) {
tags = append(tags, string(shared.ErrTypeCanceled))
Expand Down Expand Up @@ -494,14 +510,11 @@ func (a *Alerter) logFlowErrorInternal(
if errors.As(inErr, &sshErr) {
tags = append(tags, string(shared.ErrTypeNet))
}

errorClass, errInfo := GetErrorClass(ctx, inErr)
tags = append(tags, "errorClass:"+errorClass.String(), "errorAction:"+errorClass.ErrorAction().String())

if !internal.PeerDBTelemetryErrorActionBasedAlertingEnabled() || errorClass.ErrorAction() == NotifyTelemetry {
// Warnings alert us just like errors until there's a customer warning system
a.sendTelemetryMessage(ctx, logger, flowName, inErrWithStack, telemetry.ERROR, tags...)
}

loggerFunc(fmt.Sprintf("Emitting classified error '%s'", inErr.Error()),
slog.Any("error", inErr),
slog.Any("errorClass", errorClass),
Expand Down Expand Up @@ -530,15 +543,54 @@ func (a *Alerter) logFlowErrorInternal(
gauge.Record(ctx, 1, errorAttributeSet)
}

func (a *Alerter) LogFlowError(ctx context.Context, flowName string, inErr error) error {
// LogFlowErrorNoStatus reports inErr for flowName at error severity through
// logFlowErrorImpl and returns inErr unchanged so callers can return it
// directly.
func (a *Alerter) LogFlowErrorNoStatus(
	ctx context.Context, flowName string, inErr error,
) error {
	// TODO check that this one just logs without updating status
	flowLogger := internal.LoggerFromCtx(ctx)
	a.logFlowErrorImpl(ctx, flowName, flowErrorTypeError, inErr, flowLogger, flowLogger.Error)
	return inErr
}

// LogFlowSyncError reports inErr for flowName at error severity through
// logFlowErrorImpl and returns inErr unchanged. batchID is accepted but not
// yet used.
func (a *Alerter) LogFlowSyncError(
	ctx context.Context, flowName string, batchID int64, inErr error,
) error {
	// TODO use batchID
	flowLogger := internal.LoggerFromCtx(ctx)
	a.logFlowErrorImpl(ctx, flowName, flowErrorTypeError, inErr, flowLogger, flowLogger.Error)
	return inErr
}

// LogFlowNormalizeError reports inErr for flowName at error severity through
// logFlowErrorImpl. It returns nothing. batchID is accepted but not yet used.
func (a *Alerter) LogFlowNormalizeError(
	ctx context.Context, flowName string, batchID int64, inErr error,
) {
	// TODO use batchID
	flowLogger := internal.LoggerFromCtx(ctx)
	a.logFlowErrorImpl(ctx, flowName, flowErrorTypeError, inErr, flowLogger, flowLogger.Error)
}

// LogFlowSnapshotError reports inErr for flowName at error severity through
// logFlowErrorImpl and returns inErr unchanged. snapshotID is accepted but not
// yet used.
func (a *Alerter) LogFlowSnapshotError(
	ctx context.Context, flowName string, snapshotID int32, inErr error,
) error {
	// TODO use snapshotID
	flowLogger := internal.LoggerFromCtx(ctx)
	a.logFlowErrorImpl(ctx, flowName, flowErrorTypeError, inErr, flowLogger, flowLogger.Error)
	return inErr
}

// LogFlowSnapshotQRepError reports inErr for flowName at error severity
// through logFlowErrorImpl and returns inErr unchanged. snapshotID and
// qRepRunID are accepted but not yet used.
func (a *Alerter) LogFlowSnapshotQRepError(
	ctx context.Context,
	flowName string,
	snapshotID int32,
	qRepRunID string,
	inErr error,
) error {
	// TODO use snapshotID, qRepRunID
	flowLogger := internal.LoggerFromCtx(ctx)
	a.logFlowErrorImpl(ctx, flowName, flowErrorTypeError, inErr, flowLogger, flowLogger.Error)
	return inErr
}

// LogFlowSnapshotPartitionError reports inErr for flowName at error severity
// through logFlowErrorImpl and returns inErr unchanged. snapshotID and
// partitionID are accepted but not yet used.
//
// The error is reported exactly once: the leftover call to the renamed
// logFlowErrorInternal helper is dropped so this matches the sibling
// LogFlow*Error methods.
func (a *Alerter) LogFlowSnapshotPartitionError(
	ctx context.Context, flowName string, snapshotID int32, partitionID string, inErr error,
) error {
	logger := internal.LoggerFromCtx(ctx)
	// TODO use snapshotID, partitionID
	a.logFlowErrorImpl(ctx, flowName, flowErrorTypeError, inErr, logger, logger.Error)
	return inErr
}

// LogFlowWarning reports inErr for flowName at warning severity through
// logFlowErrorImpl, using the logger's Warn method for the log line.
//
// The warning is reported exactly once: the leftover call to the renamed
// logFlowErrorInternal helper is dropped.
func (a *Alerter) LogFlowWarning(ctx context.Context, flowName string, inErr error) {
	logger := internal.LoggerFromCtx(ctx)
	a.logFlowErrorImpl(ctx, flowName, flowErrorTypeWarn, inErr, logger, logger.Warn)
}

func (a *Alerter) LogFlowEvent(ctx context.Context, flowName string, info string) {
Expand Down
24 changes: 24 additions & 0 deletions flow/alerting/classifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,30 @@ func GetErrorClass(ctx context.Context, err error) (ErrorClass, ErrorInfo) {
}
}

var dynamicConfError *exceptions.DynamicConfError
if errors.As(err, &dynamicConfError) {
return ErrorInternal, ErrorInfo{
Source: ErrorSourceOther,
Code: "UNKNOWN",
}
}

var protoMarshalError *exceptions.ProtoMarshalError
if errors.As(err, &protoMarshalError) {
return ErrorInternal, ErrorInfo{
Source: ErrorSourceOther,
Code: "UNKNOWN",
}
}

var protoUnmarshalError *exceptions.ProtoUnmarshalError
if errors.As(err, &protoUnmarshalError) {
return ErrorInternal, ErrorInfo{
Source: ErrorSourceOther,
Code: "UNKNOWN",
}
}

if errors.Is(err, context.Canceled) {
// Generally happens during workflow cancellation
return ErrorIgnoreContextCancelled, ErrorInfo{
Expand Down
12 changes: 6 additions & 6 deletions flow/alerting/classifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestClickHousePushingToViewShouldBeMvError(t *testing.T) {
is not supported: while converting source column created_at to destination column created_at:
while pushing to view db_name.hello_mv`,
}
errorClass, errInfo := GetErrorClass(t.Context(), exceptions.NewNormalizationError(fmt.Errorf("error in WAL: %w", err)))
errorClass, errInfo := GetErrorClass(t.Context(), exceptions.NewNormalizationError("error in WAL: %w", err))
assert.Equal(t, ErrorNotifyMVOrView, errorClass, "Unexpected error class")
assert.Equal(t, ErrorInfo{
Source: ErrorSourceClickHouse,
Expand Down Expand Up @@ -199,8 +199,8 @@ func TestClickHouseChaoticNormalizeErrorShouldBeNotifyMVNow(t *testing.T) {
Left key __table1.column_2 type String. Right key __table2.column_1 type Int64`,
}
errorClass, errInfo := GetErrorClass(t.Context(),
exceptions.NewNormalizationError(fmt.Errorf(`Normalization Error: failed to normalize records:
error while inserting into normalized table table_A: %w`, err)))
exceptions.NewNormalizationError(`Normalization Error: failed to normalize records:
error while inserting into normalized table table_A: %w`, err))
assert.Equal(t, ErrorNotifyMVOrView, errorClass, "Unexpected error class")
assert.Equal(t, ErrorInfo{
Source: ErrorSourceClickHouse,
Expand Down Expand Up @@ -405,7 +405,7 @@ func TestClickHouseUnknownTableShouldBeDestinationModified(t *testing.T) {
Message: "Table abc does not exist.",
}
errorClass, errInfo := GetErrorClass(t.Context(),
exceptions.NewNormalizationError(fmt.Errorf("failed to normalize records: %w", err)))
exceptions.NewNormalizationError("failed to normalize records: %w", err))
assert.Equal(t, ErrorNotifyDestinationModified, errorClass, "Unexpected error class")
assert.Equal(t, ErrorInfo{
Source: ErrorSourceClickHouse,
Expand All @@ -421,7 +421,7 @@ func TestClickHouseUnkownTableWhilePushingToViewShouldBeNotifyMVNow(t *testing.T
Message: "Table abc does not exist. Maybe you meant abc2?: while executing 'FUNCTION func()': while pushing to view some_mv (some-uuid-here)",
}
errorClass, errInfo := GetErrorClass(t.Context(),
exceptions.NewNormalizationError(fmt.Errorf("failed to normalize records: %w", err)))
exceptions.NewNormalizationError("failed to normalize records: %w", err))
assert.Equal(t, ErrorNotifyMVOrView, errorClass, "Unexpected error class")
assert.Equal(t, ErrorInfo{
Source: ErrorSourceClickHouse,
Expand All @@ -436,7 +436,7 @@ func TestNonClassifiedNormalizeErrorShouldBeNotifyMVNow(t *testing.T) {
Message: "JOIN ANY LEFT JOIN ... ON a.id = b.b_id ambiguous identifier 'c_id'. In scope SELECT ...",
}
errorClass, errInfo := GetErrorClass(t.Context(),
exceptions.NewNormalizationError(fmt.Errorf("failed to normalize records: %w", err)))
exceptions.NewNormalizationError("failed to normalize records: %w", err))
assert.Equal(t, ErrorNotifyMVOrView, errorClass, "Unexpected error class")
assert.Equal(t, ErrorInfo{
Source: ErrorSourceClickHouse,
Expand Down
Loading
Loading