Skip to content

Commit ba6e4ea

Browse files
ilidemi and serprex authored
Replace local activities with plain activities, except status in catalog (#3350)
Like #3301, but trying to handle the status in catalog separately.

> This small optimization increases complexity around re-execution.
> We are moving towards keeping our use of the Temporal API surface minimal.
> If it's found that a workflow is spending too much time dealing with the overhead of activities,
> they should be combined into one long-running activity.

---------

Co-authored-by: Philip Dubé <[email protected]>
1 parent 4efe6d9 commit ba6e4ea

File tree

8 files changed

+66
-79
lines changed

8 files changed

+66
-79
lines changed

flow/activities/flowable.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1412,3 +1412,11 @@ func (a *FlowableActivity) GetFlowMetadata(
14121412
IsResync: input.IsResync,
14131413
}, nil
14141414
}
1415+
1416+
// UpdateCDCConfigInCatalogActivity forwards cfg to internal.UpdateCDCConfigInCatalog,
// supplying the activity's CatalogPool and a logger obtained from ctx via
// internal.LoggerFromCtx. The error from that call is returned unchanged.
func (a *FlowableActivity) UpdateCDCConfigInCatalogActivity(ctx context.Context, cfg *protos.FlowConnectionConfigs) error {
1417+
return internal.UpdateCDCConfigInCatalog(ctx, a.CatalogPool, internal.LoggerFromCtx(ctx), cfg)
1418+
}
1419+
1420+
// PeerDBFullRefreshOverwriteMode exposes internal.PeerDBFullRefreshOverwriteMode as a
// method on FlowableActivity, returning that function's result and error for the
// supplied env unchanged.
func (a *FlowableActivity) PeerDBFullRefreshOverwriteMode(ctx context.Context, env map[string]string) (bool, error) {
1421+
return internal.PeerDBFullRefreshOverwriteMode(ctx, env)
1422+
}

flow/activities/snapshot_activity.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,10 @@ func (a *SnapshotActivity) LoadTableSchema(
167167
return internal.LoadTableSchemaFromCatalog(ctx, a.CatalogPool, flowName, tableName)
168168
}
169169

170+
// GetPeerType returns the result of connectors.LoadPeerType for the peer called name,
// passing the activity's CatalogPool as the pool to load from.
func (a *SnapshotActivity) GetPeerType(ctx context.Context, name string) (protos.DBType, error) {
171+
return connectors.LoadPeerType(ctx, a.CatalogPool, name)
172+
}
173+
170174
func (a *SnapshotActivity) GetDefaultPartitionKeyForTables(
171175
ctx context.Context,
172176
input *protos.FlowConnectionConfigs,

flow/e2e/clickhouse/peer_flow_ch_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,9 @@ func (s ClickHouseSuite) Test_Addition_Removal() {
147147
e2e.EnvWaitFor(s.t, env, 4*time.Minute, "adding table", func() bool {
148148
return env.GetFlowStatus(s.t) == protos.FlowStatus_STATUS_RUNNING
149149
})
150-
afterAddRunID := e2e.EnvGetRunID(s.t, env)
151-
require.NotEqual(s.t, runID, afterAddRunID)
150+
e2e.EnvWaitFor(s.t, env, time.Minute, "ContinueAsNew", func() bool {
151+
return runID != e2e.EnvGetRunID(s.t, env)
152+
})
152153

153154
require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(`INSERT INTO %s ("key") VALUES ('test')`, addedSrcTableName)))
154155
e2e.EnvWaitForEqualTablesWithNames(env, s, "first insert to added table", "test_table_add_remove_added", addedDstTableName, "id,\"key\"")
@@ -497,11 +498,10 @@ func (s ClickHouseSuite) WeirdTable(tableName string) {
497498
_, err = s.Conn().Exec(s.t.Context(),
498499
fmt.Sprintf("INSERT INTO %s (key, \"includedColumn?\", \"excludedColumn?\") VALUES ('cdc','still','ex')", srcFullName))
499500
require.NoError(s.t, err)
500-
501501
e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on cdc", srcTableName, dstTableName, "id,\"key\"")
502-
503502
env.Cancel(s.t.Context())
504503
e2e.RequireEnvCanceled(s.t, env)
504+
505505
env = e2e.ExecuteWorkflow(s.t.Context(), tc, shared.PeerFlowTaskQueue, peerflow.DropFlowWorkflow, &protos.DropFlowInput{
506506
FlowJobName: flowConnConfig.FlowJobName,
507507
DropFlowStats: false,

flow/workflows/cdc_flow.go

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ func syncStatusToCatalog(ctx workflow.Context, logger log.Logger, status protos.
7272
StartToCloseTimeout: 1 * time.Minute,
7373
})
7474

75-
updateFuture := workflow.ExecuteLocalActivity(updateCtx, updateFlowStatusInCatalogActivity,
76-
workflow.GetInfo(ctx).WorkflowExecution.ID, status)
75+
updateFuture := workflow.ExecuteLocalActivity(updateCtx,
76+
updateFlowStatusInCatalogActivity, workflow.GetInfo(ctx).WorkflowExecution.ID, status)
7777
if err := updateFuture.Get(updateCtx, nil); err != nil {
7878
logger.Error("Failed to update flow status in catalog", slog.Any("error", err), slog.String("flowStatus", status.String()))
7979
}
@@ -130,14 +130,13 @@ func uploadConfigToCatalog(
130130
ctx workflow.Context,
131131
cfg *protos.FlowConnectionConfigs,
132132
) {
133-
updateCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
133+
updateCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
134134
StartToCloseTimeout: 5 * time.Minute,
135135
})
136136

