Skip to content

Commit 7424ec1

Browse files
lanieheiclaude
andcommitted
Add dry-run mode for XDC system workflows
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 42a0024 commit 7424ec1

File tree

5 files changed

+37
-0
lines changed

5 files changed

+37
-0
lines changed

common/dynamicconfig/constants.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3094,6 +3094,11 @@ WorkerActivitiesPerSecond, MaxConcurrentActivityTaskPollers.
30943094
false,
30953095
`WorkerEnableHistoryRateLimiter decides whether to generate migration tasks with history length rate limiter.`,
30963096
)
3097+
WorkerDryRunMode = NewGlobalBoolSetting(
3098+
"worker.dryRunMode",
3099+
false,
3100+
`WorkerDryRunMode causes XDC system workflows (force-replication, namespace-handover) to complete immediately without doing real work. Use for testing.`,
3101+
)
30973102
MaxUserMetadataSummarySize = NewNamespaceIntSetting(
30983103
"limit.userMetadataSummarySize",
30993104
400,

service/worker/migration/activities.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ type (
114114
namespaceReplicationQueue persistence.NamespaceReplicationQueue
115115
generateMigrationTaskViaFrontend dynamicconfig.BoolPropertyFn
116116
enableHistoryRateLimiter dynamicconfig.BoolPropertyFn
117+
dryRunMode dynamicconfig.BoolPropertyFn
117118
workflowVerifier WorkflowVerifier
118119
chasmRegistry *chasm.Registry
119120
}
@@ -152,6 +153,11 @@ func (r verifyResult) isVerified() bool {
152153
// changing all existing tooling around namespace migration to start workflows & activities on the new task queue.
153154
// Another approach is to use separate workers for workflow tasks and activities and keep existing tooling unchanged.
154155

156+
// IsDryRunMode returns whether the cell is in dry-run mode.
157+
func (a *activities) IsDryRunMode(_ context.Context) (bool, error) {
158+
return a.dryRunMode(), nil
159+
}
160+
155161
// GetMetadata returns history shard count and namespaceID for requested namespace.
156162
func (a *activities) GetMetadata(_ context.Context, request metadataRequest) (*metadataResponse, error) {
157163
nsEntry, err := a.namespaceRegistry.GetNamespace(namespace.Name(request.Namespace))

service/worker/migration/force_replication_workflow.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,10 @@ func ForceReplicationWorkflow(ctx workflow.Context, params ForceReplicationParam
134134
}, nil
135135
})
136136

137+
if dryRun, _ := isDryRunMode(ctx); dryRun {
138+
return nil
139+
}
140+
137141
if err := validateAndSetForceReplicationParams(ctx, &params); err != nil {
138142
return err
139143
}
@@ -217,6 +221,10 @@ func ForceReplicationWorkflowV2(ctx workflow.Context, params ForceReplicationPar
217221
}, nil
218222
})
219223

224+
if dryRun, _ := isDryRunMode(ctx); dryRun {
225+
return nil
226+
}
227+
220228
if err := validateAndSetForceReplicationParams(ctx, &params); err != nil {
221229
return err
222230
}
@@ -337,6 +345,18 @@ func ForceTaskQueueUserDataReplicationWorkflow(ctx workflow.Context, params Task
337345
return err
338346
}
339347

348+
func isDryRunMode(ctx workflow.Context) (bool, error) {
349+
lao := workflow.LocalActivityOptions{
350+
StartToCloseTimeout: 5 * time.Second,
351+
}
352+
var a *activities
353+
var dryRun bool
354+
err := workflow.ExecuteLocalActivity(
355+
workflow.WithLocalActivityOptions(ctx, lao), a.IsDryRunMode,
356+
).Get(ctx, &dryRun)
357+
return dryRun, err
358+
}
359+
340360
func validateAndSetForceReplicationParams(ctx workflow.Context, params *ForceReplicationParams) error {
341361
if len(params.Namespace) == 0 {
342362
return temporal.NewNonRetryableApplicationError("InvalidArgument: Namespace is required", "InvalidArgument", nil)

service/worker/migration/fx.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ func (wc *replicationWorkerComponent) activities() *activities {
124124
forceReplicationMetricsHandler: wc.MetricsHandler.WithTags(metrics.WorkflowTypeTag(forceReplicationWorkflowName)),
125125
generateMigrationTaskViaFrontend: dynamicconfig.WorkerGenerateMigrationTaskViaFrontend.Get(wc.DynamicCollection),
126126
enableHistoryRateLimiter: dynamicconfig.WorkerEnableHistoryRateLimiter.Get(wc.DynamicCollection),
127+
dryRunMode: dynamicconfig.WorkerDryRunMode.Get(wc.DynamicCollection),
127128
workflowVerifier: wc.WorkflowVerifier,
128129
chasmRegistry: wc.ChasmRegistry,
129130
}

service/worker/migration/handover_workflow.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,11 @@ func NamespaceHandoverWorkflow(ctx workflow.Context, params NamespaceHandoverPar
7575
if err := validateAndSetNamespaceHandoverParams(&params); err != nil {
7676
return err
7777
}
78+
79+
if dryRun, _ := isDryRunMode(ctx); dryRun {
80+
return nil
81+
}
82+
7883
retryPolicy := &temporal.RetryPolicy{
7984
InitialInterval: time.Second,
8085
MaximumInterval: time.Second,

0 commit comments

Comments
 (0)