Skip to content

Commit 6f65aef

Browse files
authored
refactor: Require ActiveClusterName for active-active domains (#7304)
<!-- Describe what has changed in this PR --> **What changed?** Requires ActiveClusterName to be set for active-active domains. <!-- Tell your future self why you have made these changes --> **Why?** Active-Active domains will use the ActiveClusterName as the "default" ActiveCluster for any workflows that do not specify a valid `scope:name` ClusterAttribute. This makes migration simple (existing domains can just start adding ClusterAttributes to divide work). <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** Unit tests + simulations. Canary to come. <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** The existing Canary or Simulation tests may start to fail. I plan on validating that they do not prior to merging. <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** Active-Active domains now require an ActiveClusterName. <!-- Are there any documentation updates that should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open a PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes** N/A
1 parent 1f49d66 commit 6f65aef

File tree

8 files changed

+123
-26
lines changed

8 files changed

+123
-26
lines changed

common/cluster/metadata.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,9 @@ func NewMetadata(
113113
return m
114114
}
115115

116-
// GetNextFailoverVersion return the next failover version based on input
117-
func (m Metadata) GetNextFailoverVersion(cluster string, currentFailoverVersion int64, domainName string) int64 {
118-
initialFailoverVersion := m.getInitialFailoverVersion(cluster, domainName)
116+
// GetNextFailoverVersion returns the next valid FailoverVersion for a domain
117+
func (m Metadata) GetNextFailoverVersion(targetClusterName string, currentFailoverVersion int64, domainName string) int64 {
118+
initialFailoverVersion := m.getInitialFailoverVersion(targetClusterName, domainName)
119119
failoverVersion := currentFailoverVersion/m.failoverVersionIncrement*m.failoverVersionIncrement + initialFailoverVersion
120120
if failoverVersion < currentFailoverVersion {
121121
return failoverVersion + m.failoverVersionIncrement

common/cluster/metadata_test.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ func TestMetadataBehaviour(t *testing.T) {
4242
const initialFailoverVersionC1 = 0
4343
const clusterName2 = "c2"
4444
const initialFailoverVersionC2 = 2
45+
const clusterName3 = "c3"
46+
const initialFailoverVersionC3 = 4
4547

4648
const failoverVersionIncrement = 100
4749

@@ -60,12 +62,17 @@ func TestMetadataBehaviour(t *testing.T) {
6062
currentVersion: 0,
6163
expectedOut: 2,
6264
},
63-
"a subsequent failover back": {
65+
"a failover to c3 should set the failover version to be based on c3": {
66+
failoverCluster: clusterName3,
67+
currentVersion: 2,
68+
expectedOut: 4,
69+
},
70+
"a subsequent failover back to c1 should increment the failover version by failoverVersionIncrement": {
6471
failoverCluster: clusterName1,
6572
currentVersion: 2,
6673
expectedOut: 100,
6774
},
68-
"and a duplicate": {
75+
"when the current failover version matches the target cluster it should not increment the failover version": {
6976
failoverCluster: clusterName1,
7077
currentVersion: 100,
7178
expectedOut: 100,
@@ -75,6 +82,11 @@ func TestMetadataBehaviour(t *testing.T) {
7582
currentVersion: 100,
7683
expectedOut: 102,
7784
},
85+
"and a subsequent fail back over to c1 should skip over c3": {
86+
failoverCluster: clusterName1,
87+
currentVersion: 102,
88+
expectedOut: 200,
89+
},
7890
}
7991

8092
for name, td := range tests {
@@ -88,10 +100,14 @@ func TestMetadataBehaviour(t *testing.T) {
88100
clusterName2: {
89101
InitialFailoverVersion: initialFailoverVersionC2,
90102
},
103+
clusterName3: {
104+
InitialFailoverVersion: initialFailoverVersionC3,
105+
},
91106
},
92107
versionToClusterName: map[int64]string{
93108
initialFailoverVersionC1: clusterName1,
94109
initialFailoverVersionC2: clusterName2,
110+
initialFailoverVersionC3: clusterName3,
95111
},
96112
useNewFailoverVersionOverride: func(domain string) bool { return false },
97113
metrics: metrics.NewNoopMetricsClient().Scope(0),

common/domain/attrValidator.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ func (d *AttrValidatorImpl) validateDomainReplicationConfigForGlobalDomain(
101101
clusters := replicationConfig.Clusters
102102
activeClusters := replicationConfig.ActiveClusters
103103

104+
if activeCluster == "" {
105+
return errActiveClusterNameRequired
106+
}
107+
104108
for _, clusterConfig := range clusters {
105109
if err := d.validateClusterName(clusterConfig.ClusterName); err != nil {
106110
return err
@@ -116,7 +120,16 @@ func (d *AttrValidatorImpl) validateDomainReplicationConfigForGlobalDomain(
116120
return false
117121
}
118122

123+
if err := d.validateClusterName(activeCluster); err != nil {
124+
return err
125+
}
126+
127+
if !isInClusters(activeCluster) {
128+
return errActiveClusterNotInClusters
129+
}
130+
119131
if replicationConfig.IsActiveActive() {
132+
// For active-active domains, also validate that all clusters in ActiveClustersByRegion are valid
120133
for _, cluster := range activeClusters.ActiveClustersByRegion {
121134
if err := d.validateClusterName(cluster.ActiveClusterName); err != nil {
122135
return err
@@ -126,14 +139,6 @@ func (d *AttrValidatorImpl) validateDomainReplicationConfigForGlobalDomain(
126139
return errActiveClusterNotInClusters
127140
}
128141
}
129-
} else {
130-
if err := d.validateClusterName(activeCluster); err != nil {
131-
return err
132-
}
133-
134-
if !isInClusters(activeCluster) {
135-
return errActiveClusterNotInClusters
136-
}
137142
}
138143

139144
return nil

common/domain/attrValidator_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,55 @@ func (s *attrValidatorSuite) TestValidateDomainReplicationConfigForGlobalDomain(
237237
},
238238
)
239239
s.NoError(err)
240+
241+
// When ActiveClusterName is not provided, and ActiveClusters are not provided, it should return an error
242+
err = s.validator.validateDomainReplicationConfigForGlobalDomain(
243+
&persistence.DomainReplicationConfig{
244+
ActiveClusterName: "",
245+
Clusters: []*persistence.ClusterReplicationConfig{
246+
{ClusterName: cluster.TestCurrentClusterName},
247+
},
248+
},
249+
)
250+
s.Error(err)
251+
s.IsType(&types.BadRequestError{}, err)
252+
253+
// When ActiveClusterName and ActiveClusters are provided, it should not return an error
254+
err = s.validator.validateDomainReplicationConfigForGlobalDomain(
255+
&persistence.DomainReplicationConfig{
256+
ActiveClusterName: cluster.TestCurrentClusterName,
257+
Clusters: []*persistence.ClusterReplicationConfig{
258+
{ClusterName: cluster.TestCurrentClusterName},
259+
{ClusterName: cluster.TestAlternativeClusterName},
260+
},
261+
ActiveClusters: &types.ActiveClusters{
262+
ActiveClustersByRegion: map[string]types.ActiveClusterInfo{
263+
cluster.TestRegion1: {ActiveClusterName: cluster.TestCurrentClusterName},
264+
cluster.TestRegion2: {ActiveClusterName: cluster.TestAlternativeClusterName},
265+
},
266+
},
267+
},
268+
)
269+
s.NoError(err)
270+
271+
// When ActiveClusterName is not provided, and ActiveClusters are provided, it should return an error
272+
err = s.validator.validateDomainReplicationConfigForGlobalDomain(
273+
&persistence.DomainReplicationConfig{
274+
ActiveClusterName: "",
275+
Clusters: []*persistence.ClusterReplicationConfig{
276+
{ClusterName: cluster.TestCurrentClusterName},
277+
{ClusterName: cluster.TestAlternativeClusterName},
278+
},
279+
ActiveClusters: &types.ActiveClusters{
280+
ActiveClustersByRegion: map[string]types.ActiveClusterInfo{
281+
cluster.TestRegion1: {ActiveClusterName: cluster.TestCurrentClusterName},
282+
cluster.TestRegion2: {ActiveClusterName: cluster.TestAlternativeClusterName},
283+
},
284+
},
285+
},
286+
)
287+
s.Error(err)
288+
s.IsType(&types.BadRequestError{}, err)
240289
}
241290

242291
func (s *attrValidatorSuite) TestValidateDomainReplicationConfigClustersDoesNotRemove() {

common/domain/errors.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ var (
3232
errGracefulFailoverInActiveCluster = &types.BadRequestError{Message: "Cannot start the graceful failover from an active cluster to an active cluster."}
3333
errOngoingGracefulFailover = &types.BadRequestError{Message: "Cannot start concurrent graceful failover."}
3434
errInvalidGracefulFailover = &types.BadRequestError{Message: "Cannot start graceful failover without updating active cluster or in local domain."}
35+
errActiveClusterNameRequired = &types.BadRequestError{Message: "ActiveClusterName is required for all global domains."}
3536

3637
errInvalidRetentionPeriod = &types.BadRequestError{Message: "A valid retention period is not set on request."}
3738
errInvalidArchivalConfig = &types.BadRequestError{Message: "Invalid to enable archival without specifying a uri."}

common/domain/handler.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -268,12 +268,6 @@ func (d *handlerImpl) RegisterDomain(
268268
return err
269269
}
270270

271-
if activeClusters != nil {
272-
// TODO: Leave a default activeClusterName for active-active domains
273-
// active-active domain, activeClusterName is not used
274-
activeClusterName = ""
275-
}
276-
277271
replicationConfig := &persistence.DomainReplicationConfig{
278272
ActiveClusterName: activeClusterName,
279273
Clusters: clusters,
@@ -299,8 +293,7 @@ func (d *handlerImpl) RegisterDomain(
299293
}
300294

301295
failoverVersion := constants.EmptyVersion
302-
if registerRequest.GetIsGlobalDomain() && !replicationConfig.IsActiveActive() {
303-
// assign failover version for active-passive domain
296+
if registerRequest.GetIsGlobalDomain() {
304297
failoverVersion = d.clusterMetadata.GetNextFailoverVersion(activeClusterName, 0, registerRequest.Name)
305298
}
306299

@@ -594,10 +587,12 @@ func (d *handlerImpl) UpdateDomain(
594587
// we increment failover version so top level failoverVersion is updated and domain data is replicated.
595588
failoverVersion = d.clusterMetadata.GetNextFailoverVersion(
596589
replicationConfig.ActiveClusterName,
590+
// TODO(active-active): This should be incremented in the same way as an active-passive domain
597591
failoverVersion+1,
598592
updateRequest.Name,
599593
)
600594

595+
// TODO(active-active): Increment all ClusterAttributes that have changed
601596
// we also use the new failover version belonging to currentActiveCluster for the corresponding ActiveClustersByRegion map entry
602597
for region, clusterInfo := range replicationConfig.ActiveClusters.ActiveClustersByRegion {
603598
if clusterInfo.ActiveClusterName == currentActiveCluster {
@@ -631,6 +626,7 @@ func (d *handlerImpl) UpdateDomain(
631626
// to indicate there was a change in replication config
632627
failoverVersion = d.clusterMetadata.GetNextFailoverVersion(
633628
d.clusterMetadata.GetCurrentClusterName(),
629+
// TODO(active-active): If the domain level ActiveCluster has changed this should be incremented in the same way as an active-passive domain
634630
failoverVersion+1,
635631
updateRequest.Name,
636632
)
@@ -645,7 +641,7 @@ func (d *handlerImpl) UpdateDomain(
645641
now,
646642
failoverType,
647643
&currentActiveCluster,
648-
nil,
644+
updateRequest.ActiveClusterName,
649645
currentActiveClusters,
650646
replicationConfig.ActiveClusters,
651647
))

common/domain/handler_test.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,31 @@ func TestRegisterDomain(t *testing.T) {
366366
expectedErr: &types.BadRequestError{},
367367
},
368368
{
369-
name: "active-active domain successfully registered",
369+
name: "active-active domain successfully registered with explicit ActiveClusterName",
370+
request: &types.RegisterDomainRequest{
371+
Name: "active-active-domain",
372+
IsGlobalDomain: true,
373+
ActiveClusterName: cluster.TestCurrentClusterName,
374+
ActiveClustersByRegion: map[string]string{
375+
cluster.TestRegion1: cluster.TestCurrentClusterName,
376+
cluster.TestRegion2: cluster.TestAlternativeClusterName,
377+
},
378+
Clusters: []*types.ClusterReplicationConfiguration{
379+
{ClusterName: cluster.TestCurrentClusterName},
380+
{ClusterName: cluster.TestAlternativeClusterName},
381+
},
382+
WorkflowExecutionRetentionPeriodInDays: 3,
383+
},
384+
isPrimaryCluster: true,
385+
mockSetup: func(mockDomainMgr *persistence.MockDomainManager, mockReplicator *MockReplicator, request *types.RegisterDomainRequest) {
386+
mockDomainMgr.EXPECT().GetDomain(gomock.Any(), &persistence.GetDomainRequest{Name: request.Name}).Return(nil, &types.EntityNotExistsError{})
387+
mockDomainMgr.EXPECT().CreateDomain(gomock.Any(), gomock.Any()).Return(&persistence.CreateDomainResponse{ID: "test-domain-id"}, nil)
388+
mockReplicator.EXPECT().HandleTransmissionTask(gomock.Any(), types.DomainOperationCreate, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), commonconstants.InitialPreviousFailoverVersion, true).Return(nil)
389+
},
390+
wantErr: false,
391+
},
392+
{
393+
name: "active-active domain successfully registered without explicit ActiveClusterName (uses current cluster)",
370394
request: &types.RegisterDomainRequest{
371395
Name: "active-active-domain",
372396
IsGlobalDomain: true,
@@ -1640,6 +1664,7 @@ func TestHandler_UpdateDomain(t *testing.T) {
16401664
setupMock: func(domainManager *persistence.MockDomainManager, updateRequest *types.UpdateDomainRequest, archivalMetadata *archiver.MockArchivalMetadata, timeSource clock.MockedTimeSource, domainReplicator *MockReplicator) {
16411665
domainResponse := &persistence.GetDomainResponse{
16421666
ReplicationConfig: &persistence.DomainReplicationConfig{
1667+
ActiveClusterName: cluster.TestCurrentClusterName,
16431668
Clusters: []*persistence.ClusterReplicationConfig{
16441669
{ClusterName: cluster.TestCurrentClusterName},
16451670
{ClusterName: cluster.TestAlternativeClusterName},
@@ -1692,6 +1717,7 @@ func TestHandler_UpdateDomain(t *testing.T) {
16921717
Info: domainResponse.Info,
16931718
Config: domainResponse.Config,
16941719
ReplicationConfig: &persistence.DomainReplicationConfig{
1720+
ActiveClusterName: cluster.TestCurrentClusterName,
16951721
Clusters: []*persistence.ClusterReplicationConfig{
16961722
{ClusterName: cluster.TestCurrentClusterName},
16971723
{ClusterName: cluster.TestAlternativeClusterName},
@@ -1747,6 +1773,7 @@ func TestHandler_UpdateDomain(t *testing.T) {
17471773
response: func(timeSource clock.MockedTimeSource) *types.UpdateDomainResponse {
17481774
data, _ := json.Marshal([]FailoverEvent{{
17491775
EventTime: timeSource.Now(),
1776+
FromCluster: cluster.TestCurrentClusterName,
17501777
FailoverType: commonconstants.FailoverType(commonconstants.FailoverTypeForce).String(),
17511778
FromActiveClusters: types.ActiveClusters{
17521779
ActiveClustersByRegion: map[string]types.ActiveClusterInfo{
@@ -1792,6 +1819,7 @@ func TestHandler_UpdateDomain(t *testing.T) {
17921819
AsyncWorkflowConfig: &types.AsyncWorkflowConfiguration{Enabled: true},
17931820
},
17941821
ReplicationConfiguration: &types.DomainReplicationConfiguration{
1822+
ActiveClusterName: cluster.TestCurrentClusterName,
17951823
Clusters: []*types.ClusterReplicationConfiguration{
17961824
{ClusterName: cluster.TestCurrentClusterName},
17971825
{ClusterName: cluster.TestAlternativeClusterName},

common/persistence/data_manager_interfaces.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1148,12 +1148,14 @@ type (
11481148
Clusters []*ClusterReplicationConfig
11491149

11501150
// ActiveClusterName is the name of the cluster that the domain is active in.
1151-
// Applicable for active-passive domains.
1151+
// Required for all global domains (both active-passive and active-active).
1152+
// For active-passive domains, this is the single active cluster.
1153+
// For active-active domains, this is the default cluster whenever a ClusterAttribute is not provided.
11521154
ActiveClusterName string
11531155

1154-
// TODO(c-warren): Update documentation once ActiveClusterName is the default for active-active domains.
11551156
// ActiveClusters is only applicable for active-active domains.
1156-
// If this is set, ActiveClusterName is ignored.
1157+
// When this is set, the domain is considered active-active and workflows are routed
1158+
// based on their ClusterAttributes.
11571159
ActiveClusters *types.ActiveClusters
11581160
}
11591161

0 commit comments

Comments
 (0)