137-
logger := workflow.GetLogger(ctx)
138-
updateFuture := workflow.ExecuteLocalActivity(updateCtx, updateCDCConfigInCatalogActivity, logger, cfg)
137+
updateFuture := workflow.ExecuteActivity(updateCtx, flowable.UpdateCDCConfigInCatalogActivity, cfg)
139138
if err := updateFuture.Get(updateCtx, nil); err != nil {
140-
logger.Warn("Failed to update CDC config in catalog", slog.Any("error", err))
139+
workflow.GetLogger(ctx).Warn("Failed to update CDC config in catalog", slog.Any("error", err))
141140
}
142141
}
143142

@@ -178,24 +177,21 @@ func processCDCFlowConfigUpdate(
178177

179178
tablesAreAdded := len(flowConfigUpdate.AdditionalTables) > 0
180179
tablesAreRemoved := len(flowConfigUpdate.RemovedTables) > 0
181-
if !tablesAreAdded && !tablesAreRemoved {
182-
syncStateToConfigProtoInCatalog(ctx, cfg, state)
183-
return nil
184-
}
185-
186-
logger.Info("processing CDCFlowConfigUpdate", slog.Any("updatedState", flowConfigUpdate))
180+
if tablesAreAdded || tablesAreRemoved {
181+
logger.Info("processing CDCFlowConfigUpdate", slog.Any("updatedState", flowConfigUpdate))
187182

188-
if tablesAreAdded {
189-
if err := processTableAdditions(ctx, logger, cfg, state, mirrorNameSearch); err != nil {
190-
logger.Error("failed to process additional tables", slog.Any("error", err))
191-
return err
183+
if tablesAreAdded {
184+
if err := processTableAdditions(ctx, logger, cfg, state, mirrorNameSearch); err != nil {
185+
logger.Error("failed to process additional tables", slog.Any("error", err))
186+
return err
187+
}
192188
}
193-
}
194189

195-
if tablesAreRemoved {
196-
if err := processTableRemovals(ctx, logger, cfg, state); err != nil {
197-
logger.Error("failed to process removed tables", slog.Any("error", err))
198-
return err
190+
if tablesAreRemoved {
191+
if err := processTableRemovals(ctx, logger, cfg, state); err != nil {
192+
logger.Error("failed to process removed tables", slog.Any("error", err))
193+
return err
194+
}
199195
}
200196
}
201197

@@ -560,7 +556,7 @@ func CDCFlowWorkflow(
560556
}
561557
}
562558

563-
logger.Info(fmt.Sprintf("mirror has been resumed after %s", time.Since(startTime).Round(time.Second)))
559+
logger.Info("mirror resumed", slog.Duration("after", time.Since(startTime)))
564560
state.updateStatus(ctx, logger, protos.FlowStatus_STATUS_RUNNING)
565561
return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflow, cfg, state)
566562
}

flow/workflows/local_activities.go

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -3,58 +3,11 @@ package peerflow
33
import (
44
"context"
55
"fmt"
6-
"log/slog"
7-
"time"
86

9-
"go.temporal.io/sdk/log"
10-
"go.temporal.io/sdk/workflow"
11-
12-
"github.com/PeerDB-io/peerdb/flow/connectors"
137
"github.com/PeerDB-io/peerdb/flow/generated/protos"
148
"github.com/PeerDB-io/peerdb/flow/internal"
159
)
1610

17-
func getQRepOverwriteFullRefreshMode(wCtx workflow.Context, logger log.Logger, env map[string]string) bool {
18-
checkCtx := workflow.WithLocalActivityOptions(wCtx, workflow.LocalActivityOptions{
19-
StartToCloseTimeout: time.Minute,
20-
})
21-
22-
getFullRefreshFuture := workflow.ExecuteLocalActivity(checkCtx, internal.PeerDBFullRefreshOverwriteMode, env)
23-
var fullRefreshEnabled bool
24-
if err := getFullRefreshFuture.Get(checkCtx, &fullRefreshEnabled); err != nil {
25-
logger.Warn("Failed to check if full refresh mode is enabled", slog.Any("error", err))
26-
return false
27-
}
28-
return fullRefreshEnabled
29-
}
30-
31-
func localPeerType(ctx context.Context, name string) (protos.DBType, error) {
32-
pool, err := internal.GetCatalogConnectionPoolFromEnv(ctx)
33-
if err != nil {
34-
return 0, err
35-
}
36-
return connectors.LoadPeerType(ctx, pool, name)
37-
}
38-
39-
func getPeerType(wCtx workflow.Context, name string) (protos.DBType, error) {
40-
checkCtx := workflow.WithLocalActivityOptions(wCtx, workflow.LocalActivityOptions{
41-
StartToCloseTimeout: time.Minute,
42-
})
43-
44-
getFuture := workflow.ExecuteLocalActivity(checkCtx, localPeerType, name)
45-
var dbtype protos.DBType
46-
err := getFuture.Get(checkCtx, &dbtype)
47-
return dbtype, err
48-
}
49-
50-
func updateCDCConfigInCatalogActivity(ctx context.Context, logger log.Logger, cfg *protos.FlowConnectionConfigs) error {
51-
pool, err := internal.GetCatalogConnectionPoolFromEnv(ctx)
52-
if err != nil {
53-
return fmt.Errorf("failed to get catalog connection pool: %w", err)
54-
}
55-
return internal.UpdateCDCConfigInCatalog(ctx, pool, logger, cfg)
56-
}
57-
5811
func updateFlowStatusInCatalogActivity(
5912
ctx context.Context,
6013
workflowID string,

flow/workflows/setup_flow.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,12 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables(
5656
) error {
5757
s.Info("checking connections for CDC flow")
5858

59-
checkCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
59+
checkCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
6060
StartToCloseTimeout: time.Minute,
6161
})
6262

6363
// first check the source peer connection
64-
srcConnStatusFuture := workflow.ExecuteLocalActivity(checkCtx, flowable.CheckConnection, &protos.SetupInput{
64+
srcConnStatusFuture := workflow.ExecuteActivity(checkCtx, flowable.CheckConnection, &protos.SetupInput{
6565
Env: config.Env,
6666
PeerName: config.SourceName,
6767
FlowName: config.FlowJobName,
@@ -71,7 +71,7 @@ func (s *SetupFlowExecution) checkConnectionsAndSetupMetadataTables(
7171
PeerName: config.DestinationName,
7272
FlowName: config.FlowJobName,
7373
}
74-
destConnStatusFuture := workflow.ExecuteLocalActivity(checkCtx, flowable.CheckMetadataTables, dstSetupInput)
74+
destConnStatusFuture := workflow.ExecuteActivity(checkCtx, flowable.CheckMetadataTables, dstSetupInput)
7575
if err := srcConnStatusFuture.Get(checkCtx, nil); err != nil {
7676
return fmt.Errorf("failed to check source peer connection: %w", err)
7777
}

flow/workflows/snapshot_flow.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,16 @@ type SnapshotFlowExecution struct {
3131
logger log.Logger
3232
}
3333

34+
// getPeerType executes the snapshot.GetPeerType activity for name, using activity
// options with a one-minute StartToCloseTimeout, and returns the protos.DBType the
// activity result is decoded into together with any error from the future.
func getPeerType(wCtx workflow.Context, name string) (protos.DBType, error) {
35+
checkCtx := workflow.WithActivityOptions(wCtx, workflow.ActivityOptions{
36+
StartToCloseTimeout: time.Minute,
37+
})
38+
39+
var dbtype protos.DBType
40+
// Get waits on the activity future and decodes its result into dbtype.
err := workflow.ExecuteActivity(checkCtx, snapshot.GetPeerType, name).Get(checkCtx, &dbtype)
41+
return dbtype, err
42+
}
43+
3444
func (s *SnapshotFlowExecution) setupReplication(
3545
ctx workflow.Context,
3646
) (*protos.SetupReplicationOutput, error) {

flow/workflows/util.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package peerflow
22

33
import (
4+
"log/slog"
45
"time"
56

7+
"go.temporal.io/sdk/log"
68
"go.temporal.io/sdk/temporal"
79
"go.temporal.io/sdk/workflow"
810

@@ -26,14 +28,14 @@ func GetFlowMetadataContext(
2628
ctx workflow.Context,
2729
input *protos.FlowContextMetadataInput,
2830
) (workflow.Context, error) {
29-
metadataCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
31+
metadataCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
3032
StartToCloseTimeout: time.Minute,
3133
RetryPolicy: &temporal.RetryPolicy{MaximumInterval: time.Minute},
3234
})
33-
getMetadataFuture := workflow.ExecuteLocalActivity(metadataCtx, flowable.GetFlowMetadata, input)
35+
getMetadataFuture := workflow.ExecuteActivity(metadataCtx, flowable.GetFlowMetadata, input)
3436
var metadata *protos.FlowContextMetadata
3537
if err := getMetadataFuture.Get(metadataCtx, &metadata); err != nil {
36-
return nil, err
38+
return ctx, err
3739
}
3840
return workflow.WithValue(ctx, internal.FlowMetadataKey, metadata), nil
3941
}
@@ -43,3 +45,17 @@ func ShouldWorkflowContinueAsNew(ctx workflow.Context) bool {
4345
return info.GetContinueAsNewSuggested() &&
4446
(info.GetCurrentHistoryLength() > 40960 || info.GetCurrentHistorySize() > 40*1024*1024)
4547
}
48+
49+
// getQRepOverwriteFullRefreshMode executes the flowable.PeerDBFullRefreshOverwriteMode
// activity with env, using activity options with a one-minute StartToCloseTimeout,
// and returns the boolean the activity yields.
//
// The lookup is best-effort: if the activity fails, the error is logged through
// logger at warn level and false is returned instead of propagating the error.
func getQRepOverwriteFullRefreshMode(wCtx workflow.Context, logger log.Logger, env map[string]string) bool {
50+
checkCtx := workflow.WithActivityOptions(wCtx, workflow.ActivityOptions{
51+
StartToCloseTimeout: time.Minute,
52+
})
53+
54+
var fullRefreshEnabled bool
55+
getFullRefreshFuture := workflow.ExecuteActivity(checkCtx, flowable.PeerDBFullRefreshOverwriteMode, env)
56+
if err := getFullRefreshFuture.Get(checkCtx, &fullRefreshEnabled); err != nil {
57+
logger.Warn("Failed to check if full refresh mode is enabled", slog.Any("error", err))
58+
// Treat a failed lookup as "full refresh overwrite disabled".
return false
59+
}
60+
return fullRefreshEnabled
61+
}

0 commit comments

Comments
 (0)