Skip to content

Commit 69c3939

Browse files
authored
refactor(active-active): Add new methods to active cluster manager (#7308)
<!-- Describe what has changed in this PR --> **What changed?** Add 2 methods to active cluster manager to get active cluster info by cluster attribute <!-- Tell your future self why have you made these changes --> **Why?** Refactor active-active implementation <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** unit tests <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Is there any documentation updates should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open an PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes**
1 parent 1270692 commit 69c3939

File tree

6 files changed

+977
-4
lines changed

6 files changed

+977
-4
lines changed

common/activecluster/manager.go

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,11 @@ import (
4242
type DomainIDToDomainFn func(id string) (*cache.DomainCacheEntry, error)
4343

4444
const (
45-
LookupNewWorkflowOpName = "LookupNewWorkflow"
46-
LookupWorkflowOpName = "LookupWorkflow"
47-
LookupClusterOpName = "LookupCluster"
48-
DomainIDToDomainFnErrorReason = "domain_id_to_name_fn_error"
45+
LookupNewWorkflowOpName = "LookupNewWorkflow"
46+
LookupWorkflowOpName = "LookupWorkflow"
47+
GetActiveClusterInfoByClusterAttributeOpName = "GetActiveClusterInfoByClusterAttribute"
48+
GetActiveClusterInfoByWorkflowOpName = "GetActiveClusterInfoByWorkflow"
49+
DomainIDToDomainFnErrorReason = "domain_id_to_name_fn_error"
4950

5051
workflowPolicyCacheTTL = 10 * time.Second
5152
workflowPolicyCacheMaxCount = 1000
@@ -321,3 +322,58 @@ func (m *managerImpl) handleError(scope metrics.Scope, err *error, start time.Ti
321322
}
322323
scope.RecordHistogramDuration(metrics.ActiveClusterManagerLookupLatency, time.Since(start))
323324
}
325+
326+
func (m *managerImpl) GetActiveClusterInfoByClusterAttribute(ctx context.Context, domainID string, clusterAttribute *types.ClusterAttribute) (res *types.ActiveClusterInfo, e error) {
327+
defer func() {
328+
logFn := m.logger.Debug
329+
if e != nil {
330+
logFn = m.logger.Warn
331+
}
332+
logFn("GetActiveClusterInfoByClusterAttribute",
333+
tag.WorkflowDomainID(domainID),
334+
tag.Dynamic("clusterAttribute", clusterAttribute),
335+
tag.Dynamic("result", res),
336+
tag.Error(e),
337+
)
338+
}()
339+
340+
d, scope, err := m.getDomainAndScope(domainID, GetActiveClusterInfoByClusterAttributeOpName)
341+
if err != nil {
342+
return nil, err
343+
}
344+
defer m.handleError(scope, &e, time.Now())
345+
346+
res, ok := d.GetActiveClusterInfoByClusterAttribute(clusterAttribute)
347+
if !ok {
348+
return nil, &ClusterAttributeNotFoundError{
349+
DomainID: domainID,
350+
ClusterAttribute: clusterAttribute,
351+
}
352+
}
353+
return res, nil
354+
}
355+
356+
func (m *managerImpl) GetActiveClusterInfoByWorkflow(ctx context.Context, domainID, wfID, rID string) (res *types.ActiveClusterInfo, e error) {
357+
d, scope, err := m.getDomainAndScope(domainID, GetActiveClusterInfoByWorkflowOpName)
358+
if err != nil {
359+
return nil, err
360+
}
361+
defer m.handleError(scope, &e, time.Now())
362+
363+
policy, err := m.getClusterSelectionPolicy(ctx, domainID, wfID, rID)
364+
if err != nil {
365+
var notExistsErr *types.EntityNotExistsError
366+
if !errors.As(err, &notExistsErr) {
367+
return nil, err
368+
}
369+
policy = &types.ActiveClusterSelectionPolicy{}
370+
}
371+
res, ok := d.GetActiveClusterInfoByClusterAttribute(policy.ClusterAttribute)
372+
if !ok {
373+
return nil, &ClusterAttributeNotFoundError{
374+
DomainID: domainID,
375+
ClusterAttribute: policy.ClusterAttribute,
376+
}
377+
}
378+
return res, nil
379+
}

common/activecluster/manager_mock.go

Lines changed: 30 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)