Skip to content

Commit 1525dfe

Browse files
committed
Proposal: Activity to fetch config from the DB
This uses a new helper in `internal.FetchConfigFromDB` to fetch a fully hydrated `protos.FlowConnectionConfigs`. When passing this config to Temporal we strip the `tableMappings` array element which can cause it to go over the 2MB limit. This contains a subset of the changes which were originally proposed in: PeerDB-io#3407
1 parent c3aab43 commit 1525dfe

File tree

6 files changed

+141
-64
lines changed

6 files changed

+141
-64
lines changed

flow/activities/flowable.go

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ import (
55
"errors"
66
"fmt"
77
"log/slog"
8+
"maps"
89
"net"
910
"os"
11+
"slices"
1012
"strconv"
1113
"sync/atomic"
1214
"time"
@@ -125,6 +127,13 @@ func (a *FlowableActivity) EnsurePullability(
125127
}
126128
defer connectors.CloseConnector(ctx, srcConn)
127129

130+
// We can fetch from the DB, as we are in the activity
131+
cfg, err := internal.FetchConfigFromDB(config.FlowJobName, ctx)
132+
if err != nil {
133+
return nil, err
134+
}
135+
config.SourceTableIdentifiers = slices.Sorted(maps.Keys(internal.TableNameMapping(cfg.TableMappings, cfg.Resync)))
136+
128137
output, err := srcConn.EnsurePullability(ctx, config)
129138
if err != nil {
130139
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to ensure pullability: %w", err))
@@ -145,6 +154,12 @@ func (a *FlowableActivity) CreateRawTable(
145154
}
146155
defer connectors.CloseConnector(ctx, dstConn)
147156

157+
cfg, err := internal.FetchConfigFromDB(config.FlowJobName, ctx)
158+
if err != nil {
159+
return nil, err
160+
}
161+
config.TableNameMapping = internal.TableNameMapping(cfg.TableMappings, cfg.Resync)
162+
148163
res, err := dstConn.CreateRawTable(ctx, config)
149164
if err != nil {
150165
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
@@ -174,11 +189,15 @@ func (a *FlowableActivity) SetupTableSchema(
174189
}
175190
defer connectors.CloseConnector(ctx, srcConn)
176191

177-
tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, config.TableMappings)
192+
cfg, err := internal.FetchConfigFromDB(config.FlowName, ctx)
193+
if err != nil {
194+
return err
195+
}
196+
tableNameSchemaMapping, err := srcConn.GetTableSchema(ctx, config.Env, config.Version, config.System, cfg.TableMappings)
178197
if err != nil {
179198
return a.Alerter.LogFlowError(ctx, config.FlowName, fmt.Errorf("failed to get GetTableSchemaConnector: %w", err))
180199
}
181-
processed := internal.BuildProcessedSchemaMapping(config.TableMappings, tableNameSchemaMapping, logger)
200+
processed := internal.BuildProcessedSchemaMapping(cfg.TableMappings, tableNameSchemaMapping, logger)
182201

183202
tx, err := a.CatalogPool.BeginTx(ctx, pgx.TxOptions{})
184203
if err != nil {
@@ -243,9 +262,13 @@ func (a *FlowableActivity) CreateNormalizedTable(
243262
return nil, err
244263
}
245264

265+
cfg, err := internal.FetchConfigFromDB(config.FlowName, ctx)
266+
if err != nil {
267+
return nil, err
268+
}
246269
numTablesToSetup.Store(int32(len(tableNameSchemaMapping)))
247270
tableExistsMapping := make(map[string]bool, len(tableNameSchemaMapping))
248-
for _, tableMapping := range config.TableMappings {
271+
for _, tableMapping := range cfg.TableMappings {
249272
tableIdentifier := tableMapping.DestinationTableIdentifier
250273
tableSchema := tableNameSchemaMapping[tableIdentifier]
251274
existing, err := conn.SetupNormalizedTable(
@@ -292,6 +315,16 @@ func (a *FlowableActivity) SyncFlow(
292315
var normalizeWaiting atomic.Bool
293316
var syncingBatchID atomic.Int64
294317
var syncState atomic.Pointer[string]
318+
319+
cfg, err := internal.FetchConfigFromDB(config.FlowJobName, ctx)
320+
if err != nil {
321+
return err
322+
}
323+
324+
// Override config with DB values to deal with the large fields.
325+
config = cfg
326+
options.TableMappings = cfg.TableMappings
327+
295328
syncState.Store(shared.Ptr("setup"))
296329
shutdown := heartbeatRoutine(ctx, func() string {
297330
// Must load Waiting after BatchID to avoid race saying we're waiting on currently processing batch

flow/activities/snapshot_activity.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,14 @@ func (a *SnapshotActivity) SetupReplication(
6565
return nil, a.Alerter.LogFlowError(ctx, config.FlowJobName, fmt.Errorf("failed to get connector: %w", err))
6666
}
6767

68+
configCtx := context.Background()
69+
defer configCtx.Done()
70+
cfg, err := internal.FetchConfigFromDB(config.FlowJobName, configCtx)
71+
if err != nil {
72+
return nil, err
73+
}
74+
config.TableNameMapping = internal.TableNameMapping(cfg.TableMappings, cfg.Resync)
75+
6876
logger.Info("waiting for slot to be created...")
6977
slotInfo, err := conn.SetupReplication(ctx, config)
7078

@@ -180,8 +188,12 @@ func (a *SnapshotActivity) GetDefaultPartitionKeyForTables(
180188
}
181189
defer connectors.CloseConnector(ctx, connector)
182190

191+
cfg, err := internal.FetchConfigFromDB(input.FlowJobName, ctx)
192+
if err != nil {
193+
return nil, err
194+
}
183195
output, err := connector.GetDefaultPartitionKeyForTables(ctx, &protos.GetDefaultPartitionKeyForTablesInput{
184-
TableMappings: input.TableMappings,
196+
TableMappings: cfg.TableMappings,
185197
})
186198
if err != nil {
187199
return nil, a.Alerter.LogFlowError(ctx, input.FlowJobName, fmt.Errorf("failed to check if tables can parallel load: %w", err))
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package internal
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"fmt"
7+
8+
"github.com/PeerDB-io/peerdb/flow/generated/protos"
9+
"google.golang.org/protobuf/proto"
10+
)
11+
12+
func TableNameMapping(tableMappings []*protos.TableMapping, resync bool) map[string]string {
13+
tblNameMapping := make(map[string]string, len(tableMappings))
14+
if resync {
15+
for _, mapping := range tableMappings {
16+
if mapping.Engine != protos.TableEngine_CH_ENGINE_NULL {
17+
mapping.DestinationTableIdentifier += "_resync"
18+
}
19+
}
20+
}
21+
for _, v := range tableMappings {
22+
tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier
23+
}
24+
25+
return tblNameMapping
26+
}
27+
28+
func MinimizeFlowConfiguration(cfg *protos.FlowConnectionConfigs) *protos.FlowConnectionConfigs {
29+
if cfg == nil {
30+
return nil
31+
}
32+
cfg.TableMappings = nil
33+
return cfg
34+
}
35+
36+
func FetchConfigFromDB(flowName string, ctx context.Context) (*protos.FlowConnectionConfigs, error) {
37+
var configBytes sql.RawBytes
38+
pool, _ := GetCatalogConnectionPoolFromEnv(ctx)
39+
if err := pool.QueryRow(ctx,
40+
"SELECT config_proto FROM flows WHERE name = $1 LIMIT 1", flowName,
41+
).Scan(&configBytes); err != nil {
42+
return nil, fmt.Errorf("unable to query flow config from catalog: %w", err)
43+
}
44+
45+
var cfgFromDB protos.FlowConnectionConfigs
46+
if err := proto.Unmarshal(configBytes, &cfgFromDB); err != nil {
47+
return nil, fmt.Errorf("unable to unmarshal flow config: %w", err)
48+
}
49+
50+
return &cfgFromDB, nil
51+
}

flow/workflows/cdc_flow.go

Lines changed: 15 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,14 @@ type CDCFlowWorkflowState struct {
4545

4646
// returns a new empty PeerFlowState
4747
func NewCDCFlowWorkflowState(ctx workflow.Context, logger log.Logger, cfg *protos.FlowConnectionConfigs) *CDCFlowWorkflowState {
48-
tableMappings := make([]*protos.TableMapping, 0, len(cfg.TableMappings))
49-
for _, tableMapping := range cfg.TableMappings {
50-
tableMappings = append(tableMappings, proto.CloneOf(tableMapping))
51-
}
5248
state := CDCFlowWorkflowState{
5349
ActiveSignal: model.NoopSignal,
5450
CurrentFlowStatus: protos.FlowStatus_STATUS_SETUP,
5551
FlowConfigUpdate: nil,
5652
SyncFlowOptions: &protos.SyncFlowOptions{
5753
BatchSize: cfg.MaxBatchSize,
5854
IdleTimeoutSeconds: cfg.IdleTimeoutSeconds,
59-
TableMappings: tableMappings,
55+
TableMappings: []*protos.TableMapping{},
6056
},
6157
SnapshotNumRowsPerPartition: cfg.SnapshotNumRowsPerPartition,
6258
SnapshotNumPartitionsOverride: cfg.SnapshotNumPartitionsOverride,
@@ -129,6 +125,7 @@ func syncStateToConfigProtoInCatalog(
129125
cfg *protos.FlowConnectionConfigs,
130126
state *CDCFlowWorkflowState,
131127
) *protos.FlowConnectionConfigs {
128+
// TODO: make sure that `cfg` includes table mappings.
132129
cloneCfg := updateFlowConfigWithLatestSettings(cfg, state)
133130
uploadConfigToCatalog(ctx, cloneCfg)
134131
return cloneCfg
@@ -278,7 +275,7 @@ func processTableAdditions(
278275
alterPublicationAddAdditionalTablesFuture := workflow.ExecuteActivity(
279276
alterPublicationAddAdditionalTablesCtx,
280277
flowable.AddTablesToPublication,
281-
cfg, flowConfigUpdate.AdditionalTables)
278+
internal.MinimizeFlowConfiguration(cfg), flowConfigUpdate.AdditionalTables)
282279

283280
var res *CDCFlowWorkflowResult
284281
var addTablesFlowErr error
@@ -320,6 +317,7 @@ func processTableAdditions(
320317
childAddTablesCDCFlowFuture := workflow.ExecuteChildWorkflow(
321318
childAddTablesCDCFlowCtx,
322319
CDCFlowWorkflow,
320+
// TODO: `additonalTableCfg` this cannot be minimized in the main branch; but the limitation is minimal.
323321
additionalTablesCfg,
324322
nil,
325323
)
@@ -394,7 +392,7 @@ func processTableRemovals(
394392
rawTableCleanupFuture := workflow.ExecuteActivity(
395393
removeTablesCtx,
396394
flowable.RemoveTablesFromRawTable,
397-
cfg, state.FlowConfigUpdate.RemovedTables)
395+
internal.MinimizeFlowConfiguration(cfg), state.FlowConfigUpdate.RemovedTables)
398396
removeTablesSelector.AddFuture(rawTableCleanupFuture, func(f workflow.Future) {
399397
if err := f.Get(ctx, nil); err != nil {
400398
logger.Error("failed to clean up raw table for removed tables", slog.Any("error", err))
@@ -406,7 +404,7 @@ func processTableRemovals(
406404
removeTablesFromCatalogFuture := workflow.ExecuteActivity(
407405
removeTablesCtx,
408406
flowable.RemoveTablesFromCatalog,
409-
cfg, state.FlowConfigUpdate.RemovedTables)
407+
internal.MinimizeFlowConfiguration(cfg), state.FlowConfigUpdate.RemovedTables)
410408
removeTablesSelector.AddFuture(removeTablesFromCatalogFuture, func(f workflow.Future) {
411409
if err := f.Get(ctx, nil); err != nil {
412410
logger.Error("failed to clean up raw table for removed tables", slog.Any("error", err))
@@ -576,7 +574,7 @@ func CDCFlowWorkflow(
576574

577575
logger.Info("mirror resumed", slog.Duration("after", time.Since(startTime)))
578576
state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_RUNNING)
579-
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state)
577+
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, internal.MinimizeFlowConfiguration(cfg), state)
580578
}
581579

582580
originalRunID := workflow.GetInfo(ctx).OriginalRunID
@@ -609,22 +607,6 @@ func CDCFlowWorkflow(
609607
// for safety, rely on the idempotency of SetupFlow instead
610608
// also, no signals are being handled until the loop starts, so no PAUSE/DROP will take here.
611609
if state.CurrentFlowStatus != protos.FlowStatus_STATUS_RUNNING {
612-
originalTableMappings := make([]*protos.TableMapping, 0, len(cfg.TableMappings))
613-
for _, tableMapping := range cfg.TableMappings {
614-
originalTableMappings = append(originalTableMappings, proto.CloneOf(tableMapping))
615-
}
616-
// if resync is true, alter the table name schema mapping to temporarily add
617-
// a suffix to the table names.
618-
if cfg.Resync {
619-
for _, mapping := range state.SyncFlowOptions.TableMappings {
620-
if mapping.Engine != protos.TableEngine_CH_ENGINE_NULL {
621-
mapping.DestinationTableIdentifier += "_resync"
622-
}
623-
}
624-
// because we have renamed the tables.
625-
cfg.TableMappings = state.SyncFlowOptions.TableMappings
626-
}
627-
628610
// start the SetupFlow workflow as a child workflow, and wait for it to complete
629611
// it should return the table schema for the source peer
630612
setupFlowID := GetChildWorkflowID("setup-flow", cfg.FlowJobName, originalRunID)
@@ -648,7 +630,6 @@ func CDCFlowWorkflow(
648630
state.ActiveSignal = model.ResyncSignal
649631
cfg.Resync = true
650632
cfg.DoInitialSnapshot = true
651-
cfg.TableMappings = originalTableMappings
652633
// this is the only place where we can have a resync during a resync
653634
// so we need to NOT sync the tableMappings to catalog to preserve original names
654635
uploadConfigToCatalog(ctx, cfg)
@@ -672,7 +653,10 @@ func CDCFlowWorkflow(
672653
WaitForCancellation: true,
673654
}
674655
setupFlowCtx := workflow.WithChildOptions(ctx, childSetupFlowOpts)
675-
setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg)
656+
// Resync will rely rely on the `cfg.Resync` flag to rename the tables
657+
// during the snapshot process. This is how we're able to also remove the need
658+
// to sync the config back into the DB / not rely on the `state.TableMappings`.
659+
setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, internal.MinimizeFlowConfiguration(cfg))
676660

677661
var setupFlowOutput *protos.SetupFlowOutput
678662
var setupFlowError error
@@ -808,7 +792,7 @@ func CDCFlowWorkflow(
808792
logger.Info("executed setup flow and snapshot flow, start running")
809793
state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_RUNNING)
810794
}
811-
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state)
795+
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, internal.MinimizeFlowConfiguration(cfg), state)
812796
}
813797

814798
var finished bool
@@ -819,7 +803,8 @@ func CDCFlowWorkflow(
819803
WaitForCancellation: true,
820804
RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1},
821805
}))
822-
syncFlowFuture := workflow.ExecuteActivity(syncCtx, flowable.SyncFlow, cfg, state.SyncFlowOptions)
806+
state.SyncFlowOptions.TableMappings = []*protos.TableMapping{}
807+
syncFlowFuture := workflow.ExecuteActivity(syncCtx, flowable.SyncFlow, internal.MinimizeFlowConfiguration(cfg), state.SyncFlowOptions)
823808

824809
mainLoopSelector := workflow.NewNamedSelector(ctx, "MainLoop")
825810
mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {
@@ -940,7 +925,7 @@ func CDCFlowWorkflow(
940925
if state.ActiveSignal == model.TerminateSignal || state.ActiveSignal == model.ResyncSignal {
941926
return state, workflow.NewContinueAsNewError(ctx, DropFlowWorkflow, state.DropFlowInput)
942927
}
943-
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state)
928+
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, internal.MinimizeFlowConfiguration(cfg), state)
944929
}
945930
}
946931
}

flow/workflows/setup_flow.go

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,17 @@ type SetupFlowExecution struct {
3636
tableNameMapping map[string]string
3737
cdcFlowName string
3838
executionID string
39+
resync bool
3940
}
4041

4142
// NewSetupFlowExecution creates a new instance of SetupFlowExecution.
42-
func NewSetupFlowExecution(ctx workflow.Context, tableNameMapping map[string]string, cdcFlowName string) *SetupFlowExecution {
43+
func NewSetupFlowExecution(ctx workflow.Context, tableNameMapping map[string]string, cdcFlowName string, resync bool) *SetupFlowExecution {
4344
return &SetupFlowExecution{
4445
Logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), cdcFlowName)),
4546
tableNameMapping: tableNameMapping,
4647
cdcFlowName: cdcFlowName,
4748
executionID: workflow.GetInfo(ctx).WorkflowExecution.ID,
49+
resync: resync,
4850
}
4951
}
5052

@@ -119,10 +121,9 @@ func (s *SetupFlowExecution) ensurePullability(
119121

120122
// create EnsurePullabilityInput for the srcTableName
121123
ensurePullabilityInput := &protos.EnsurePullabilityBatchInput{
122-
PeerName: config.SourceName,
123-
FlowJobName: s.cdcFlowName,
124-
SourceTableIdentifiers: slices.Sorted(maps.Keys(s.tableNameMapping)),
125-
CheckConstraints: checkConstraints,
124+
PeerName: config.SourceName,
125+
FlowJobName: s.cdcFlowName,
126+
CheckConstraints: checkConstraints,
126127
}
127128

128129
future := workflow.ExecuteActivity(ctx, flowable.EnsurePullability, ensurePullabilityInput)
@@ -162,9 +163,8 @@ func (s *SetupFlowExecution) createRawTable(
162163

163164
// attempt to create the tables.
164165
createRawTblInput := &protos.CreateRawTableInput{
165-
PeerName: config.DestinationName,
166-
FlowJobName: s.cdcFlowName,
167-
TableNameMapping: s.tableNameMapping,
166+
PeerName: config.DestinationName,
167+
FlowJobName: s.cdcFlowName,
168168
}
169169

170170
rawTblFuture := workflow.ExecuteActivity(ctx, flowable.CreateRawTable, createRawTblInput)
@@ -191,12 +191,11 @@ func (s *SetupFlowExecution) setupNormalizedTables(
191191
})
192192

193193
tableSchemaInput := &protos.SetupTableSchemaBatchInput{
194-
PeerName: flowConnectionConfigs.SourceName,
195-
TableMappings: flowConnectionConfigs.TableMappings,
196-
FlowName: s.cdcFlowName,
197-
System: flowConnectionConfigs.System,
198-
Env: flowConnectionConfigs.Env,
199-
Version: flowConnectionConfigs.Version,
194+
PeerName: flowConnectionConfigs.SourceName,
195+
FlowName: s.cdcFlowName,
196+
System: flowConnectionConfigs.System,
197+
Env: flowConnectionConfigs.Env,
198+
Version: flowConnectionConfigs.Version,
200199
}
201200

202201
if err := workflow.ExecuteActivity(ctx, flowable.SetupTableSchema, tableSchemaInput).Get(ctx, nil); err != nil {
@@ -207,7 +206,6 @@ func (s *SetupFlowExecution) setupNormalizedTables(
207206
s.Info("setting up normalized tables on destination peer", slog.String("destination", flowConnectionConfigs.DestinationName))
208207
setupConfig := &protos.SetupNormalizedTableBatchInput{
209208
PeerName: flowConnectionConfigs.DestinationName,
210-
TableMappings: flowConnectionConfigs.TableMappings,
211209
SoftDeleteColName: flowConnectionConfigs.SoftDeleteColName,
212210
SyncedAtColName: flowConnectionConfigs.SyncedAtColName,
213211
FlowName: flowConnectionConfigs.FlowJobName,
@@ -261,13 +259,8 @@ func (s *SetupFlowExecution) executeSetupFlow(
261259

262260
// SetupFlowWorkflow is the workflow that sets up the flow.
263261
func SetupFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigs) (*protos.SetupFlowOutput, error) {
264-
tblNameMapping := make(map[string]string, len(config.TableMappings))
265-
for _, v := range config.TableMappings {
266-
tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier
267-
}
268-
269262
// create the setup flow execution
270-
setupFlowExecution := NewSetupFlowExecution(ctx, tblNameMapping, config.FlowJobName)
263+
setupFlowExecution := NewSetupFlowExecution(ctx, map[string]string{}, config.FlowJobName, config.Resync)
271264

272265
// execute the setup flow
273266
setupFlowOutput, err := setupFlowExecution.executeSetupFlow(ctx, config)

0 commit comments

Comments
 (0)