Skip to content

Commit ca1c463

Browse files
authored
Ability to generate migration task via frontend (#8078)
## What changed?
- Add the capability to generate migration tasks via the frontend.

## Why?
- The history client has no per-namespace rate limiting. Going through the frontend allows us to rate-limit migration traffic per namespace and protect the cluster.

## How did you test it?
- [x] built
- [ ] run locally and tested manually
- [ ] covered by existing tests
- [x] added new unit test(s)
- [ ] added new functional test(s)

## Potential risks
- The feature is behind a feature flag, so it should be safe.
- The feature flag can only be enabled after the change in the admin handler (which forwards the target clusters field) is deployed.
1 parent 2237b4f commit ca1c463

File tree

7 files changed

+166
-74
lines changed

7 files changed

+166
-74
lines changed

api/adminservice/v1/request_response.pb.go

Lines changed: 16 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/dynamicconfig/constants.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2703,6 +2703,11 @@ Valid fields: MaxConcurrentActivityExecutionSize, TaskQueueActivitiesPerSecond,
27032703
WorkerActivitiesPerSecond, MaxConcurrentActivityTaskPollers.
27042704
`,
27052705
)
2706+
WorkerGenerateMigrationTaskViaFrontend = NewGlobalBoolSetting(
2707+
"worker.generateMigrationTaskViaFrontend",
2708+
false,
2709+
`WorkerGenerateMigrationTaskViaFrontend controls whether to generate migration tasks via frontend admin service.`,
2710+
)
27062711
MaxUserMetadataSummarySize = NewNamespaceIntSetting(
27072712
"limit.userMetadataSummarySize",
27082713
400,

proto/internal/temporal/server/api/adminservice/v1/request_response.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,7 @@ message SyncWorkflowStateResponse {
563563
message GenerateLastHistoryReplicationTasksRequest {
564564
string namespace = 1;
565565
temporal.api.common.v1.WorkflowExecution execution = 2;
566+
repeated string target_clusters = 3;
566567
}
567568

568569
message GenerateLastHistoryReplicationTasksResponse {

service/frontend/admin_handler.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2151,8 +2151,9 @@ func (adh *AdminHandler) GenerateLastHistoryReplicationTasks(
21512151
resp, err := adh.historyClient.GenerateLastHistoryReplicationTasks(
21522152
ctx,
21532153
&historyservice.GenerateLastHistoryReplicationTasksRequest{
2154-
NamespaceId: namespaceEntry.ID().String(),
2155-
Execution: request.Execution,
2154+
NamespaceId: namespaceEntry.ID().String(),
2155+
Execution: request.Execution,
2156+
TargetClusters: request.TargetClusters,
21562157
},
21572158
)
21582159
if err != nil {

service/worker/migration/activities.go

Lines changed: 78 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@ import (
1515
"go.temporal.io/api/workflowservice/v1"
1616
"go.temporal.io/sdk/activity"
1717
"go.temporal.io/sdk/temporal"
18+
"go.temporal.io/server/api/adminservice/v1"
1819
enumsspb "go.temporal.io/server/api/enums/v1"
1920
"go.temporal.io/server/api/historyservice/v1"
2021
replicationspb "go.temporal.io/server/api/replication/v1"
2122
serverClient "go.temporal.io/server/client"
2223
"go.temporal.io/server/common/definition"
24+
"go.temporal.io/server/common/dynamicconfig"
2325
"go.temporal.io/server/common/headers"
2426
"go.temporal.io/server/common/log"
2527
"go.temporal.io/server/common/log/tag"
@@ -100,18 +102,20 @@ type (
100102
}
101103

102104
activities struct {
103-
historyShardCount int32
104-
executionManager persistence.ExecutionManager
105-
taskManager persistence.TaskManager
106-
namespaceRegistry namespace.Registry
107-
historyClient historyservice.HistoryServiceClient
108-
frontendClient workflowservice.WorkflowServiceClient
109-
clientFactory serverClient.Factory
110-
clientBean serverClient.Bean
111-
logger log.Logger
112-
metricsHandler metrics.Handler
113-
forceReplicationMetricsHandler metrics.Handler
114-
namespaceReplicationQueue persistence.NamespaceReplicationQueue
105+
historyShardCount int32
106+
executionManager persistence.ExecutionManager
107+
taskManager persistence.TaskManager
108+
namespaceRegistry namespace.Registry
109+
historyClient historyservice.HistoryServiceClient
110+
frontendClient workflowservice.WorkflowServiceClient
111+
adminClient adminservice.AdminServiceClient
112+
clientFactory serverClient.Factory
113+
clientBean serverClient.Bean
114+
logger log.Logger
115+
metricsHandler metrics.Handler
116+
forceReplicationMetricsHandler metrics.Handler
117+
namespaceReplicationQueue persistence.NamespaceReplicationQueue
118+
generateMigrationTaskViaFrontend dynamicconfig.BoolPropertyFn
115119
}
116120
)
117121

@@ -336,7 +340,15 @@ func (a *activities) checkHandoverOnce(ctx context.Context, waitRequest waitHand
336340
return readyShardCount == len(resp.Shards), nil
337341
}
338342

339-
func (a *activities) generateWorkflowReplicationTask(ctx context.Context, rateLimiter quotas.RateLimiter, wKey definition.WorkflowKey, targetClusters []string) error {
343+
func (a *activities) generateWorkflowReplicationTask(
344+
ctx context.Context,
345+
rateLimiter quotas.RateLimiter,
346+
namespaceName string,
347+
namespaceID string,
348+
we *commonpb.WorkflowExecution,
349+
targetClusters []string,
350+
generateViaFrontend bool,
351+
) error {
340352
if err := rateLimiter.WaitN(ctx, 1); err != nil {
341353
return err
342354
}
@@ -345,24 +357,36 @@ func (a *activities) generateWorkflowReplicationTask(ctx context.Context, rateLi
345357
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
346358
defer cancel()
347359

348-
resp, err := a.historyClient.GenerateLastHistoryReplicationTasks(ctx, &historyservice.GenerateLastHistoryReplicationTasksRequest{
349-
NamespaceId: wKey.NamespaceID,
350-
Execution: &commonpb.WorkflowExecution{
351-
WorkflowId: wKey.WorkflowID,
352-
RunId: wKey.RunID,
353-
},
354-
TargetClusters: targetClusters,
355-
})
356-
357-
if err != nil {
358-
return err
360+
var stateTransitionCount, historyLength int64
361+
if generateViaFrontend {
362+
resp, err := a.adminClient.GenerateLastHistoryReplicationTasks(ctx, &adminservice.GenerateLastHistoryReplicationTasksRequest{
363+
Namespace: namespaceName,
364+
Execution: we,
365+
TargetClusters: targetClusters,
366+
})
367+
if err != nil {
368+
return err
369+
}
370+
stateTransitionCount = resp.StateTransitionCount
371+
historyLength = resp.HistoryLength
372+
} else {
373+
resp, err := a.historyClient.GenerateLastHistoryReplicationTasks(ctx, &historyservice.GenerateLastHistoryReplicationTasksRequest{
374+
NamespaceId: namespaceID,
375+
Execution: we,
376+
TargetClusters: targetClusters,
377+
})
378+
if err != nil {
379+
return err
380+
}
381+
stateTransitionCount = resp.StateTransitionCount
382+
historyLength = resp.HistoryLength
359383
}
360384

361385
// If workflow has many activity retries (bug in activity code e.g.,), the state transition count can be
362386
// large but the number of actual state transition that is applied on target cluster can be very small.
363387
// Take the minimum between StateTransitionCount and HistoryLength as heuristic to avoid unnecessary throttling
364388
// in such situation.
365-
count := min(resp.StateTransitionCount, resp.HistoryLength)
389+
count := min(stateTransitionCount, historyLength)
366390
for count > 0 {
367391
token := min(int(count), rateLimiter.Burst())
368392
count -= int64(token)
@@ -475,27 +499,38 @@ func (a *activities) GenerateReplicationTasks(ctx context.Context, request *gene
475499
}
476500
}
477501

502+
namespaceName, err := a.namespaceRegistry.GetNamespaceName(namespace.ID(request.NamespaceID))
503+
if err != nil {
504+
a.logger.Error("force-replication failed to translate namespaceID to name", tag.WorkflowNamespaceID(request.NamespaceID))
505+
return err
506+
}
507+
508+
generateViaFrontend := a.generateMigrationTaskViaFrontend()
478509
for i := startIndex; i < len(request.Executions); i++ {
479-
var executionCandidates []definition.WorkflowKey
480-
executionCandidates = []definition.WorkflowKey{definition.NewWorkflowKey(request.NamespaceID, request.Executions[i].GetWorkflowId(), request.Executions[i].GetRunId())}
481-
482-
for _, we := range executionCandidates {
483-
if err := a.generateWorkflowReplicationTask(ctx, rateLimiter, we, request.TargetClusters); err != nil {
484-
if !isNotFoundServiceError(err) {
485-
a.logger.Error("force-replication failed to generate replication task",
486-
tag.WorkflowNamespaceID(we.GetNamespaceID()),
487-
tag.WorkflowID(we.GetWorkflowID()),
488-
tag.WorkflowRunID(we.GetRunID()),
489-
tag.Error(err))
490-
return err
491-
}
492-
493-
a.logger.Warn("force-replication ignore replication task due to NotFoundServiceError",
494-
tag.WorkflowNamespaceID(we.GetNamespaceID()),
495-
tag.WorkflowID(we.GetWorkflowID()),
496-
tag.WorkflowRunID(we.GetRunID()),
510+
we := request.Executions[i]
511+
if err := a.generateWorkflowReplicationTask(
512+
ctx,
513+
rateLimiter,
514+
namespaceName.String(),
515+
request.NamespaceID,
516+
we,
517+
request.TargetClusters,
518+
generateViaFrontend,
519+
); err != nil {
520+
if !isNotFoundServiceError(err) {
521+
a.logger.Error("force-replication failed to generate replication task",
522+
tag.WorkflowNamespaceID(request.NamespaceID),
523+
tag.WorkflowID(we.GetWorkflowId()),
524+
tag.WorkflowRunID(we.GetRunId()),
497525
tag.Error(err))
526+
return err
498527
}
528+
529+
a.logger.Warn("force-replication ignore replication task due to NotFoundServiceError",
530+
tag.WorkflowNamespaceID(request.NamespaceID),
531+
tag.WorkflowID(we.GetWorkflowId()),
532+
tag.WorkflowRunID(we.GetRunId()),
533+
tag.Error(err))
499534
}
500535
activity.RecordHeartbeat(ctx, i)
501536
}

service/worker/migration/activities_test.go

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,14 @@ import (
1313
"go.temporal.io/sdk/interceptor"
1414
"go.temporal.io/sdk/testsuite"
1515
"go.temporal.io/sdk/worker"
16+
"go.temporal.io/server/api/adminservice/v1"
17+
"go.temporal.io/server/api/adminservicemock/v1"
1618
enumsspb "go.temporal.io/server/api/enums/v1"
1719
"go.temporal.io/server/api/historyservice/v1"
1820
"go.temporal.io/server/api/historyservicemock/v1"
1921
persistencespb "go.temporal.io/server/api/persistence/v1"
2022
"go.temporal.io/server/client"
23+
"go.temporal.io/server/common/dynamicconfig"
2124
"go.temporal.io/server/common/log"
2225
"go.temporal.io/server/common/metrics"
2326
"go.temporal.io/server/common/namespace"
@@ -43,6 +46,7 @@ type activitiesSuite struct {
4346
mockClientBean *client.MockBean
4447

4548
mockFrontendClient *workflowservicemock.MockWorkflowServiceClient
49+
mockAdminClient *adminservicemock.MockAdminServiceClient
4650
mockHistoryClient *historyservicemock.MockHistoryServiceClient
4751
mockRemoteClient *workflowservicemock.MockWorkflowServiceClient
4852

@@ -101,6 +105,7 @@ func (s *activitiesSuite) SetupTest() {
101105
s.mockClientBean = client.NewMockBean(s.controller)
102106

103107
s.mockFrontendClient = workflowservicemock.NewMockWorkflowServiceClient(s.controller)
108+
s.mockAdminClient = adminservicemock.NewMockAdminServiceClient(s.controller)
104109
s.mockHistoryClient = historyservicemock.NewMockHistoryServiceClient(s.controller)
105110
s.mockRemoteClient = workflowservicemock.NewMockWorkflowServiceClient(s.controller)
106111

@@ -116,16 +121,18 @@ func (s *activitiesSuite) SetupTest() {
116121
Return(&testNamespace, nil).AnyTimes()
117122

118123
s.a = &activities{
119-
namespaceRegistry: s.mockNamespaceRegistry,
120-
namespaceReplicationQueue: s.mockNamespaceReplicationQueue,
121-
clientFactory: s.mockClientFactory,
122-
clientBean: s.mockClientBean,
123-
taskManager: s.mockTaskManager,
124-
frontendClient: s.mockFrontendClient,
125-
historyClient: s.mockHistoryClient,
126-
logger: log.NewCLILogger(),
127-
metricsHandler: s.mockMetricsHandler,
128-
forceReplicationMetricsHandler: s.mockMetricsHandler,
124+
namespaceRegistry: s.mockNamespaceRegistry,
125+
namespaceReplicationQueue: s.mockNamespaceReplicationQueue,
126+
clientFactory: s.mockClientFactory,
127+
clientBean: s.mockClientBean,
128+
taskManager: s.mockTaskManager,
129+
frontendClient: s.mockFrontendClient,
130+
adminClient: s.mockAdminClient,
131+
historyClient: s.mockHistoryClient,
132+
logger: log.NewCLILogger(),
133+
metricsHandler: s.mockMetricsHandler,
134+
forceReplicationMetricsHandler: s.mockMetricsHandler,
135+
generateMigrationTaskViaFrontend: dynamicconfig.GetBoolPropertyFn(false),
129136
}
130137
}
131138

@@ -651,6 +658,37 @@ func (s *activitiesSuite) TestGenerateReplicationTasks_Failed() {
651658
s.Equal(0, lastHeartBeat)
652659
}
653660

661+
func (s *activitiesSuite) TestGenerateReplicationTasks_Success_ViaFrontend() {
662+
env, iceptor := s.initEnv()
663+
s.a.generateMigrationTaskViaFrontend = dynamicconfig.GetBoolPropertyFn(true)
664+
665+
request := generateReplicationTasksRequest{
666+
NamespaceID: mockedNamespaceID,
667+
RPS: 10,
668+
GetParentInfoRPS: 10,
669+
Executions: []*commonpb.WorkflowExecution{execution1, execution2},
670+
TargetClusters: []string{remoteCluster},
671+
}
672+
673+
// Exercise the startIndex logic: with recorded heartbeat details of 0, the activity should resume at index 1.
674+
env.SetHeartbeatDetails(0)
675+
676+
we := request.Executions[1]
677+
s.mockAdminClient.EXPECT().GenerateLastHistoryReplicationTasks(gomock.Any(), protomock.Eq(&adminservice.GenerateLastHistoryReplicationTasksRequest{
678+
Namespace: mockedNamespace,
679+
Execution: we,
680+
TargetClusters: []string{remoteCluster},
681+
})).Return(&adminservice.GenerateLastHistoryReplicationTasksResponse{}, nil).Times(1)
682+
683+
_, err := env.ExecuteActivity(s.a.GenerateReplicationTasks, &request)
684+
s.NoError(err)
685+
686+
s.Len(iceptor.generateReplicationRecordedHeartbeats, 1)
687+
lastIdx := len(iceptor.generateReplicationRecordedHeartbeats) - 1
688+
lastHeartBeat := iceptor.generateReplicationRecordedHeartbeats[lastIdx]
689+
s.Equal(1, lastHeartBeat)
690+
}
691+
654692
func (s *activitiesSuite) TestCountWorkflows() {
655693
env, _ := s.initEnv()
656694

service/worker/migration/fx.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"go.temporal.io/sdk/workflow"
99
serverClient "go.temporal.io/server/client"
1010
"go.temporal.io/server/common/config"
11+
"go.temporal.io/server/common/dynamicconfig"
1112
"go.temporal.io/server/common/headers"
1213
"go.temporal.io/server/common/log"
1314
"go.temporal.io/server/common/metrics"
@@ -33,6 +34,7 @@ type (
3334
TaskManager persistence.TaskManager
3435
Logger log.Logger
3536
MetricsHandler metrics.Handler
37+
DynamicCollection *dynamicconfig.Collection
3638
}
3739

3840
fxResult struct {
@@ -85,17 +87,18 @@ func (wc *replicationWorkerComponent) DedicatedActivityWorkerOptions() *workerco
8587

8688
func (wc *replicationWorkerComponent) activities() *activities {
8789
return &activities{
88-
historyShardCount: wc.PersistenceConfig.NumHistoryShards,
89-
executionManager: wc.ExecutionManager,
90-
namespaceRegistry: wc.NamespaceRegistry,
91-
historyClient: wc.HistoryClient,
92-
frontendClient: wc.FrontendClient,
93-
clientFactory: wc.ClientFactory,
94-
clientBean: wc.ClientBean,
95-
namespaceReplicationQueue: wc.NamespaceReplicationQueue,
96-
taskManager: wc.TaskManager,
97-
logger: wc.Logger,
98-
metricsHandler: wc.MetricsHandler,
99-
forceReplicationMetricsHandler: wc.MetricsHandler.WithTags(metrics.WorkflowTypeTag(forceReplicationWorkflowName)),
90+
historyShardCount: wc.PersistenceConfig.NumHistoryShards,
91+
executionManager: wc.ExecutionManager,
92+
namespaceRegistry: wc.NamespaceRegistry,
93+
historyClient: wc.HistoryClient,
94+
frontendClient: wc.FrontendClient,
95+
clientFactory: wc.ClientFactory,
96+
clientBean: wc.ClientBean,
97+
namespaceReplicationQueue: wc.NamespaceReplicationQueue,
98+
taskManager: wc.TaskManager,
99+
logger: wc.Logger,
100+
metricsHandler: wc.MetricsHandler,
101+
forceReplicationMetricsHandler: wc.MetricsHandler.WithTags(metrics.WorkflowTypeTag(forceReplicationWorkflowName)),
102+
generateMigrationTaskViaFrontend: dynamicconfig.WorkerGenerateMigrationTaskViaFrontend.Get(wc.DynamicCollection),
100103
}
101104
}

0 commit comments

Comments
 (0)