diff --git a/flow/alerting/classifier.go b/flow/alerting/classifier.go index 918663ed2e..a880009ea2 100644 --- a/flow/alerting/classifier.go +++ b/flow/alerting/classifier.go @@ -185,6 +185,10 @@ var ( ErrorNotifyPostgresSlotMemalloc = ErrorClass{ Class: "NOTIFY_POSTGRES_SLOT_MEMALLOC", action: NotifyUser, } + // This RDS specific error is seen when we try to create a replication slot on a read-replica + ErrNotifyPostgresCreatingSlotOnReader = ErrorClass{ + Class: "NOTIFY_POSTGRES_CREATING_SLOT_ON_READER", action: NotifyUser, + } // Mongo specific, equivalent to slot invalidation in Postgres ErrorNotifyChangeStreamHistoryLost = ErrorClass{ Class: "NOTIFY_CHANGE_STREAM_HISTORY_LOST", action: NotifyUser, @@ -497,6 +501,9 @@ func GetErrorClass(ctx context.Context, err error) (ErrorClass, ErrorInfo) { return ErrorNotifyPostgresSlotMemalloc, pgErrorInfo } + if strings.Contains(pgErr.Message, "Create the replication slot from the writer node instead") { + return ErrNotifyPostgresCreatingSlotOnReader, pgErrorInfo + } // Fall through for other internal errors return ErrorOther, pgErrorInfo diff --git a/flow/alerting/classifier_test.go b/flow/alerting/classifier_test.go index a015ee5c9a..318429aa26 100644 --- a/flow/alerting/classifier_test.go +++ b/flow/alerting/classifier_test.go @@ -291,6 +291,22 @@ func TestPostgresInvalidValueForSynchronizedStandbySlots(t *testing.T) { }, errInfo, "Unexpected error info") } +func TestPostgresCreatingSlotOnReader(t *testing.T) { + err := &pgconn.PgError{ + Severity: "ERROR", + Code: pgerrcode.InternalError, + Message: `ERROR: Creating logical replication slot peerflow_slot_mirror_1cd7f87b__d143__4cea__a247__a2acc5f5b746 + is not supported on the Multi-AZ DB cluster reader node. + Create the replication slot from the writer node instead. (SQLSTATE XX000)`, + } + errorClass, errInfo := GetErrorClass(t.Context(), fmt.Errorf("slot error: [slot] error creating replication slot: %w", err)) + assert.Equal(t, ErrNotifyPostgresCreatingSlotOnReader, errorClass, "Unexpected error class") + assert.Equal(t, ErrorInfo{ + Source: ErrorSourcePostgres, + Code: pgerrcode.InternalError, + }, errInfo, "Unexpected error info") +} + func TestPostgresStaleFileHandleErrorShouldBeRecoverable(t *testing.T) { // Simulate a stale file handle error err := &exceptions.PostgresWalError{